In [40]:
import torch
from torch import nn
from torch.utils.data import DataLoader
import csv
import torch

# Define a dataset class for the IMU data sequence
class IMUDataset(torch.utils.data.Dataset):
  def __init__(self, data):
    self.data = data
  
  def __len__(self):
    return len(self.data)
  
  def __getitem__(self, index):
    return self.data[index]

# Define the contrastive loss function
# def contrastive_loss(output1, output2, label):
#   distance = torch.norm(output1 - output2)
#   return torch.mean(label * torch.pow(distance, 2) +
#                     (1 - label) * torch.pow(torch.clamp(2 - distance, min=0.0), 2))

def contrastive_loss(pred, target):
    margin = 1
    square_pred = torch.pow(pred, 2)
    margin_square = torch.pow(torch.clamp(margin - pred, min=0), 2)
    return torch.mean(target * square_pred + (1 - target) * margin_square)


# Define the neural network architecture
class IMUNet(nn.Module):
  def __init__(self):
    super().__init__()
    self.fc1 = nn.Linear(10, 32)
    self.fc2 = nn.Linear(32, 64)
    self.fc3 = nn.Linear(64, 128)
    self.fc4 = nn.Linear(128, 256)
    self.fc5 = nn.Linear(256, 512)
  
  def forward(self, x1, x2):
    x1 = nn.functional.relu(self.fc1(x1))
    x1 = nn.functional.relu(self.fc2(x1))
    x1 = nn.functional.relu(self.fc3(x1))
    x1 = nn.functional.relu(self.fc4(x1))
    
    x2 = nn.functional.relu(self.fc1(x2))
    x2 = nn.functional.relu(self.fc2(x2))
    x2 = nn.functional.relu(self.fc3(x2))
    x2 = nn.functional.relu(self.fc4(x2))
    
    x = torch.abs(x1 - x2)
    x = self.fc5(x)
    
    return x

       

# read the CSV file and extract the IMU data
imu_data = []
with open('C:/Users/Sandalu Karunasena/Desktop/sampleIMU.csv') as csv_file:
  reader = csv.DictReader(csv_file)
  for row in reader:
    imu_data.append({
      'timestamp': float(row['timestamp']),
      'acceleration_x': float(row['acceleration_x']),
      'acceleration_y': float(row['acceleration_y']),
      'acceleration_z': float(row['acceleration_z']),
      'angular_velocity_x': float(row['angular_velocity_x']),
      'angular_velocity_y': float(row['angular_velocity_y']),
      'angular_velocity_z': float(row['angular_velocity_z']),
      'magnetic_field_x': float(row['magnetic_field_x']),
      'magnetic_field_y': float(row['magnetic_field_y']),
      'magnetic_field_z': float(row['magnetic_field_z']),
    })

# convert the IMU data to PyTorch tensors
imu_data = [  torch.tensor([    x['timestamp'],
    x['acceleration_x'], x['acceleration_y'], x['acceleration_z'],
    x['angular_velocity_x'], x['angular_velocity_y'], x['angular_velocity_z'],
    x['magnetic_field_x'], x['magnetic_field_y'], x['magnetic_field_z'],
  ])
  for x in imu_data
]

# Split the IMU data into positive and negative pairs
# pos_data = imu_data[imu_data > 0]
# neg_data = imu_data[imu_data < 0]

# # Divide each group of data into pairs
# pos_pairs = [(pos_data[i], pos_data[i+1]) for i in range(0, len(pos_data), 2)]
# neg_pairs = [(neg_data[i], neg_data[i+1]) for i in range(0, len(neg_data), 2)]


# define a function that determines whether two IMU data samples are similar or different
def is_similar(x1, x2):

  # determine whether x1 and x2 are similar based on the timestamps
 return x1['timestamp'] - x2['timestamp'] < 5

# split the IMU data into positive and negative pairs
positive_pairs = [(x1, x2) for i, x1 in enumerate(imu_data[:-1]) for j, x2 in enumerate(imu_data[i+1:]) if x1[0] - x2[0] < 5]
negative_pairs = [(x1, x2) for i, x1 in enumerate(imu_data[:-1]) for j, x2 in enumerate(imu_data[i+1:]) if x1[0] - x2[0] >= 5]



# create an instance of the model and the optimizer
model = IMUNet() # assuming 3-dimensional IMU data
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# train the model
num_epochs =10
for epoch in range(num_epochs):
    
  # iterate through the pairs and calculate the loss
  loss = 0
  for (x1, x2), target in zip(positive_pairs, [1] * len(positive_pairs)):
    # forward pass: compute predicted output by passing x to the model
    pred = model(x1, x2)

    # compute and print loss
    loss += contrastive_loss(pred, target)
   
    
  for (x1, x2), target in zip(negative_pairs, [0] * len(negative_pairs)):
    # forward pass: compute predicted output by passing x to the model
    pred = model(x1, x2)

    # compute and print loss
    loss += contrastive_loss(pred, target)
    
  #print("Contrastive Loss:",loss)
  # zero the gradients before running the backward pass
  optimizer.zero_grad()

  # backward pass: compute gradient of the loss with respect to model parameters
  loss.backward()

  # perform a single optimization step
  optimizer.step()

  # print the current loss
  print("Contrastive Loss:",loss)
  if (epoch + 1) % 10 == 0:
    print('epoch [{}/{}], loss: {:.4f}'.format(epoch + 1, num_epochs, loss.item()))



Contrastive Loss: tensor(16667.1055, grad_fn=<AddBackward0>)
Contrastive Loss: tensor(7480.2573, grad_fn=<AddBackward0>)
Contrastive Loss: tensor(3241.1890, grad_fn=<AddBackward0>)
Contrastive Loss: tensor(1768.7740, grad_fn=<AddBackward0>)
Contrastive Loss: tensor(1165.6493, grad_fn=<AddBackward0>)
Contrastive Loss: tensor(839.1817, grad_fn=<AddBackward0>)
Contrastive Loss: tensor(721.9299, grad_fn=<AddBackward0>)
Contrastive Loss: tensor(751.3100, grad_fn=<AddBackward0>)
Contrastive Loss: tensor(550.3378, grad_fn=<AddBackward0>)
Contrastive Loss: tensor(444.1008, grad_fn=<AddBackward0>)
epoch [10/10], loss: 444.1008
