In [None]:
from torch import nn
from ptcn import TemporalConvNet
import torch.nn.functional as F
from torch.autograd import Variable
import torch.optim as optim
import torch
import numpy as np
import pickle
import LogHistory
import pandas as pd
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from torch.utils.data import DataLoader, TensorDataset
from keras.layers import Bidirectional
import pytorch_lightning as pl
import keras
from keras.callbacks import ReduceLROnPlateau, CSVLogger, EarlyStopping
from torchsummary import summary
from sklearn.metrics import accuracy_score
from sklearn.metrics import average_precision_score
from sklearn.metrics import f1_score,recall_score
from sklearn.svm import SVC,LinearSVC
import matplotlib.pyplot as plt
from matplotlib import mlab
import matplotlib.cm as cm
from pytorch_lightning import Trainer
import presnet

In [None]:
# Load the pickled feature and label collections. Each file is opened in its
# own `with` block so both handles are closed as soon as the read finishes.
# NOTE(review): unpickling can execute code embedded in the file -- only load
# pickles that come from a trusted source.
with open("D:/罗飞/kitchen_processed_data/"+'xall.p', 'rb') as output:
    xall=pd.read_pickle(output,compression=None)
with open("D:/罗飞/kitchen_processed_data/"+'yall.p', 'rb') as output:
    yall=pd.read_pickle(output,compression=None)

In [None]:
# Fixed seed so the shuffle and the train/test split are reproducible across
# kernel restarts.
SEED = 42

# Map the raw labels to integer class ids.
le = preprocessing.LabelEncoder()
yall=le.fit_transform(yall)
num_classes=len(np.unique(yall))
print(np.unique(yall),num_classes)
# Shuffle features and labels together, then hold out 20% for testing.
xall, yall = shuffle(xall,yall, random_state=SEED)
print(yall[0:15])
x_train, x_test, y_train, y_test =train_test_split(xall, yall, test_size=0.2, random_state=SEED)

In [None]:
# One-hot encode the train and test labels (15 columns each) in a single pass.
y_train_tcn, y_test_tcn = (
    keras.utils.to_categorical(labels, num_classes=15)
    for labels in (y_train, y_test)
)
print(x_train.shape,y_train_tcn.shape)

In [None]:
class TCN(nn.Module):
    def __init__(self,input_size, output_size, num_channels, kernel_size, dropout):
        super(TCN, self).__init__()
        self.pool=nn.AvgPool2d((4,4), stride=4)
        self.tcn = TemporalConvNet(input_size, num_channels, kernel_size, dropout=dropout)
        self.linear = nn.Linear(num_channels[-1], output_size)
        
    def forward(self, x):
        # x needs to have dimension (N, C, L) in order to be passed into CNN
        x=x.permute(0,3,1,2)
        pool=self.pool(x)
        flat = pool.view(-1, 1,pool.size()[1]*pool.size()[2]*pool.size()[3])
        tflat = self.tcn(flat)  # input should have dimension (N, C, L)
        o = self.linear(tflat[:, :, -1])
        return F.log_softmax(o, dim=1)

In [None]:
def get_dataset(x, y):
    """Wrap two NumPy arrays in a ``TensorDataset``.

    ``x`` is converted to a float tensor and ``y`` to a long (int64) tensor.
    """
    features = torch.from_numpy(x).float()
    targets = torch.from_numpy(y).long()
    return TensorDataset(features, targets)


def get_dataloader(x: np.ndarray, y: np.ndarray, batch_size: int, shuffle: bool = False, num_workers: int = 0) -> DataLoader:
    """Build a ``DataLoader`` over the dataset returned by ``get_dataset(x, y)``.

    Args:
        x: feature array handed to ``get_dataset``.
        y: target array handed to ``get_dataset``.
        batch_size: number of samples per batch.
        shuffle: forwarded to ``DataLoader``; defaults to ``False``.
        num_workers: forwarded to ``DataLoader``; defaults to ``0``.

    Returns:
        The configured ``DataLoader``.
    """
    dataset = get_dataset(x, y)
    return DataLoader(
        dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers
    )

In [None]:
class TCNModel(pl.LightningModule):
    """Lightning module that trains and evaluates the ``TCN`` classifier.

    Targets arrive one-hot encoded, so every step converts them to class
    indices with ``argmax`` before computing the loss or the accuracy. The data
    loaders read the module-level ``x_train``, ``x_test``, ``y_train_tcn`` and
    ``y_test_tcn`` arrays.
    """

    def __init__(self):
        super(TCNModel, self).__init__()
        # Per-epoch validation history, appended to by validation_end.
        self.val_loss=[]
        self.val_acc=[]
        self.l1 = TCN(input_size=1, output_size=15,num_channels=[25]*8,
                      kernel_size=7, dropout=0.05)

    def forward(self, x):
        """Return the log-probabilities produced by the wrapped ``TCN``."""
        return self.l1.forward(x)

    @staticmethod
    def _loss_and_accuracy(log_probs, one_hot_targets):
        """Return ``(nll_loss, accuracy)`` for one batch with one-hot targets."""
        targets = torch.argmax(one_hot_targets, dim=1)
        predictions = torch.argmax(log_probs, dim=1)
        # The network already outputs log-softmax values, so NLL is the matching loss.
        loss = F.nll_loss(log_probs, targets)
        accuracy = (predictions == targets).float().mean()
        return loss, accuracy

    def training_step(self, batch, batch_nb):
        """REQUIRED: compute the training loss and accuracy for one batch."""
        x, y = batch
        loss, train_acc = self._loss_and_accuracy(self.forward(x), y)
        return {'loss': loss.type(torch.float),'train_acc':train_acc.type(torch.float)}

    def validation_step(self, batch, batch_nb):
        """OPTIONAL: compute the validation loss and accuracy for one batch."""
        x, y = batch
        val_loss, val_acc = self._loss_and_accuracy(self.forward(x), y)
        return {'val_loss': val_loss.type(torch.float),'val_acc':val_acc.type(torch.float)}

    def validation_end(self, outputs):
        """OPTIONAL: average the per-batch validation results and record them."""
        avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
        avg_acc = torch.stack([x['val_acc'] for x in outputs]).mean()
        self.val_loss.append(avg_loss.cpu().numpy())
        self.val_acc.append(avg_acc.cpu().numpy())
        print('avg_val_loss',avg_loss,'avg_val_acc', avg_acc)
        return {'avg_val_loss': avg_loss,'avg_val_acc': avg_acc}

    def test_step(self, batch, batch_nb):
        """OPTIONAL: compute the test loss for one batch.

        Uses the same NLL-on-class-indices loss as the training and validation
        steps; the targets are one-hot and the model output is already
        log-softmax, so ``cross_entropy`` on the raw targets is not applicable.
        """
        x, y = batch
        test_loss, _ = self._loss_and_accuracy(self.forward(x), y)
        return {'test_loss': test_loss}

    def test_end(self, outputs):
        """OPTIONAL: average the per-batch test losses."""
        avg_loss = torch.stack([x['test_loss'] for x in outputs]).mean()
        return {'avg_test_loss': avg_loss}

    def configure_optimizers(self):
        """REQUIRED: Adam over all parameters with a learning rate of 2e-4."""
        return torch.optim.Adam(self.parameters(), lr=0.0002)

    @pl.data_loader
    def train_dataloader(self):
        """Shuffled training batches of 200 samples."""
        return get_dataloader(x_train, y_train_tcn,shuffle=True,batch_size= 200)

    @pl.data_loader
    def val_dataloader(self):
        """OPTIONAL: validation batches of 200 samples.

        Can also return a list of validation dataloaders.
        NOTE(review): validation reads the same ``x_test`` / ``y_test_tcn``
        arrays as the test loader below.
        """
        return get_dataloader(x_test, y_test_tcn, 200)

    @pl.data_loader
    def test_dataloader(self):
        """OPTIONAL: test batches of 400 samples.

        Can also return a list of test dataloaders.
        """
        return get_dataloader(x_test, y_test_tcn, 400)

In [None]:
# Use CUDA when it is available, otherwise stay on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # PyTorch v0.4.0
# Instantiate the Lightning module and move it to the chosen device in one step.
tcnmodel = TCNModel().to(device)
# Print a layer summary for an input size of (80, 80, 2).
summary(tcnmodel, (80, 80,2))

In [None]:
# Request one GPU with the 'dp' backend only when CUDA is available, so the
# cell also runs on a CPU-only machine (matching the device fallback used when
# the model is created).
use_gpu = torch.cuda.is_available()
trainer = Trainer(max_epochs=500,
                  gpus=1 if use_gpu else None,
                  distributed_backend='dp' if use_gpu else None)
# Undo any earlier freeze() (e.g. from re-running the evaluation cell) before fitting.
tcnmodel.unfreeze()
trainer.fit(tcnmodel)

In [None]:
# Collect the per-epoch validation history recorded on the model and lay it
# out as one row per epoch: column 0 = accuracy, column 1 = loss.
val_acc = np.asarray(tcnmodel.val_acc)
val_loss = np.asarray(tcnmodel.val_loss)
validation = np.column_stack((val_acc, val_loss))
# Show only the most recent 100 epochs.
print(validation[-100:])


In [None]:
# Persist the (accuracy, loss) validation history as comma-separated text.
np.savetxt(fname="tcn_val.csv", X=validation, delimiter=",")

In [None]:
# Freeze the model, move it to the CPU and score the whole held-out split at once.
tcnmodel.freeze()
tcnmodel = tcnmodel.to("cpu")
txtest= torch.from_numpy(x_test).float()
# Predicted class id = index of the largest log-probability for each sample.
tytest=torch.argmax(tcnmodel(txtest), dim=1)
tyc=tytest.detach().cpu().numpy()
# Ground-truth class ids recovered from the one-hot test labels.
tyt=np.argmax(y_test_tcn, axis=1)
print(tyt.shape)
print(tyc.shape)
# Accuracy plus macro-averaged recall and F1.
test_acc=accuracy_score(tyt,tyc)
test_recall=recall_score(tyt,tyc,average='macro')
test_f1=f1_score(tyt,tyc,average="macro")
print(test_acc)
print(test_recall)
print(test_f1)

In [None]:
# Persist accuracy, macro recall and macro F1 (in that order), one value per line.
test_metrics = np.array([test_acc, test_recall, test_f1])
np.savetxt(fname="tcn_test.csv", X=test_metrics, delimiter=",")

In [None]:
from sklearn.metrics import f1_score,average_precision_score,confusion_matrix,recall_score,accuracy_score
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
from matplotlib import mlab
import matplotlib
import matplotlib.cm as cm
import itertools
# Global matplotlib font settings for the figures below. 'sans-serif' is a
# valid generic family name; 'normal' is not a font family and only triggers
# "font family not found" warnings before falling back to the default font.
font = {'family' : 'sans-serif',
        'size'   : 22}

matplotlib.rc('font', **font)
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    Print a short banner and plot the confusion matrix on a new figure.

    Args:
        cm: 2-D NumPy array; rows are labelled 'True label' and columns
            'Predicted label' on the plot. Inside this function the name
            shadows the module-level ``matplotlib.cm`` import.
        classes: tick labels for both axes, one per row/column.
        normalize: when True, divide every row by its sum before plotting.
            Rows whose sum is zero are shown as all zeros.
        title: figure title.
        cmap: colormap passed to ``plt.imshow``.
    """
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # Divide only where the row total is non-zero so empty rows stay at 0
        # instead of turning into NaN.
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    plt.figure(figsize=(24,16))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=60,fontsize=17)
    plt.yticks(tick_marks, classes,fontsize=17)

    fmt = '.2f' if normalize else 'd'
    # Cells above half of the maximum get white text so the number stays readable.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Run the layout pass last so it accounts for the axis labels as well.
    plt.tight_layout()

# Display names for the class ids, in class-id order.
# NOTE(review): assumes the label encoder's integer ids follow this order -- confirm.
class_names=class_name=['Drinking',"Eating","Standing","Sitting","Walking","Open door and get in","Open door and get out","Washing",
           "Open oven","Close oven","Open cabinet","Close cabinet","Open freezer","Close freezer","No activity"]

# Build the matrix over every class id so its rows/columns always line up with
# class_names, even if some class happens to be absent from the test split.
cnf_matrix=confusion_matrix(tyt, tyc, labels=np.arange(len(class_names)))
np.savetxt("tcn_confusion.csv", cnf_matrix, delimiter=",")
np.set_printoptions(precision=1)

# Plot normalized confusion matrix
plot_confusion_matrix(cnf_matrix, classes=class_names, normalize=True,
                      title='Normalized confusion matrix')

plt.show()