In [7]:
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import math

# Define Positional Encoding to handle sequential information
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        self.encoding[:, 0::2] = torch.sin(position * div_term)
        self.encoding[:, 1::2] = torch.cos(position * div_term)
        self.encoding = self.encoding.unsqueeze(0)

    def forward(self, x):
        return x + self.encoding[:, :x.size(1), :].to(x.device)

# Define the Transformer Encoder model
class TransformerModel(nn.Module):
    def __init__(self, input_size=3, d_model=64, nhead=4, num_layers=3, output_size=3 * 20):
        super(TransformerModel, self).__init__()
        self.input_fc = nn.Linear(input_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        encoder_layers = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=256)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers=num_layers)
        self.fc_out = nn.Linear(d_model, output_size)

    def forward(self, x):
        x = self.input_fc(x)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        x = self.fc_out(x[:, -1, :])  # Use the last time step for prediction
        return x

# Initialize model, criterion, and optimizer
input_length = 80
output_length = 20
model = TransformerModel()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Load or generate non-fault data from CSV file
def load_non_fault_data(file_path, input_length=80, output_length=20):
    data = pd.read_csv(file_path)
    sequences = []
    for i in range(len(data) - (input_length + output_length)):
        sequence = data.iloc[i:i + input_length + output_length][['x', 'y', 'z']].values
        sequences.append(sequence)

    # Split into input and target sequences
    inputs = [seq[:input_length] for seq in sequences]
    targets = [seq[input_length:] for seq in sequences]
    inputs = torch.tensor(inputs, dtype=torch.float32)
    targets = torch.tensor(targets, dtype=torch.float32).reshape(len(sequences), -1)
    return TensorDataset(inputs, targets)

# Training loop
def train_model(model, criterion, optimizer, dataset, num_epochs=10, batch_size=32):
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0.0
        for inputs, targets in dataloader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss / len(dataloader):.4f}")

# Load non-fault dataset from CSV and train the model
non_fault_data_file = "/Users/prasanna/Desktop/Hack2Future/_non_fault_data.csv"  # Replace with actual path to non-fault data CSV
non_fault_dataset = load_non_fault_data(non_fault_data_file, input_length, output_length)
train_model(model, criterion, optimizer, non_fault_dataset)

# Save the model state dictionary locally
model_path = "/Users/prasanna/Desktop/Hack2Future/transformer_model.pth"
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")




Epoch [1/10], Loss: 0.2805
Epoch [2/10], Loss: 0.0263
Epoch [3/10], Loss: 0.0179
Epoch [4/10], Loss: 0.0166
Epoch [5/10], Loss: 0.0150
Epoch [6/10], Loss: 0.0144
Epoch [7/10], Loss: 0.0144
Epoch [8/10], Loss: 0.0130
Epoch [9/10], Loss: 0.0116
Epoch [10/10], Loss: 0.0108
Model saved to /Users/prasanna/Desktop/Hack2Future/transformer_model.pth
