In [329]:
import os
import time
import torch
import pandas as pd
import numpy as np
from torch import nn
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor
import torchvision.transforms as T
from torchvision.models.detection import keypointrcnn_resnet50_fpn, KeypointRCNN_ResNet50_FPN_Weights
from collections import OrderedDict
import matplotlib.pyplot as plt
import zipfile
from torchvision.io import read_image
from PIL import Image
import math
import random
from torch.nn.utils.rnn import pad_sequence

from utils import *


# Ignore warnings
# import warnings
# warnings.filterwarnings("ignore")


In [330]:
class URFallDataset(Dataset):
    """ a sample of the dataset will be a selection of frames and name of zipfile"""
    
    def __init__(self, root_dir, folders, transform=None, sampling=False, sample_len=50):
        self.root_dir = root_dir
        self.transform = transform
        self.sequences = folders
        self.sampling = sampling
        self.sample_len = sample_len

    def __len__(self):
        return len(self.sequences)
    
    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        png_list = []
        keypoint_list = []

        path_name = os.path.join(self.root_dir, self.sequences[idx])
        inf_list = os.listdir(path_name)
        #print(inf_list)

        if self.sampling: #indexing not great
            interval = len(inf_list) // self.sample_len
            #print(self.sequences[idx], "total", len(inf_list), "interval", interval)
            start = len(inf_list) - interval * self.sample_len
            for i in range(start, len(inf_list)):
                if i % interval == 0:
                    png = Image.open(path_name + "/" + inf_list[i])
                    png_t = self.transform(png)
                    png_list += [self.transform(png)]

                    # outputs = keypoint_rcnn(k_transforms(png))
                    # keypoints = outputs['keypoints']
                    # print(keypoints.shape())
                    # keypoint_list += [keypoints]

        #print("png len", len(png_list))
        print(torch.cat(png_list, 0).shape)

        # "fall" or "adl-"
        fall_b = [1, 0] if self.sequences[idx][:4] == 'fall' else [0, 1]
        sample = {"img_list": torch.cat(png_list, 0), "category": torch.tensor(fall_b)}
        return sample


In [331]:
class NeuralNetwork(nn.Module):
    def __init__(self, size, numLayer=1, numNeuron=[1024]):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(size, 512),
            nn.ReLU(),
            nn.Linear(512, 2),
        )

    def forward(self, x):
        print('imagetensor: {}'.format(x.shape))
        x = self.flatten(x)
        # x = x.to(torch.float64)
        logits = self.linear_relu_stack(x)
        return logits


In [332]:
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, sample in enumerate(dataloader): #?
        target = sample['category']
        img = sample['img_list']
        # print(target, img)
        print("batch", batch)
        # print("istensorimg", torch.is_tensor(img))
        # print("img tensor shape", img.shape, "target shape", target.shape)
        img = img.to(torch.float32)
        # print(img)
        pred = model(img)
        pred_probab = nn.Softmax(dim=1)(pred)
        # print("pred", pred_probab, pred)
        loss = loss_fn(pred_probab, target.to(torch.float16))

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(img)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [333]:
def test_loop(dataloader, model, loss_fn):
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, tp, correct = 0, 0, 0
    false_negative = 0
    predicted_pos = 0

    def is_fall(a):
        return torch.argmax(pred) == 0

    with torch.no_grad():
        # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
        for sample in dataloader:
            img = sample['img_list']
            target = sample['category']
            img = img.to(torch.float32)
            pred = model(img)
            pred_probab = nn.Softmax(dim=1)(pred)
            print("pred", pred, pred_probab, torch.argmax(pred))

            test_loss += loss_fn(pred_probab, target.to(torch.float16)).item()
            tp += 1 if is_fall(pred_probab) and is_fall(target) else 0
            correct += 1 if is_fall(pred_probab) == is_fall(target) else 0
            #(torch.argmax(pred, 1) == target).type(torch.float).sum().item() 
            predicted_pos += 1 if is_fall(pred_probab) else 0
            false_negative += 1 if not is_fall(pred_probab) and is_fall(target) else 0

    test_loss /= num_batches
    accuracy = correct / size
    precision = tp/predicted_pos if predicted_pos > 0 else 0
    recall = tp/(tp+false_negative) if tp > 0  else 0
    f1 = 2*precision*recall/(precision+recall) if tp > 0  else 0
    print(f"Test Error: \n Accuracy: {(100*tp/size):>0.1f}%, Avg loss: {test_loss:>8f}, Recall: {(100*recall):>0.1f}%, F1: {(100*f1):>0.1f}% \n")
    return accuracy, recall, f1

In [334]:
device = ("cpu")
keypoint_rcnn = get_Krcnn()

root_dir="/Users/dy/Documents/ODML/urf-rgb"
folders = os.listdir(root_dir)
if ".DS_Store" in folders:
    folders.remove(".DS_Store")
random.shuffle(folders)
print(len(folders))
train_size = int(len(folders)*0.7)

train_seq = folders[:train_size]
test_seq = folders[train_size:]

sample_len = 10
img_size = 50
transform = transforms.Compose([transforms.PILToTensor(), 
                                transforms.Resize(size=[img_size,img_size])])

urf_train = URFallDataset(root_dir, folders = train_seq, transform=transform, sampling=True, sample_len=sample_len)
urf_test = URFallDataset(root_dir, folders = test_seq, transform=transform, sampling=True, sample_len=sample_len)

train_dataloader = DataLoader(urf_train, batch_size=1, shuffle=True, num_workers=0)
test_dataloader = DataLoader(urf_test, batch_size=1, shuffle=True, num_workers=0)

print(len(urf_train), len(urf_test))

total_pos = 0
for i, sample in list(enumerate(urf_test)):
    if torch.eq(sample["category"], torch.tensor([1,0]))[0]:
        total_pos += 1
    print("sampling", i)

print(total_pos, len(test_seq), "fall rate in test dset", total_pos/len(test_seq))



70
49 21




torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
torch.Size([30, 50, 50])
sampling 0
sampling 1
sampling 2
sampling 3
sampling 4
sampling 5
sampling 6
sampling 7
sampling 8
sampling 9
sampling 10
sampling 11
sampling 12
sampling 13
sampling 14
sampling 15
sampling 16
sampling 17
sampling 18
sampling 19
sampling 20
5 21 fall rate in test dset 0.23809523809523808


In [335]:
size = sample_len * img_size * img_size * 3 
print(size)

model = NeuralNetwork(size=size, numLayer=2, numNeuron=1000).to(device)
print(model)
print("model flop count", flop_count(model))

flops = []
accuracys = []
recalls = []
f1s = []
latency = []

learning_rate = 1e-3
epochs = 2

# Initialize the loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    start_t = time.time()
    train_loop(train_dataloader, model, loss_fn, optimizer)
    print("training used", time.time() - start_t, "s")
    
    start_t = time.time()
    (accuracy, recall, f1) = test_loop(test_dataloader, model, loss_fn)
    print("inference latency", (time.time() - start_t) * 1000 / len(urf_test), "ms")

    #calculate metrics ##
    flops += [flop_count(model)]
    accuracys += [accuracy]
    recalls += [recall]
    f1s += [f1]
    latency += [(time.time() - start_t)/10]

print(sample_len, img_size)
print("flops=", flops, "\n", "accuracy, recall, f1=", accuracys, recalls, f1s, "\n", "latency=", latency)

75000


NeuralNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=75000, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=2, bias=True)
  )
)
this layer: 76799488
this layer: 2046
model flop count 76801534
Epoch 1
-------------------------------
torch.Size([30, 50, 50])
batch 0
imagetensor: torch.Size([1, 30, 50, 50])
loss: 0.989787  [    1/   49]
torch.Size([30, 50, 50])
batch 1
imagetensor: torch.Size([1, 30, 50, 50])
torch.Size([30, 50, 50])
batch 2
imagetensor: torch.Size([1, 30, 50, 50])
torch.Size([30, 50, 50])
batch 3
imagetensor: torch.Size([1, 30, 50, 50])
torch.Size([30, 50, 50])
batch 4
imagetensor: torch.Size([1, 30, 50, 50])
torch.Size([30, 50, 50])
batch 5
imagetensor: torch.Size([1, 30, 50, 50])
torch.Size([30, 50, 50])
batch 6
imagetensor: torch.Size([1, 30, 50, 50])
torch.Size([30, 50, 50])
batch 7
imagetensor: torch.Size([1, 30, 50, 50])
torch.Size([30, 50, 50])
batch 8