<a href="https://colab.research.google.com/github/Yunika-Bajracharya/TeamPI/blob/main/team_pi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import random
root = "/content/drive/MyDrive/dataset"
TRAIN_FRACTION = 0.8  # probability that a file is assigned to the training split
SPLIT_SEED = 42       # fixed seed so Restart & Run All reproduces the same split

# Fail with a readable message instead of a bare StopIteration from next(os.walk(...)).
if not os.path.isdir(root):
  raise FileNotFoundError(f"Dataset directory not found: {root} (is Google Drive mounted?)")

# Dedicated generator: the split is reproducible and does not disturb the
# global `random` state used elsewhere.
split_rng = random.Random(SPLIT_SEED)

# Layout walked below: <root>/<folder>/<subfolder>/<file>.
list_of_folders = next(os.walk(root))[1]
list_of_folders.sort()

train = []
valid = []

for folder in list_of_folders:
  folder_path = os.path.join(root, folder)
  # Directory listings come back in arbitrary order; sorting keeps the
  # sequence of seeded draws stable between runs.
  subfolders = sorted(next(os.walk(folder_path))[1])

  for subfolder in subfolders:
    subfolder_path = os.path.join(folder_path, subfolder)
    # Label is 1 only for a subfolder named exactly "fall", otherwise 0.
    label = 1 if subfolder == "fall" else 0

    files = sorted(
        f for f in os.listdir(subfolder_path)
        if os.path.isfile(os.path.join(subfolder_path, f))
    )

    for file_name in files:
      sample = [os.path.join(subfolder_path, file_name), label]
      if split_rng.uniform(0, 1) < TRAIN_FRACTION:
        train.append(sample)
      else:
        valid.append(sample)

# Each file is drawn independently, so the 80/20 train/validation split is
# approximate (exact only in expectation).

In [2]:
import pandas as pd

# Both splits share one schema: file path plus binary fall label.
split_columns = ['path', 'fall']
train_df = pd.DataFrame(data=train, columns=split_columns)
valid_df = pd.DataFrame(data=valid, columns=split_columns)
print(train_df)

                                                  path  fall
0    /content/drive/MyDrive/dataset/Anju/notFall/da...     0
1    /content/drive/MyDrive/dataset/Anju/notFall/da...     0
2    /content/drive/MyDrive/dataset/Anju/notFall/da...     0
3    /content/drive/MyDrive/dataset/Anju/notFall/da...     0
4    /content/drive/MyDrive/dataset/Anju/notFall/da...     0
..                                                 ...   ...
96   /content/drive/MyDrive/dataset/dhirendraBhai/n...     0
97   /content/drive/MyDrive/dataset/dhirendraBhai/n...     0
98   /content/drive/MyDrive/dataset/dhirendraBhai/n...     0
99   /content/drive/MyDrive/dataset/dhirendraBhai/n...     0
100  /content/drive/MyDrive/dataset/dhirendraBhai/n...     0

[101 rows x 2 columns]


In [3]:
# Class balance of the training split (1 = fall, 0 = not a fall).
train_df["fall"].value_counts()

1    52
0    49
Name: fall, dtype: int64

In [4]:
# Class balance of the validation split (1 = fall, 0 = not a fall).
valid_df["fall"].value_counts()

0    10
1     7
Name: fall, dtype: int64

In [5]:
import torch

channel_names = ['ax', 'ay', 'az', 'gx', 'gy', 'gz']
shapes = []

# Record how many rows (time steps) each training CSV holds.
for csv_path in train_df.iloc[:, 0]:
  frame = pd.read_csv(csv_path)
  mat = torch.tensor([list(frame[name]) for name in channel_names])
  n_steps = mat.shape[1]
  if n_steps < 10:
    # Surface unusually short recordings by printing their path.
    print(csv_path)
  shapes.append(n_steps)

print(shapes)
mat_min = min(shapes)  # length of the shortest training sequence

[126, 120, 121, 120, 119, 131, 108, 127, 131, 128, 129, 127, 132, 138, 134, 132, 138, 139, 144, 133, 136, 133, 140, 137, 134, 136, 134, 136, 135, 136, 135, 128, 130, 129, 141, 138, 132, 143, 92, 137, 137, 141, 142, 138, 135, 144, 139, 121, 123, 94, 120, 120, 128, 122, 119, 128, 130, 126, 128, 128, 133, 131, 88, 132, 134, 133, 142, 134, 137, 134, 121, 129, 133, 132, 134, 134, 135, 132, 135, 133, 133, 136, 127, 127, 129, 128, 130, 129, 129, 131, 132, 128, 136, 138, 134, 138, 139, 136, 137, 138, 136]


In [6]:
import os
import torch
import pandas as pd
from torch.utils.data import Dataset, IterableDataset

def mat_to_tensor(mat_path):
  """Load one CSV recording as a channels-first tensor.

  Rows containing a missing value in any column are discarded before the
  conversion.

  Args:
    mat_path: path to a CSV file that has the columns
      ax, ay, az, gx, gy, gz.

  Returns:
    A torch tensor of shape (6, n_rows) whose rows follow the order
    ax, ay, az, gx, gy, gz.
  """
  frame = pd.read_csv(mat_path).dropna()
  channels = []
  for column in ('ax', 'ay', 'az', 'gx', 'gy', 'gz'):
    channels.append(list(frame[column]))
  return torch.tensor(channels)

class IMUSet(Dataset):
    """Map-style dataset that yields (tensor, label) pairs from a split frame.

    Column 0 of the wrapped frame is read as the CSV path and column 1 as
    the label; the CSV is loaded on every access via `mat_to_tensor`.
    """

    def __init__(self, data_df):
        # data_df is train_df or valid_df
        self.data_df = data_df

    def __len__(self):
        """Number of rows in the wrapped frame."""
        return self.data_df.shape[0]

    def __getitem__(self, idx):
        """Return (tensor loaded from the row's path, the row's label)."""
        sample_path = self.data_df.iloc[idx, 0]
        target = self.data_df.iloc[idx, 1]
        return mat_to_tensor(sample_path), target

In [7]:
from torch.utils.data import DataLoader

# Wrap the training split; each CSV is read when its item is requested.
imuset = IMUSet(data_df=train_df)

# No batch_size is given, so the loader uses its default; order is reshuffled
# on every pass.
train_dataloader = DataLoader(dataset=imuset, shuffle=True)

In [8]:
valid_dataset = IMUSet(data_df=valid_df)
valid_dataloader = DataLoader(dataset=valid_dataset, shuffle=True)

# Pull a single batch to sanity-check the tensor shapes the loader produces.
first_batch = next(iter(valid_dataloader))
valid_features, valid_labels = first_batch

valid_features.shape, valid_labels.shape

(torch.Size([1, 6, 130]), torch.Size([1]))

**GRU**

In [9]:
from torch import nn
import torch.nn.functional as F

class FallDetector(nn.Module):
    """Two-layer GRU followed by a linear head that scores one sequence.

    The GRU consumes 6 features per time step and has hidden size 30; the
    final hidden state of the top layer is mapped to a single sigmoid score.
    """

    def __init__(self):
        super().__init__()
        self.gru = nn.GRU(6, 30, num_layers=2)
        self.hidden2tag = nn.Linear(30, 1)

    def forward(self, seq):
        """Score a single sequence.

        Args:
            seq: tensor whose first dimension is time; it is viewed as
                (len(seq), 1, -1), i.e. one sequence with batch size 1, so
                each step must flatten to the 6 features the GRU expects.

        Returns:
            A 1-element tensor holding the sigmoid score in (0, 1).
        """
        # nn.GRU returns (output, h_n) with h_n shaped
        # (num_layers, batch, hidden). There is no cell state: the previous
        # LSTM-style `(h_n, c_n)` unpacking only worked because num_layers == 2
        # split h_n into its two layers. Index the top layer explicitly.
        _, h_n = self.gru(seq.view(len(seq), 1, -1))
        top_layer_state = h_n[-1]
        tag_space = self.hidden2tag(top_layer_state.view(1, -1))
        tag_scores = torch.sigmoid(tag_space)  # squash the logit to 0-1
        return tag_scores.view(-1)

In [10]:
import torch.optim as optim

model = FallDetector()
# Binary cross-entropy on probabilities; the model already applies a sigmoid.
loss_function = nn.BCELoss()
# Adam with its default hyperparameters; SGD and RMSprop are alternatives.
optimizer = optim.Adam(params=model.parameters())

In [11]:
import numpy as np

from sklearn.metrics import confusion_matrix, accuracy_score, recall_score

def get_statistics(y_true, y_pred):
    """Print accuracy, recall, sample count and the confusion matrix.

    Args:
        y_true: iterable of per-batch label sequences (for example 1-D
            tensors); they are flattened and reshaped to match the
            predictions.
        y_pred: iterable of per-batch score arrays; a score strictly above
            0.5 counts as a positive prediction.
    """
    predicted = np.concatenate(tuple(y_pred)) > 0.5
    # Break every batch into its individual labels, then line the flat
    # result up with the prediction array.
    per_batch = tuple(list(batch) for batch in y_true)
    actual = np.concatenate(per_batch).reshape(predicted.shape)

    print("Accuracy: ", accuracy_score(actual, predicted))
    print("Recall: ", recall_score(actual, predicted))
    print(len(actual))

    print("Confusion Matrix")
    print(confusion_matrix(actual, predicted))

In [12]:

for epoch in range(50):
    labels = []
    preds = []

    # The evaluation cell switches the model to eval mode; make sure a re-run
    # of this cell trains in training mode.
    model.train()

    for seq, label in train_dataloader:
        optimizer.zero_grad()

        # FIXME(review): the dataset yields channels-first tensors, so a batch
        # is shaped (1, 6, T). reshape((-1, 6)) reinterprets that memory as
        # (T, 6) without transposing, so a row holds six consecutive samples
        # of one channel rather than the six channels of one time step.
        # A transpose (e.g. seq.squeeze(0).T) is the likely intent; it changes
        # what the model learns, so it has to be applied to the evaluation
        # loop as well and the model retrained.
        seq = seq.reshape((-1, 6))
        target = label.to(torch.float)
        tag_scores = model(seq.to(torch.float))

        # detach() keeps the score for the metrics without holding on to the
        # autograd graph (replaces the deprecated `.data`).
        preds.append(tag_scores.detach())
        labels.append(target)

        loss = loss_function(tag_scores, target)
        loss.backward()
        optimizer.step()

    print("Epoch Done")
    get_statistics(labels, preds)

    # tag_scores[tag_scores > 0.4] = 1


Epoch Done
Accuracy:  0.594059405940594
Recall:  0.36538461538461536
101
Confusion Matrix
[[41  8]
 [33 19]]
Epoch Done
Accuracy:  0.7029702970297029
Recall:  0.7307692307692307
101
Confusion Matrix
[[33 16]
 [14 38]]
Epoch Done
Accuracy:  0.7425742574257426
Recall:  0.7115384615384616
101
Confusion Matrix
[[38 11]
 [15 37]]
Epoch Done
Accuracy:  0.8415841584158416
Recall:  0.8076923076923077
101
Confusion Matrix
[[43  6]
 [10 42]]
Epoch Done
Accuracy:  0.8514851485148515
Recall:  0.8846153846153846
101
Confusion Matrix
[[40  9]
 [ 6 46]]
Epoch Done
Accuracy:  0.8316831683168316
Recall:  0.75
101
Confusion Matrix
[[45  4]
 [13 39]]
Epoch Done
Accuracy:  0.8613861386138614
Recall:  0.8846153846153846
101
Confusion Matrix
[[41  8]
 [ 6 46]]
Epoch Done
Accuracy:  0.8613861386138614
Recall:  0.9038461538461539
101
Confusion Matrix
[[40  9]
 [ 5 47]]
Epoch Done
Accuracy:  0.8613861386138614
Recall:  0.8269230769230769
101
Confusion Matrix
[[44  5]
 [ 9 43]]
Epoch Done
Accuracy:  0.861386138

In [13]:

labels = []
preds = []
model.eval()

# Inference only: no_grad() avoids building an autograd graph for every
# forward pass.
with torch.no_grad():
    for seq, label in valid_dataloader:
        # NOTE(review): this mirrors the reshape used in the training loop.
        # On a (1, 6, T) batch it reinterprets memory instead of transposing,
        # so rows are not per-time-step samples. Keep it identical to
        # training until both loops are changed together and the model is
        # retrained.
        seq = seq.reshape((-1, 6))
        tag_scores = model(seq.to(torch.float))
        preds.append(tag_scores)
        labels.append(label.to(torch.float))

print("Epoch Done")
get_statistics(labels, preds)

Epoch Done
Accuracy:  0.8823529411764706
Recall:  0.8571428571428571
17
Confusion Matrix
[[9 1]
 [1 6]]


In [18]:
# Persist only the parameter tensors (the state dict), not the module object.
trained_state = model.state_dict()
torch.save(trained_state, 'model_weights.pt')

In [19]:
from torch import nn
import torch.nn.functional as F

class FallDetector(nn.Module):
    """Two-layer GRU followed by a linear head that scores one sequence.

    The attribute names (`gru`, `hidden2tag`) and layer sizes define the
    state-dict layout, so they must stay as they are for saved weights to
    load.
    """

    def __init__(self):
        super().__init__()
        self.gru = nn.GRU(6, 30, num_layers=2)
        self.hidden2tag = nn.Linear(30, 1)

    def forward(self, seq):
        """Score a single sequence.

        Args:
            seq: tensor whose first dimension is time; it is viewed as
                (len(seq), 1, -1), i.e. one sequence with batch size 1, so
                each step must flatten to the 6 features the GRU expects.

        Returns:
            A 1-element tensor holding the sigmoid score in (0, 1).
        """
        # nn.GRU returns (output, h_n) with h_n shaped
        # (num_layers, batch, hidden). There is no cell state: the previous
        # LSTM-style `(h_n, c_n)` unpacking only worked because num_layers == 2
        # split h_n into its two layers. Index the top layer explicitly.
        _, h_n = self.gru(seq.view(len(seq), 1, -1))
        top_layer_state = h_n[-1]
        tag_space = self.hidden2tag(top_layer_state.view(1, -1))
        tag_scores = torch.sigmoid(tag_space)  # squash the logit to 0-1
        return tag_scores.view(-1)

# Rebuild the architecture, then restore the parameters from disk.
# NOTE: torch.load unpickles the file, so only load checkpoints you trust.
model = FallDetector()
saved_state = torch.load('model_weights.pt')
model.load_state_dict(saved_state)
model.eval()

FallDetector(
  (gru): GRU(6, 30, num_layers=2)
  (hidden2tag): Linear(in_features=30, out_features=1, bias=True)
)

In [14]:
def fall_detected(labels, min_consecutive=10):
  """Tell whether `labels` contains a long enough run of consecutive 1s.

  Args:
    labels: iterable of per-step labels; an element counts towards a run
      when it compares equal to 1, and any other value resets the run.
    min_consecutive: run length that counts as a detected fall
      (defaults to 10, the previously hard-coded value).

  Returns:
    True as soon as a run of at least `min_consecutive` ones is seen,
    otherwise False (including for empty input).

  Raises:
    ValueError: if `min_consecutive` is smaller than 1.
  """
  if min_consecutive < 1:
    raise ValueError("min_consecutive must be at least 1")

  run_length = 0
  for label in labels:
    # Extend the current run on a 1, start over on anything else.
    run_length = run_length + 1 if label == 1 else 0
    if run_length >= min_consecutive:
      return True
  return False

# Example input to try instead: [0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0]
# As written, `labels` is whatever an earlier cell left in the namespace.
has_fall = fall_detected(labels)
print(has_fall)

False
