In [106]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
import os
import pandas as pd
import numpy as np
from torchvision.io import read_image
from torchvision import transforms
from torch.utils.data import DataLoader
import torch.optim as optim
import torch
from torchvision.transforms import ToTensor
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

In [6]:
#Customize a dataset for a fusion model
class FusionDataset():
    def __init__(self, csv_file_X, csv_file_y, hr_file, transform=transform, target_transform=None):
        self.openface_X = np.load(csv_file_X)
        self.openface_label = np.load(csv_file_y)
        self.hr_file = pd.read_csv(hr_file)
        self.lstm_length = lstm_length
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.hr_file)

    def __getitem__(self, idx):
        image = self.openface_X[idx,:len(self.openface_X)-1]
        image = torch.tensor(image, dtype=torch.float)
        labels = self.openface_label
        labels = torch.tensor(labels, dtype=torch.float)

        hr = self.hr_file.iloc[idx, 1:self.lstm_length+1]
        hr = torch.Tensor(hr)
        hr = torch.unsqueeze(hr, 0)
        hr_label = self.hr_file.iloc[idx, 1]
        hr_label = torch.tensor(hr_label, dtype=torch.float)
        hr_label = torch.unsqueeze(hr_label, 0)
        
        return image, labels, hr, hr_label

In [107]:
#Customize a dataset for an openface model
class OFDataset():
    def __init__(self, csv_file_X, csv_file_y):
        self.openface_X = np.load(csv_file_X)
        self.openface_label = np.load(csv_file_y)

    def __len__(self):
        return len(self.openface_X)

    def __getitem__(self, idx):
        image = self.openface_X[idx,:len(self.openface_X)-1]
        image = torch.tensor(image, dtype=torch.float)
        labels = self.openface_label
        labels = torch.tensor(labels, dtype=torch.float)
        
        return image, labels

In [None]:
#Customize a dataset for a cnn model
class ImageDataset():
    def __init__(self, csv_file_X, csv_file_y):
        self.openface_X = np.load(csv_file_X)
        self.openface_label = np.load(csv_file_y)

    def __len__(self):
        return len(self.openface_X)

    def __getitem__(self, idx):
        image = self.openface_X[idx,:len(self.openface_X)-1]
        image = torch.tensor(image, dtype=torch.float)
        labels = self.openface_label
        labels = torch.tensor(labels, dtype=torch.float)
        
        return image, labels

In [8]:
#Customize a dataset for an heart rate model
class HRDataset():
    """Face Landmarks dataset."""

    def __init__(self, csv_file, lstm_length):
        """
        Args:
            csv_file (string): Path to the csv file with annotations.
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        self.hr_file = pd.read_csv(csv_file)
        self.lstm_length = lstm_length

    def __len__(self):
        return len(self.hr_file)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        
        hr = self.hr_file.iloc[idx, 1:self.lstm_length+1]
        hr = torch.Tensor(hr)
        hr = torch.unsqueeze(hr, 0)
        label = self.hr_file.iloc[idx, 0]
        label = torch.tensor(label, dtype=torch.float)

        return hr, label

In [104]:
#Split heart rate data into training set and test set and generate two dataloaders respectively
lstm_length = 10
data_HR = HRDataset('/content/drive/MyDrive/HRItest/HRI HR_update.csv', lstm_length)
HR_training_data, HR_test_data = train_test_split(data_HR, test_size=0.33, random_state=42)
batch_size = 20
HR_train_dataloader = DataLoader(HR_training_data, batch_size=batch_size, shuffle=True)
HR_test_dataloader = DataLoader(HR_test_data, batch_size=batch_size, shuffle=True)

In [108]:
#Split openface data into training set and test set and generate two dataloaders respectively
data_of = OFDataset('/content/drive/MyDrive/HRItest/X.npy', '/content/drive/MyDrive/HRItest/y.npy')
training_data, test_data = train_test_split(data_of, test_size=0.33, random_state=42)
batch_size = 20
of_train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
of_test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=True)

In [17]:
#Split data into training set and test set for a fusion model and generate two dataloaders respectively
lstm_length = 10
data = FusionDataset('/content/drive/MyDrive/HRItest/X.npy', '/content/drive/MyDrive/HRItest/y.npy','/content/drive/MyDrive/HRItest/HRI HR_update.csv', lstm_length)
training_data, test_data = train_test_split(data, test_size=0.33, random_state=42)
batch_size = 20
train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=True)

In [67]:
#Build a neural network for openface data
class OFNet(torch.nn.Module):
  def __init__(self,size):
      super(OFNet, self).__init__()
      self.size = size
      self.new_layers = torch.nn.Sequential(torch.nn.Linear(self.size, 1))
                                      
  def forward(self, x):
      x = self.new_layers(x)
      return x

In [54]:
#Build a LSTM network for a fusion model
class Lstm(torch.nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.lstm = torch.nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        #self.fc   = torch.nn.Linear(hidden_size, output_size)
        self.fc = torch.nn.Sequential(torch.nn.Linear(hidden_size, 100),
                                           torch.nn.ReLU(),
                                           torch.nn.Linear(100, output_size))
        
    def forward(self, x):
   
        out,_ = self.lstm(x)           # out.shape = (batch_size, seq_len, hidden_size)
        out = out.view(-1, self.hidden_size) # out.shape = (seq_len, hidden_size)     
        out = self.fc(out)
        
        return out

In [55]:
#Build a LSTM network for a non-fusion model
class Lstm_only(torch.nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.lstm = torch.nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc   = torch.nn.Linear(hidden_size, output_size)
        #self.fc = torch.nn.Sequential(torch.nn.Linear(hidden_size, 100),
                                           #torch.nn.ReLU(),
                                           #torch.nn.Linear(100, output_size))
        
    def forward(self, x):
   
        out,_ = self.lstm(x)           # out.shape = (batch_size, seq_len, hidden_size)
        out = out.view(-1, self.hidden_size) # out.shape = (seq_len, hidden_size)     
        out = self.fc(out)
        
        return out

In [83]:
#Build a MLP for a fusion model
class Net(torch.nn.Module):
  def __init__(self,size):
      super(Net, self).__init__()
      self.size = size
      self.new_layers = torch.nn.Sequential(torch.nn.Linear(self.size, 2))
                                          
  def forward(self, x):
      x = self.new_layers(x)
      return x

In [140]:
#Train a LSTM model
def lstm_train(model, epochs, train_dataloader, valid_data=None, lr=0.001, print_every=100):

    criterion = torch.nn.MSELoss()
    opt = optim.Adam(model.parameters(), lr=lr)
    
    train_loss = []

    for epoch in range(epochs):
        
        #hs = None
        t_loss = 0
        for i, data in enumerate(train_dataloader, 0):

            lstm_inputs, lstm_labels = data

            opt.zero_grad()
            
            # Create batch_size dimension
            #x = x.unsqueeze(0)
            #print(lstm_inputs.shape,lstm_labels.shape)
            #print(lstm_inputs.shape)
            out = model(lstm_inputs)
            
            
            loss = criterion(out, lstm_labels)
            #print(out.shape,lstm_labels.shape)
            loss.backward()
            opt.step()
            t_loss += loss.item()
            
            
        train_loss.append(np.mean(t_loss))
            
            
        if epoch % print_every == 0:
            print(f'Epoch {epoch}:\nTraining Loss: {train_loss[-1]}')
            

In [139]:
#Train a LSTM for fusion model
def lstm_train_fusion(model, epochs, train_dataloader, valid_data=None, lr=0.001, print_every=100):

    criterion = torch.nn.MSELoss()
    opt = optim.Adam(model.parameters(), lr=lr)
    
    train_loss = []

    for epoch in range(epochs):
        
        #hs = None
        t_loss = 0
        for i, data in enumerate(train_dataloader, 0):

            _,_,lstm_inputs, lstm_labels = data

            opt.zero_grad()
            
            # Create batch_size dimension
            #x = x.unsqueeze(0)
            #print(lstm_inputs.shape,lstm_labels.shape)
            #print(lstm_inputs.shape)
            out = model(lstm_inputs)
            
            
            loss = criterion(out, lstm_labels)
            #print(out.shape,lstm_labels.shape)
            loss.backward()
            opt.step()
            t_loss += loss.item()
            
            
        train_loss.append(np.mean(t_loss))
            
            
        if epoch % print_every == 0:
            print(f'Epoch {epoch}:\nTraining Loss: {train_loss[-1]}')
            

In [138]:
##Train a openface model
def OF_train(model, epochs, train_dataloader, valid_data=None, lr=0.001, print_every=100):

    criterion = torch.nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    train_loss = []

    for epoch in range(epochs):
        
        hs = None
        t_loss = 0
        for i, data in enumerate(train_dataloader, 0):
            
            features, labels = data
            #print(features.shape,labels.shape)

            optimizer.zero_grad()
            
            # Create batch_size dimension
            #x = x.unsqueeze(0)
            
            output = model(features)
            #print(output.shape)
            #print(output.shape,labels.shape)
            #hs = tuple([h.data for h in hs])
            
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()
            t_loss += loss.item()
            
            
        train_loss.append(np.mean(t_loss))
            
            
        if epoch % print_every == 0:
            print(f'Epoch {epoch}:\nTraining Loss: {train_loss[-1]}')
            

In [137]:
#Train a openface model for fusion model
def OF_train_fusion(model, epochs, train_dataloader, valid_data=None, lr=0.001, print_every=100):

    criterion = torch.nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    train_loss = []

    for epoch in range(epochs):
        
        hs = None
        t_loss = 0
        for i, data in enumerate(train_dataloader, 0):
            
            features, labels,_,_ = data
            #print(features.shape,labels.shape)

            optimizer.zero_grad()
            
            # Create batch_size dimension
            #x = x.unsqueeze(0)
            
            output = model(features)
            #print(output.shape)
            #print(output.shape,labels.shape)
            #hs = tuple([h.data for h in hs])
            
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()
            t_loss += loss.item()
            
            
        train_loss.append(np.mean(t_loss))
            
            
        if epoch % print_every == 0:
            print(f'Epoch {epoch}:\nTraining Loss: {train_loss[-1]}')
            

In [136]:
#Train fusion model
def net_train(model, epochs, train_dataloader, valid_data=None, lr=0.001, print_every=100):

    criterion = torch.nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    train_loss = []

    for epoch in range(epochs):
        
        hs = None
        t_loss = 0
        for i, data in enumerate(train_dataloader, 0):
            
            features, labels = data[:-1],data[-1]

            optimizer.zero_grad()
            
            # Create batch_size dimension
            #x = x.unsqueeze(0)
            
            output = net(features)
            #print(output.shape,labels.shape)
            #hs = tuple([h.data for h in hs])
            
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()
            t_loss += loss.item()
            
            
        train_loss.append(np.mean(t_loss))
        
        #t_loss += loss.item()    
            
        if epoch % print_every == 0:
            print(f'Epoch {epoch}:\nTraining Loss: {train_loss[-1]}')
            

In [141]:
lstm = Lstm(input_size=lstm_length, hidden_size=1000, num_layers=1, output_size=1)
lstm_only = Lstm(input_size=lstm_length, hidden_size=1000, num_layers=1, output_size=1)
imagenet = OFNet(size=710)
net = Net(size=1)

In [142]:
OF_train(imagenet, 1000, of_train_dataloader, valid_data=None, lr=0.001, print_every=100)

  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 0:
Training Loss: 121685.419090271
Epoch 100:
Training Loss: 768.8558168411255
Epoch 200:
Training Loss: 378.87736415863037
Epoch 300:
Training Loss: 44009.89278411865
Epoch 400:
Training Loss: 561.7428703308105
Epoch 500:
Training Loss: 306.3523836135864
Epoch 600:
Training Loss: 341.476957321167
Epoch 700:
Training Loss: 305.29329109191895
Epoch 800:
Training Loss: 631.6449089050293
Epoch 900:
Training Loss: 375.36213397979736


In [143]:
lstm_train(lstm, 1000, HR_train_dataloader, lr=0.0005)

  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 0:
Training Loss: 83.33154296875
Epoch 100:
Training Loss: 17.216050148010254
Epoch 200:
Training Loss: 16.96065902709961
Epoch 300:
Training Loss: 17.15447187423706
Epoch 400:
Training Loss: 17.407597541809082
Epoch 500:
Training Loss: 16.903281211853027
Epoch 600:
Training Loss: 16.97908115386963
Epoch 700:
Training Loss: 17.1210355758667
Epoch 800:
Training Loss: 17.031180381774902
Epoch 900:
Training Loss: 17.240516662597656


In [135]:
lstm_train_fusion(lstm_only, 1000, train_dataloader, lr=0.0005)

Epoch 0:
Training Loss: 23130.96142578125
Epoch 100:
Training Loss: 2008.9101993589118
Epoch 200:
Training Loss: 1033.2977653987966
Epoch 300:
Training Loss: 696.5815197321466
Epoch 400:
Training Loss: 525.9162960794204
Epoch 500:
Training Loss: 423.26029957970695
Epoch 600:
Training Loss: 354.6298524425698
Epoch 700:
Training Loss: 305.488964826315
Epoch 800:
Training Loss: 268.37545576693486
Epoch 900:
Training Loss: 239.52854514577643


In [111]:
#Save train models
PATH = './HRI_net.pth'
torch.save(lstm_only.state_dict(), PATH)
lstm_only.load_state_dict(torch.load(PATH))
PATH = './HRI_net.pth'
torch.save(imagenet.state_dict(), PATH)
imagenet.load_state_dict(torch.load(PATH))

<All keys matched successfully>

In [113]:
#Compute the test loss for lstm model
criterion = torch.nn.MSELoss()
t_loss = 0
with torch.no_grad():
    for data in HR_test_dataloader:
        features, labels = data
        net_outputs = lstm(features)
        loss = torch.sqrt(criterion(net_outputs, labels))
        #t_loss += loss.item()

    print('test_loss:',loss)

test_loss: tensor(6.5750)


  return F.mse_loss(input, target, reduction=self.reduction)


In [112]:
#Compute the test loss for openface model
criterion = torch.nn.MSELoss()
t_loss = 0
with torch.no_grad():
    for data in of_test_dataloader:
        features, labels = data
        net_outputs = imagenet(features)
        loss = torch.sqrt(criterion(net_outputs, labels))
        t_loss += loss.item()

    print('test_loss:',np.mean(t_loss))

test_loss: 623.2139301300049


  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


In [117]:
#Generate a new dataset for fusion model
hs = None
dataset = None

with torch.no_grad():
    for data in test_dataloader:
        inputs, labels, lstm_inputs, lstm_labels = data
        
        image_outputs = imagenet(inputs)
        lstm_outputs = lstm(lstm_inputs)
        
        
        #features = torch.cat((torch.cat((cnn_outputs, torch.squeeze((lstm_outputs),1)), 1),torch.unsqueeze(labels,1)),1)
        features = torch.cat((image_outputs, lstm_outputs),1)
        if dataset is None:
          dataset = features
        else:
          dataset = torch.cat((dataset, features), 0)
       
      


In [118]:
training_data_pred, test_data_pred = train_test_split(dataset, test_size=0.33, random_state=42)

In [124]:
net_train(net, 900, training_data_pred, lr=0.0005)

  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 0:
Training Loss: 0.011692415064317174
Epoch 100:
Training Loss: 0.008945544684604444
Epoch 200:
Training Loss: 0.008779892735553326
Epoch 300:
Training Loss: 0.008626589869756717
Epoch 400:
Training Loss: 0.00847910259864916
Epoch 500:
Training Loss: 0.008335780462878079
Epoch 600:
Training Loss: 0.008195885988202908
Epoch 700:
Training Loss: 0.008058986601874816
Epoch 800:
Training Loss: 0.00792479847479519


In [120]:
PATH = './HRI_net.pth'
torch.save(net.state_dict(), PATH)
net.load_state_dict(torch.load(PATH))

<All keys matched successfully>

In [122]:
#Compute the test loss for fusion model
criterion = torch.nn.MSELoss()
t_loss = 0

with torch.no_grad():
    for data in test_data_pred:
        features, labels = data[:-1],data[-1]
        net_outputs = net(features)
        loss = torch.sqrt(criterion(net_outputs, labels))
        t_loss += loss.item()

    print('test_loss:',np.mean(t_loss))


test_loss: 0.18427092395722866


  return F.mse_loss(input, target, reduction=self.reduction)
