# Posture Detection with Baseline Models (SVM, RF, KNN, NN)


## Import Data

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset path used below lives under it.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pandas as pd
import numpy as np
from pprint import pprint 
#---Preprocess-------
# Location of the labeled sensor CSV on the mounted Google Drive.
data_path = r"/content/drive/MyDrive/Data_chair/position_arakawa_lab_labeled.csv"

# Read the CSV into a DataFrame.
raw_data= pd.read_csv(data_path) 



## Preprocessing

Normalization Function 

In [None]:
def norm_funct(d_frame):
  """Standardize every column of ``d_frame`` to zero mean and unit std.

  The statistics are computed from ``d_frame`` itself (pandas sample std,
  ddof=1). The input frame is not modified.

  Args:
    d_frame: DataFrame whose columns are all numeric.

  Returns:
    A new DataFrame with the same index and columns holding
    ``(x - column mean) / column std``.
  """
  df_mean = d_frame.mean()
  # A constant column has std 0 and a single-row frame has std NaN; dividing
  # by either would produce inf/NaN, so both are replaced by 1 and the column
  # simply becomes all zeros.
  df_std = d_frame.std().replace(0, 1).fillna(1)
  df_norm = (d_frame - df_mean) / df_std
  return df_norm

# test
import numpy as np
import pandas as pd

# Toy frame: two constant columns (zero variance) and two integer ramps.
d1 = {
    'data1': np.zeros(5),
    'data2': np.ones(5),
    'data3': np.arange(5),
    'data4': np.arange(5, 10),
}
df_raw = pd.DataFrame(d1)
# Normalize a copy so df_raw stays available for comparison.
df_1 = norm_funct(df_raw.copy())


In [None]:
# Show the toy input (as a markdown table) followed by its normalized copy.
print(df_raw.to_markdown())
print(df_1)


Create the 56 new features.

These features are the pairwise differences between the sensors' pressure values, taken separately within the seat pad and within the backrest.
Choosing 2 of the 8 sensors in a group gives $\binom{8}{2} = 28$ pairs, so the two groups together give $2 \times 28 = 56$ features.

In [None]:
# Method for creating new features

def create_feature(data_set, n_hip=8):
    """Append within-group pairwise sensor differences as new columns.

    The first ``n_hip`` columns are treated as seat (hip) sensors and all
    remaining columns as backrest sensors. For every unordered pair ``(a, b)``
    inside a group, taken in column order, a column named ``'a-b'`` holding
    ``data_set[a] - data_set[b]`` is appended. With 8 + 8 sensor columns this
    adds 2 * C(8, 2) = 56 columns.

    The caller's frame is left untouched: all new columns are attached with a
    single concat instead of one in-place insert per pair.

    Args:
        data_set: DataFrame containing only sensor columns with string names
            (drop label / id columns first, otherwise they are differenced too).
        n_hip: number of leading columns that belong to the hip group.

    Returns:
        A new DataFrame: the original columns followed by the hip differences
        and then the back differences.
    """
    columns = data_set.columns.values
    # separate hip sensors from back sensors
    hip_keys = columns[:n_hip]
    back_keys = columns[n_hip:]

    differences = []
    for sets_keys in (hip_keys, back_keys):
        for j in range(len(sets_keys)):
            for i in range(j + 1, len(sets_keys)):
                name = sets_keys[j] + '-' + sets_keys[i]
                differences.append((data_set[sets_keys[j]] - data_set[sets_keys[i]]).rename(name))

    # One concat keeps the result unfragmented and does not mutate data_set.
    result = pd.concat([data_set] + differences, axis=1)

    print(f'The total number now is :{len(result.columns.values[:])}')
    print(f'The keys created now are :{result.columns.values[:]}')
    return result
  

    

In [None]:
#test code
# Drop the non-sensor columns first (as preprocess_data does); otherwise
# 'label' and 'personID' are treated as backrest sensors and differenced too.
mod_data = raw_data.drop(columns=["label", "personID"])
mod_data = create_feature(mod_data)

Create the 120 new features.

These features are the pairwise differences between the pressure values of all 16 sensors (seat pad and backrest together): $\binom{16}{2} = 120$.

In [None]:
def create_feature_120(data_set):
    """Append the difference of every pair of columns as new columns.

    For every unordered pair ``(a, b)`` of columns, taken in column order, a
    column named ``'a-b'`` holding ``data_set[a] - data_set[b]`` is appended.
    With 16 sensor columns this adds C(16, 2) = 120 columns.

    The caller's frame is left untouched: the new columns are attached with a
    single concat, which also avoids the fragmentation caused by inserting
    120 columns one at a time.

    Args:
        data_set: DataFrame containing only sensor columns with string names
            (drop label / id columns first, otherwise they are differenced too).

    Returns:
        A new DataFrame: the original columns followed by all pairwise
        differences.
    """
    sets_keys = data_set.columns.values

    differences = [
        (data_set[sets_keys[j]] - data_set[sets_keys[i]]).rename(sets_keys[j] + '-' + sets_keys[i])
        for j in range(len(sets_keys))
        for i in range(j + 1, len(sets_keys))
    ]

    result = pd.concat([data_set] + differences, axis=1)

    print(f'The total number now is :{len(result.columns.values[:])}')
    print(f'The keys created now are :{result.columns.values[:]}')

    return result
  

In [None]:
#test code
# Drop the non-sensor columns first (as preprocess_data does); otherwise
# 'label' and 'personID' take part in the pairwise differences.
mod_data = raw_data.drop(columns=["label", "personID"])
mod_data = create_feature_120(mod_data)

The total number now is :138
The keys created now are :['hip1' 'hip2' 'hip3' 'hip4' 'hip5' 'hip6' 'hip7' 'hip8' 'back1' 'back2'
 'back3' 'back4' 'back5' 'back6' 'back7' 'back8' 'label' 'personID'
 'hip1-hip2' 'hip1-hip3' 'hip1-hip4' 'hip1-hip5' 'hip1-hip6' 'hip1-hip7'
 'hip1-hip8' 'hip1-back1' 'hip1-back2' 'hip1-back3' 'hip1-back4'
 'hip1-back5' 'hip1-back6' 'hip1-back7' 'hip1-back8' 'hip2-hip3'
 'hip2-hip4' 'hip2-hip5' 'hip2-hip6' 'hip2-hip7' 'hip2-hip8' 'hip2-back1'
 'hip2-back2' 'hip2-back3' 'hip2-back4' 'hip2-back5' 'hip2-back6'
 'hip2-back7' 'hip2-back8' 'hip3-hip4' 'hip3-hip5' 'hip3-hip6' 'hip3-hip7'
 'hip3-hip8' 'hip3-back1' 'hip3-back2' 'hip3-back3' 'hip3-back4'
 'hip3-back5' 'hip3-back6' 'hip3-back7' 'hip3-back8' 'hip4-hip5'
 'hip4-hip6' 'hip4-hip7' 'hip4-hip8' 'hip4-back1' 'hip4-back2'
 'hip4-back3' 'hip4-back4' 'hip4-back5' 'hip4-back6' 'hip4-back7'
 'hip4-back8' 'hip5-hip6' 'hip5-hip7' 'hip5-hip8' 'hip5-back1'
 'hip5-back2' 'hip5-back3' 'hip5-back4' 'hip5-back5' 'hip5-back6

Split data

In [None]:
def preprocess_data(test_id, raw_set, make_feature='raw', norm=False, random_state=None):
  """Build one leave-one-person-out split and return it as NumPy arrays.

  Rows whose ``personID`` equals ``test_id`` form the test fold; every other
  person present in ``raw_set`` forms the training fold. Both folds are
  shuffled, the ``label`` column becomes the target vector, and ``label`` /
  ``personID`` are removed from the features.

  Args:
    test_id: personID to hold out.
    raw_set: DataFrame with the sensor columns plus ``label`` and ``personID``.
      It is not modified.
    make_feature: ``'raw'`` keeps the sensor columns as they are, ``'fifty'``
      adds the within-group differences (``create_feature``), ``'hundred'``
      adds all pairwise differences (``create_feature_120``). Any other value
      behaves like ``'raw'``.
    norm: if True, standardize the features. Mean and std are estimated on
      the training fold only and applied to both folds, so train and test
      live on the same scale and no test statistics leak into preprocessing.
    random_state: optional seed forwarded to ``DataFrame.sample`` to make the
      shuffles reproducible; ``None`` keeps the unseeded behavior.

  Returns:
    ``(train_x, train_y, test_x, test_y)`` as NumPy arrays.

  Raises:
    ValueError: if no row of ``raw_set`` has ``personID == test_id``.
  """
  print(f'Spliting data {test_id} from raw_data.... ')

  person_ids = raw_set["personID"]
  is_test = person_ids == test_id
  if not is_test.any():
    raise ValueError(f'personID {test_id} not found in raw_set')

  # Everyone except the held-out person trains this round; the ids come from
  # the data instead of a hard-coded 1..10 range.
  train_id = sorted(person_ids[~is_test].unique().tolist())
  print(f'training set this round includes {train_id}  ')

  # Shuffle each fold and discard the old row index.
  train_f = raw_set[~is_test].sample(frac=1, random_state=random_state).reset_index(drop=True)
  test_f = raw_set[is_test].sample(frac=1, random_state=random_state).reset_index(drop=True)

  # separate label from the features
  train_y = train_f["label"].to_numpy()
  test_y = test_f["label"].to_numpy()

  train_x = train_f.drop(columns=["label", "personID"])
  test_x = test_f.drop(columns=["label", "personID"])

  # normalize if norm = True
  if norm:
    # Same formula as norm_funct, but the statistics are fitted on the
    # training fold and reused for the test fold.
    train_mean = train_x.mean()
    train_std = train_x.std().replace(0, 1)
    train_x = (train_x - train_mean) / train_std
    test_x = (test_x - train_mean) / train_std

  if make_feature == 'fifty':
    train_x = create_feature(train_x)
    test_x = create_feature(test_x)
  elif make_feature == 'hundred':
    train_x = create_feature_120(train_x)
    test_x = create_feature_120(test_x)

  # Transfer the features to numpy
  train_x = train_x.to_numpy()
  test_x = test_x.to_numpy()

  return train_x, train_y, test_x, test_y

In [None]:
#test code
# Smoke test: hold out person 7, keep the raw sensor columns, and normalize.
train_x ,train_y, test_x,test_y = preprocess_data(7,raw_data,make_feature='raw',norm=True)

Spliting data 7 from raw_data.... 
training set this round includes [1, 2, 3, 4, 5, 6, 8, 9, 10]  


# Main

## SVM

In [None]:
import pandas as pd
import numpy as np
from pprint import pprint
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn import metrics
import matplotlib.pyplot as plt
from sklearn.svm import SVC

def evaluate(model, test_x, test_y):
    """Score ``model`` on a held-out fold.

    Runs ``model.predict`` on ``test_x``, builds scikit-learn's classification
    report against ``test_y`` in dictionary form, prints it, and returns it.
    """
    y_pred = model.predict(test_x)
    summary = metrics.classification_report(
        test_y,
        y_pred,
        digits=3,
        output_dict=True,
    )
    print('Model Performance')
    print(summary)
    return summary
#---Preprocess-------
data_path = r"/content/drive/MyDrive/Data_chair/position_arakawa_lab_labeled.csv"

# read data in
raw_data = pd.read_csv(data_path)

# Per-fold scores (macro averages plus accuracy), one entry per held-out person.
recall = []
precision = []
accuracy = []
f1_score = []

# variables for plotting: a 4x3 grid holds one confusion matrix per fold
fig, axes = plt.subplots(4, 3, figsize=(50, 50))
plot_x = 0
plot_y = 0

# Validation approach: leave one person out (person i is the test fold)
for i in range(1, 11):
  # preprocess_data(test_id, data, feature method); methods can be 'raw', 'fifty', 'hundred'
  train_x, train_y, test_x, test_y = preprocess_data(i, raw_data, 'fifty', norm=True)
  # instantiate model
  base_model = SVC(kernel='rbf', random_state=42)
  base_model.fit(train_x, train_y)
  # evaluate
  base_report = evaluate(base_model, test_x, test_y)
  recall.append(base_report['macro avg']['recall'])
  precision.append(base_report['macro avg']['precision'])
  f1_score.append(base_report['macro avg']['f1-score'])
  accuracy.append(base_report['accuracy'])

  print(base_report['accuracy'])
  print(base_report['macro avg'])

  # One predict call is enough for the confusion matrix of this fold.
  x_pred = base_model.predict(test_x)
  # display
  fold_ax = axes[plot_y][plot_x]
  ConfusionMatrixDisplay.from_predictions(test_y, x_pred, ax=fold_ax)
  fold_ax.set_title(f'Test person {i}')
  plot_x = plot_x + 1
  if plot_x > 2:
    plot_x = 0
    plot_y = plot_y + 1

# The grid has more axes than folds: hide the panels that were never drawn on.
for spare_ax in axes.ravel()[len(accuracy):]:
  spare_ax.set_visible(False)
plt.show()

In [None]:

# Number of folds, then the per-fold accuracies.
print(len(accuracy))
acc_arr = np.array(accuracy)
print(np.round(acc_arr, decimals=4))
# Mean over folds of macro recall, macro precision and macro F1, in that order.
for fold_scores in (recall, precision, f1_score):
    print(np.round(np.array(fold_scores).mean(), decimals=4))
# Mean accuracy over folds.
print(np.round(acc_arr.mean(), decimals=4))


## RF

In [None]:
import pandas as pd
import numpy as np
from pprint import pprint
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn import metrics
import matplotlib.pyplot as plt

def evaluate(model, test_x, test_y):
    """Score ``model`` on a held-out fold.

    Runs ``model.predict`` on ``test_x``, builds scikit-learn's classification
    report against ``test_y`` in dictionary form, prints it, and returns it.
    """
    y_pred = model.predict(test_x)
    summary = metrics.classification_report(
        test_y,
        y_pred,
        digits=3,
        output_dict=True,
    )
    print('Model Performance')
    print(summary)
    return summary
#---Preprocess-------
data_path = r"/content/drive/MyDrive/Data_chair/position_arakawa_lab_labeled.csv"

# read data in
raw_data = pd.read_csv(data_path)

# Per-fold scores (macro averages plus accuracy), one entry per held-out person.
recall = []
precision = []
accuracy = []
f1_score = []

# variables for plotting: a 4x3 grid holds one confusion matrix per fold
fig, axes = plt.subplots(4, 3, figsize=(50, 50))
plot_x = 0
plot_y = 0

# Leave one person out: person i is the test fold
for i in range(1, 11):
  # No normalization is requested here (norm keeps its default of False).
  train_x, train_y, test_x, test_y = preprocess_data(i, raw_data, 'fifty')
  base_model = RandomForestClassifier(max_depth=24, n_estimators=50, random_state=42)
  base_model.fit(train_x, train_y)
  base_report = evaluate(base_model, test_x, test_y)
  recall.append(base_report['macro avg']['recall'])
  precision.append(base_report['macro avg']['precision'])
  f1_score.append(base_report['macro avg']['f1-score'])
  accuracy.append(base_report['accuracy'])
  print(base_report['accuracy'])
  print(base_report['macro avg'])

  # Plot non-normalized confusion matrix; one predict call is enough.
  x_pred = base_model.predict(test_x)
  fold_ax = axes[plot_y][plot_x]
  ConfusionMatrixDisplay.from_predictions(test_y, x_pred, ax=fold_ax)
  fold_ax.set_title(f'Test person {i}')
  plot_x = plot_x + 1
  if plot_x > 2:
    plot_x = 0
    plot_y = plot_y + 1

# The grid has more axes than folds: hide the panels that were never drawn on.
for spare_ax in axes.ravel()[len(accuracy):]:
  spare_ax.set_visible(False)
plt.show()

In [None]:

# Number of folds, then the per-fold accuracies.
print(len(accuracy))
acc_arr = np.array(accuracy)
print(np.round(acc_arr, decimals=4))
# Mean over folds of macro recall, macro precision and macro F1, in that order.
for fold_scores in (recall, precision, f1_score):
    print(np.round(np.array(fold_scores).mean(), decimals=4))
# Mean accuracy over folds.
print(np.round(acc_arr.mean(), decimals=4))

## KNN

In [None]:
import pandas as pd
import numpy as np
from pprint import pprint
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn import metrics
import matplotlib.pyplot as plt
from sklearn.neighbors import KNeighborsClassifier

def evaluate(model, test_x, test_y):
    """Score ``model`` on a held-out fold.

    Runs ``model.predict`` on ``test_x``, builds scikit-learn's classification
    report against ``test_y`` in dictionary form, prints it, and returns it.
    """
    y_pred = model.predict(test_x)
    summary = metrics.classification_report(
        test_y,
        y_pred,
        digits=3,
        output_dict=True,
    )
    print('Model Performance')
    print(summary)
    return summary
#---Preprocess-------
data_path = r"/content/drive/MyDrive/Data_chair/position_arakawa_lab_labeled.csv"

# read data in
raw_data = pd.read_csv(data_path)

# Per-fold scores (macro averages plus accuracy), one entry per held-out person.
recall = []
precision = []
accuracy = []
f1_score = []

# variables for plotting: a 4x3 grid holds one confusion matrix per fold
fig, axes = plt.subplots(4, 3, figsize=(50, 50))
plot_x = 0
plot_y = 0

# Leave one person out: person i is the test fold
for i in range(1, 11):
  train_x, train_y, test_x, test_y = preprocess_data(i, raw_data, 'fifty', norm=True)
  base_model = KNeighborsClassifier(n_neighbors=12)
  base_model.fit(train_x, train_y)
  base_report = evaluate(base_model, test_x, test_y)
  recall.append(base_report['macro avg']['recall'])
  precision.append(base_report['macro avg']['precision'])
  f1_score.append(base_report['macro avg']['f1-score'])
  accuracy.append(base_report['accuracy'])
  print(base_report['accuracy'])
  print(base_report['macro avg'])

  # Plot non-normalized confusion matrix; one predict call is enough.
  x_pred = base_model.predict(test_x)
  fold_ax = axes[plot_y][plot_x]
  ConfusionMatrixDisplay.from_predictions(test_y, x_pred, ax=fold_ax)
  fold_ax.set_title(f'Test person {i}')
  plot_x = plot_x + 1
  if plot_x > 2:
    plot_x = 0
    plot_y = plot_y + 1

# The grid has more axes than folds: hide the panels that were never drawn on.
for spare_ax in axes.ravel()[len(accuracy):]:
  spare_ax.set_visible(False)
plt.show()

In [None]:

# Number of folds, then the per-fold accuracies.
print(len(accuracy))
acc_arr = np.array(accuracy)
print(np.round(acc_arr, decimals=4))
# Mean over folds of macro recall, macro precision and macro F1, in that order.
for fold_scores in (recall, precision, f1_score):
    print(np.round(np.array(fold_scores).mean(), decimals=4))
# Mean accuracy over folds.
print(np.round(acc_arr.mean(), decimals=4))


## NN trial

Cross Validation

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

import matplotlib.pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.metrics import f1_score
from sklearn.metrics import precision_recall_fscore_support


class ChairDataset(Dataset):
    """Torch dataset wrapping a feature matrix and, optionally, its labels.

    Args:
        X: NumPy array of features; stored as a float32 tensor.
        y: optional NumPy array of class indices; stored as an int64 tensor.
            When omitted, items are feature tensors only.
    """

    def __init__(self, X, y=None):
        self.data = torch.from_numpy(X).float()
        if y is not None:
            # np.int was removed in NumPy 1.24; np.int64 is the explicit
            # equivalent and matches the dtype LongTensor expects.
            y = y.astype(np.int64)
            self.label = torch.LongTensor(y)
        else:
            self.label = None

    def __getitem__(self, idx):
        """Return ``(features, label)`` when labels exist, else ``features``."""
        if self.label is not None:
            return self.data[idx], self.label[idx]
        else:
            return self.data[idx]

    def __len__(self):
        """Number of samples."""
        return len(self.data)
        
class Classifier(nn.Module):
    """Two-layer MLP that maps a feature vector to per-class logits.

    ``forward`` returns raw, unnormalized scores. ``nn.CrossEntropyLoss``
    applies log-softmax itself, so passing it softmax probabilities (or
    ReLU-clipped scores) squashes the gradients. The arg-max of the logits is
    the predicted class; apply ``torch.softmax(logits, dim=1)`` if
    probabilities are needed.

    Args:
        in_features: size of the input vector (72 = 16 sensors + 56 differences).
        hidden: width of the hidden layer.
        n_classes: number of output classes.
        dropout: dropout probability applied to the hidden layer.
    """

    def __init__(self, in_features=72, hidden=36, n_classes=12, dropout=0.5):
        super(Classifier, self).__init__()
        # Attribute names are kept so existing state_dict checkpoints still load.
        self.layer1 = nn.Linear(in_features, hidden)
        self.dropout1 = nn.Dropout(dropout)
        self.layer2 = nn.Linear(hidden, n_classes)
        self.act_fn = nn.ReLU()

    def forward(self, x):
        x = self.layer1(x)
        x = self.dropout1(x)
        x = self.act_fn(x)

        # No activation on the output layer: these are the logits.
        return self.layer2(x)
#----training function -----		
#check device
def get_device():
  """Return 'cuda' when a CUDA device is available to torch, otherwise 'cpu'."""
  if torch.cuda.is_available():
    return 'cuda'
  return 'cpu'

# fix random seed
def same_seeds(seed):
    """Seed torch (CPU and, if available, every CUDA device) and NumPy.

    Also turns cuDNN benchmarking off and deterministic mode on.
    """
    # CPU generator
    torch.manual_seed(seed)
    # GPU generators: current device first, then all devices
    cuda_present = torch.cuda.is_available()
    if cuda_present:
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    # NumPy global generator
    np.random.seed(seed)
    # Benchmark mode lets cuDNN pick among convolution algorithms, which can
    # change results between runs, so it is switched off.
    torch.backends.cudnn.benchmark = False
    # Restrict cuDNN to deterministic convolution algorithms.
    torch.backends.cudnn.deterministic = True
#----end training-----



if __name__=='__main__':
	# Leave-one-person-out cross validation of the NN classifier.
	# For each held-out person: train for num_epoch epochs, checkpoint the
	# epoch with the best held-out accuracy, reload it, and record accuracy,
	# macro precision/recall/F1 and a confusion matrix.

	data_path = r"/content/drive/MyDrive/Data_chair/position_arakawa_lab_labeled.csv"

	# read in data
	raw_data = pd.read_csv(data_path)

	# variables for plotting: a 4x3 grid holds one confusion matrix per fold
	fig, axes = plt.subplots(4, 3, figsize=(50, 50))
	plot_x = 0
	plot_y = 0

	#---Cross Validate------
	acc_list = []
	report_list = []
	for cross_id in range(1, 11):
		BATCH_SIZE = 10

		# fix random seed for reproducibility; done before the split so the
		# NumPy-driven shuffle inside preprocess_data is seeded as well
		same_seeds(0)

		train_x, train_y, test_x, test_y = preprocess_data(cross_id, raw_data, make_feature='fifty', norm=True)
		# CrossEntropyLoss needs class indices starting at 0
		# (assumes the labels in the CSV start at 1 — TODO confirm)
		train_y = train_y - 1
		test_y = test_y - 1

		train_set = ChairDataset(train_x, train_y)
		test_set = ChairDataset(test_x, test_y)
		# only shuffle the training data
		train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
		test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False)

		#----training setting-----
		# get device
		device = get_device()
		print(f'DEVICE: {device}')

		# training parameters
		num_epoch = 110          # number of training epoch
		learning_rate = 0.0001       # learning rate

		# the path where checkpoint saved
		model_path = './model.ckpt'

		# create model, define a loss function, and optimizer
		model = Classifier().to(device)
		criterion = nn.CrossEntropyLoss()
		optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0.0001)

		#----start training -----
		# Start below any reachable accuracy so the first epoch always writes a
		# checkpoint for this fold; otherwise a fold that never beats 0 would
		# silently reload the previous fold's checkpoint.
		best_acc = -1.0
		for epoch in range(num_epoch):
			train_acc = 0.0
			train_loss = 0.0
			val_acc = 0.0
			val_loss = 0.0

			# training
			model.train() # set the model to training mode
			for inputs, labels in train_loader:
				inputs, labels = inputs.to(device), labels.to(device)
				optimizer.zero_grad()
				outputs = model(inputs)
				batch_loss = criterion(outputs, labels)
				_, train_pred = torch.max(outputs, 1) # index of the highest-scoring class
				batch_loss.backward()
				optimizer.step()

				train_acc += (train_pred.cpu() == labels.cpu()).sum().item()
				train_loss += batch_loss.item()

			# validation
			# NOTE(review): the held-out person serves both for checkpoint
			# selection here and for the final score below, so the reported
			# accuracy is optimistic.
			if len(test_set) > 0:
				model.eval() # set the model to evaluation mode
				with torch.no_grad():
					for inputs, labels in test_loader:
						inputs, labels = inputs.to(device), labels.to(device)
						outputs = model(inputs)
						batch_loss = criterion(outputs, labels)
						_, val_pred = torch.max(outputs, 1) # index of the highest-scoring class

						val_acc += (val_pred.cpu() == labels.cpu()).sum().item()
						val_loss += batch_loss.item()

					print('[{:03d}/{:03d}] Train Acc: {:3.6f} Loss: {:3.6f} | Val Acc: {:3.6f} loss: {:3.6f}'.format(
						epoch + 1, num_epoch, train_acc/len(train_set), train_loss/len(train_loader), val_acc/len(test_set), val_loss/len(test_loader)
					))

					# if the model improves, save a checkpoint at this epoch
					if val_acc > best_acc:
						best_acc = val_acc
						torch.save(model.state_dict(), model_path)
						print('saving model with acc {:.3f}'.format(best_acc/len(test_set)))
			else:
				print('[{:03d}/{:03d}] Train Acc: {:3.6f} Loss: {:3.6f}'.format(
					epoch + 1, num_epoch, train_acc/len(train_set), train_loss/len(train_loader)
				))

		# if not validating, save the last epoch
		if len(test_set) == 0:
			torch.save(model.state_dict(), model_path)
			print('saving model at last epoch')
			# With no held-out samples there is nothing to score or plot
			# (dividing by len(test_set) here would raise ZeroDivisionError).
			continue

		#---Evaluate-----
		# create model and load weights from checkpoint
		model = Classifier().to(device)
		model.load_state_dict(torch.load(model_path, map_location=device))
		predict = []
		model.eval() # set the model to evaluation mode
		with torch.no_grad():
			for inputs, labels in test_loader:
				inputs = inputs.to(device)
				outputs = model(inputs)
				_, test_pred = torch.max(outputs, 1) # index of the highest-scoring class
				predict.extend(test_pred.cpu().tolist())

		correct = (torch.tensor(predict) == torch.tensor(test_y)).sum()
		acc = correct/test_x.shape[0]
		acc_list.append(round(float(acc), 2))
		report_list.append(precision_recall_fscore_support(torch.tensor(test_y), torch.tensor(predict), average='macro'))
		fold_ax = axes[plot_y][plot_x]
		ConfusionMatrixDisplay.from_predictions(test_y, predict, ax=fold_ax)
		fold_ax.set_title(f'Test person {cross_id}')

		plot_x = plot_x + 1
		if plot_x > 2:
			plot_x = 0
			plot_y = plot_y + 1

	#---- end for loop----
	# The grid has more axes than folds: hide the panels that were never drawn on.
	for spare_ax in axes.ravel()[len(acc_list):]:
		spare_ax.set_visible(False)
	plt.show()

	print(acc_list)
	

In [None]:
# Mean accuracy over the folds.
acc_arr = np.array(acc_list).mean(axis=0)
# Each report entry is a tuple whose last element is dropped before the
# remaining columns are averaged across folds.
fold_reports = np.array(report_list)
report_arr = fold_reports[:, :-1].mean(axis=0)
for summary_value in (acc_arr, report_arr):
    print(np.round(summary_value.astype(float), decimals=4))


0.851
[0.8146 0.85   0.8143]
