In [25]:
from pathlib import Path
import pandas as pd

# from keras.layers import BatchNormalization, Dense, Input, Conv1D, Add, ELU, Flatten, MaxPooling1D
# from keras.layers import GlobalAveragePooling1D, Softmax, Concatenate, Reshape, Multiply, ReLU
# from keras.optimizers import SGD
# from keras import activations
# from keras import Model
# from keras.initializers import HeNormal
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import numpy as np

# from keras.utils.vis_utils import plot_model
import torch
from torch import nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from pytorch_symbolic import Input, SymbolicModel, useful_layers

In [None]:
import os
# dataset
class DAICDataset(Dataset):
    """Map-style dataset pairing a participant's CLNF feature file with a label.

    The annotations CSV is read once at construction time. Its first column
    is used as the participant ID and its second column as the label. The
    feature file for a participant is expected at
    ``<sample_dir>/<id>/<id>_CLNF_<feature_type>.txt``.

    Args:
        annotations_file: Path of the CSV holding participant IDs and labels.
        sample_dir: Directory containing one sub-directory per participant.
        feature_type: Feature name embedded in the file name (e.g. ``"pose"``).
        transform: Optional callable applied to the feature table.
        target_transform: Optional callable applied to the label.

    NOTE(review): items are returned as pandas DataFrames. The default
    DataLoader collate function may not accept DataFrames - confirm that a
    custom ``collate_fn`` (or a conversion step) is used before batching.
    """

    # Row labels of the window kept from every recording. ``.loc`` slicing is
    # label based and inclusive on both ends, so this keeps up to 5000 rows.
    WINDOW_START = 1000
    WINDOW_STOP = 5999
    # Bookkeeping columns that are removed from the feature table.
    METADATA_COLUMNS = ["frame", "timestamp", "confidence", "success"]

    def __init__(
        self,
        annotations_file,
        sample_dir,
        feature_type,
        transform=None,
        target_transform=None,
    ):
        self.depression_labels = pd.read_csv(annotations_file)
        self.sample_dir = sample_dir
        self.feature_type = feature_type
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of rows in the annotations table."""
        return len(self.depression_labels)

    def __getitem__(self, idx):
        """Load the feature window and label of the ``idx``-th annotated row.

        Returns:
            A ``(features, label)`` tuple. ``features`` is the slice
            ``WINDOW_START..WINDOW_STOP`` (inclusive row labels) of the
            feature table; recordings with fewer rows yield a shorter slice.
        """
        # pandas parses purely numeric IDs as integers, and ``int + str``
        # raises TypeError, so always build the path from the text form.
        participant_id = str(self.depression_labels.iloc[idx, 0])
        participant_path = os.path.join(
            self.sample_dir,
            participant_id,
            f"{participant_id}_CLNF_{self.feature_type}.txt",
        )
        data = pd.read_csv(participant_path, sep=",")
        # Header names may carry padding spaces after the comma separator.
        data.columns = data.columns.str.replace(" ", "")
        data = data.drop(columns=self.METADATA_COLUMNS)
        label = self.depression_labels.iloc[idx, 1]
        if self.transform:
            # NOTE(review): the window is cut with ``.loc`` *after* the
            # transform, so the transform has to return a pandas object.
            data = self.transform(data)
        if self.target_transform:
            label = self.target_transform(label)
        return data.loc[self.WINDOW_START : self.WINDOW_STOP], label


# Directory holding the split CSVs (participant IDs and labels).
label_path = Path("original_daic/labels")

# Options shared by all three pose datasets; only the annotations file and
# the sample directory differ between the splits.
_pose_options = dict(feature_type="pose", transform=None, target_transform=None)

pose_train = DAICDataset(
    annotations_file=label_path / "train_split_Depression_AVEC2017.csv",
    sample_dir="original_daic/train",
    **_pose_options,
)
pose_dev = DAICDataset(
    annotations_file=label_path / "dev_split_Depression_AVEC2017.csv",
    sample_dir="original_daic/dev",
    **_pose_options,
)
pose_test = DAICDataset(
    annotations_file=label_path / "full_test_split.csv",
    sample_dir="original_daic/test",
    **_pose_options,
)

In [None]:
# dataloader
BATCH_SIZE = 1
# Only the training split is reshuffled every epoch. The dev and test splits
# are iterated in file order so that evaluation runs are reproducible and
# per-sample predictions can be matched back to the annotation rows.
train_dataloader = DataLoader(pose_train, batch_size=BATCH_SIZE, shuffle=True)
dev_dataloader = DataLoader(pose_dev, batch_size=BATCH_SIZE, shuffle=False)
test_dataloader = DataLoader(pose_test, batch_size=BATCH_SIZE, shuffle=False)

In [8]:
# Symbolic placeholder for one pose sample: 6 channels, 5000 time steps.
input_pose = Input(shape=[6, 5000])
# Channel plan of the network: entry 0 is the input channel count, entries
# 1-5 are the output widths of the five convolution blocks.
# The channel count is read from `.shape[1]` because `.shape` includes the
# batch axis; `input_pose[0]` would index the symbolic tensor itself instead
# of yielding the integer that nn.Conv1d needs for `in_channels`.
tdcn_dim_pose = [input_pose.shape[1], 128, 64, 256, 128, 64]  # used in Guo's paper
# tdcn_dim_pose = [input_pose.shape[1], 128, 128, 128, 128, 128]

In [11]:
def diluted_conv_block(inputs, input_dim, feature_dim):
    """Apply one dilated convolution block with a projected skip connection.

    The block stacks three stages. Every stage runs two parallel ``Conv1d``
    layers with the same settings on the same input, sums them and applies
    ELU. Kernel size / dilation per stage are 3/1, 5/2 and 9/4. A 1x1
    convolution of the block input is added to the last stage's output and
    the sum is batch-normalised. ``padding="same"`` is used throughout, so
    the length axis is unchanged.

    Args:
        inputs: Input with ``input_dim`` channels on axis 1
            (assumed layout ``(batch, channels, length)``).
        input_dim: Number of input channels.
        feature_dim: Number of output channels of every layer in the block.

    Returns:
        The batch-normalised block output with ``feature_dim`` channels.
    """
    # (kernel_size, dilation) of the three stages, in execution order.
    stage_settings = ((3, 1), (5, 2), (9, 4))

    hidden = inputs
    in_channels = input_dim
    for kernel, rate in stage_settings:
        branch_a = nn.Conv1d(
            in_channels, feature_dim, kernel_size=kernel, padding="same", dilation=rate, bias=True
        )(hidden)
        branch_b = nn.Conv1d(
            in_channels, feature_dim, kernel_size=kernel, padding="same", dilation=rate, bias=True
        )(hidden)
        hidden = nn.ELU()(branch_a + branch_b)
        # Only the first stage sees the block's input width.
        in_channels = feature_dim

    # 1x1 convolution so the skip path has the same channel count as `hidden`.
    shortcut = nn.Conv1d(input_dim, feature_dim, kernel_size=1, padding="same")(inputs)
    return nn.BatchNorm1d(num_features=feature_dim)(hidden + shortcut)

In [39]:
def time_diluted_conv_net(feature_dim, input_layer, pool_size, pool_stride):
    """Chain five dilated convolution blocks with max pooling in between.

    Args:
        feature_dim: Indexable channel plan; entry 0 is the channel count of
            ``input_layer`` and entries 1-5 are the output widths of the
            five blocks.
        input_layer: Input fed to the first block.
        pool_size: Kernel size of every ``MaxPool1d`` between blocks.
        pool_stride: Stride of every ``MaxPool1d`` between blocks.

    Returns:
        The output of the fifth block (no pooling is applied after it).
    """
    hidden = diluted_conv_block(input_layer, feature_dim[0], feature_dim[1])
    # Blocks 2-5: each one is preceded by a max-pooling step.
    for stage in range(1, 5):
        hidden = nn.MaxPool1d(pool_size, stride=pool_stride)(hidden)
        hidden = diluted_conv_block(hidden, feature_dim[stage], feature_dim[stage + 1])
    return hidden

tdcn_pose = time_diluted_conv_net(
    feature_dim=tdcn_dim_pose,
    input_layer=input_pose,
    pool_size=2,
    pool_stride=2,
)

# --- Gate computed from a global pooling of the TDCN features -------------
# NOTE(review): despite the `gap_` name this is a global *max* pool, not an
# average pool - confirm which one is intended.
gap_layer = nn.AdaptiveMaxPool1d(1)(tdcn_pose)
# NOTE(review): Linear(1, 1) acts on the trailing length-1 axis, i.e. a single
# scalar weight/bias shared by all channels with no mixing across channels.
# Confirm this is intended rather than a channel-wise Linear(C, C).
linear_layer_1 = nn.Linear(1, 1)(gap_layer)
relu_layer = nn.ReLU()(linear_layer_1)
linear_layer_2 = nn.Linear(1, 1)(relu_layer)
sigmoid_layer = nn.Sigmoid()(linear_layer_2)

# The gate has a trailing axis of length 1, so the element-wise product below
# broadcasts it across the time axis. This gives the same values as tiling
# the gate with one ConcatLayer per time step, without the repeated copies
# and the extra graph nodes.
reshape = sigmoid_layer
flatten = nn.Flatten()(tdcn_pose * reshape)

# --- Classifier head -------------------------------------------------------
# `symbolic(layer)` is used here to apply the activation to the Linear output.
FC_l1 = nn.Linear(flatten.shape[1], 16)(flatten)(nn.ReLU())
FC_l2 = nn.Linear(FC_l1.shape[1], 12)(FC_l1)(nn.ReLU())
FC_l3 = nn.Linear(FC_l2.shape[1], 8)(FC_l2)(nn.ReLU())
# NOTE(review): the ReLU clamps both class scores to >= 0 right before the
# Softmax - confirm this is intended.
last_layer = nn.Linear(FC_l3.shape[1], 2)(FC_l3)(nn.ReLU())
output = nn.Softmax(1)(last_layer)
print(FC_l1.shape)

torch.Size([1, 16])


In [41]:
# Target device for the model parameters.
device = "cpu"

# Wrap the symbolic graph between `input_pose` and `output` into a module,
# move it to the target device and print its layer table.
model = SymbolicModel(inputs=input_pose, outputs=output)
model = model.to(device)
model.summary()

___________________________________________________________________
       Layer                 Output shape        Params   Parent   
1      Input_1               (None, 6, 5000)     0                 
2      Conv1d_1              (None, 128, 5000)   2432     1        
3      Conv1d_2              (None, 128, 5000)   2432     1        
4      AddOpLayer_1          (None, 128, 5000)   0        2,3      
5      ELU_1                 (None, 128, 5000)   0        4        
6      Conv1d_3              (None, 128, 5000)   82048    5        
7      Conv1d_4              (None, 128, 5000)   82048    5        
8      AddOpLayer_2          (None, 128, 5000)   0        6,7      
9      ELU_2                 (None, 128, 5000)   0        8        
10     Conv1d_5              (None, 128, 5000)   147584   9        
11     Conv1d_6              (None, 128, 5000)   147584   9        
12     AddOpLayer_3          (None, 128, 5000)   0        10,11    
13     ELU_3                 (None, 128, 5000)  