## Timesformer + AcT

In [1]:
import os
import numpy as np
import time
import matplotlib.pyplot as plt
from tqdm import tqdm
from easydict import EasyDict
import torch.nn.functional as F

import copy
import json
from sklearn.metrics import confusion_matrix


import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import models
from torch.utils.data import DataLoader,random_split
from torch.utils.data import Subset

from utils import Dataset
from utils import utils

import sklearn

In [2]:
#model in [Times_AcT , AcT , Densenet]
model_name='TestModel'

args=EasyDict({
    'base_model': model_name,
    'pretrained':True,
    'lr':0.01,
    'start_epoch':1,
    'num_epochs':25,
    'continue_epoch':False,
    
    #Dataset parms
    'num_classes':10,
    'batch_size':8,
    #Path params
    'model_path':'./models/weights/'+model_name+'/',
    'history_path':'./history/'
})

In [3]:
class VitembLayer(nn.Module):
    def __init__(self,
                 in_channels:int=3,
                 emb_dim:int=128,
                 image_size=(16,176,128),
                 patch_t_size:int=4,
                 kernel_size=(2,8,8)):
        """
        stride : (t,h,w)各次元のstride量
        """
        super(VitembLayer,self).__init__()
        self.in_channels=in_channels
        self.emb_dim=emb_dim
        self.patch_size=kernel_size[1]
        self.patch_t_size=kernel_size[0]
        #各frameをパッチに , 1つのパッチは16*16にする
        self.num_patch_h=int(image_size[1]/self.patch_size)
        self.num_patch_w=int(image_size[2]/self.patch_size)
        self.num_patch_t=int(image_size[0]/self.patch_t_size)
        self.image_size=image_size
        self.num_patch=self.num_patch_h*self.num_patch_w*self.num_patch_t
        self.kernel_size=kernel_size
        
        self.patch_emb_layer=nn.Conv3d(
            in_channels=self.in_channels,
            out_channels=self.emb_dim,
            kernel_size=self.kernel_size,
            stride=self.kernel_size)
        
        self.cls_token=nn.Parameter(torch.randn(1,1,emb_dim))
        self.pos_emb=nn.Parameter(
            torch.randn(1,self.num_patch+1,emb_dim))
    
    def forward(self,x:torch.Tensor) -> torch.Tensor:
        """
        in : x (B,C,T,H,W) 
        return:
                z_0 : 各tubelet (B,N,D)
        """
        
        #x -> (B*N,T,P)
        #if x.dim()==4:
        #    x=x.reshape(-1,self.num_patch,self.input_dim)
        b,c,t,h,w=x.shape
        z_0=self.patch_emb_layer(x)
        z_0=z_0.flatten(2).transpose(1,2)
        cls_token=self.cls_token.repeat(repeats=(x.size(0),1,1))
        z_0=torch.cat([cls_token,z_0],dim=1)
        # Add positional emb
        z_0=z_0+self.pos_emb
        return z_0
    


class MHSA(nn.Module):
    def __init__(self,emb_dim:int=128,num_heads:int=2,dropout:float=0.,with_qkv=True):
        """
        with_qkv :
        -> True なら、MHSAのために、3倍のdimに入力をq,k,vに埋め込む
        """
        
        super(MHSA,self).__init__()
        self.num_heads=num_heads
        self.emb_dim=emb_dim
        self.head_dim=emb_dim//self.num_heads
        self.sqrt_d=emb_dim**0.5
        self.with_qkv=with_qkv
        
        #埋め込み
        if with_qkv:
            self.w_qkv=nn.Linear(self.emb_dim,self.emb_dim*3,bias=False)
            self.dropout=nn.Dropout(dropout)
        
        self.w_o=nn.Sequential(
            nn.Linear(emb_dim,emb_dim),
            nn.Dropout(dropout))
        
    def forward(self,z:torch.Tensor)->torch.Tensor:
        B,N,D=z.shape
        if self.with_qkv:
            #->(B,N,3,num_heads,head_dim)
        
            z=self.w_qkv(z).reshape(B,N,3,self.num_heads,self.head_dim)
            # ->(3,B,num_heads,N,head_dim)
            z=z.permute(2,0,3,1,4)
            q,k,v=z[0],z[1],z[2]
        #埋め込みが不要なら、ただqkv複製
        else:
            z=z.reshape(B,N,self.num_heads,self.head_dim)
            #->(B.num_heads ,N ,head_dim )
            z=z.permute(0,2,1,3)
            q,k,v=z,z,z
        
        attn=(q@k.transpose(-2,-1))/self.sqrt_d
        attn=F.softmax(attn,dim=-1)
        attn_weight=self.dropout(attn)
        # (B,heads,N,Dh) -> (B,N,heads,Dh) -> (B,N,D)
        out=(attn_weight@v).transpose(1,2).reshape(B,N,D)
        
        if self.with_qkv:
            out=self.w_o(out)
        return out
    
class VitEncoderBlock(nn.Module):
    def __init__(self,emb_dim:int=128,num_heads:int=2,mlp_ratio:int=2
                 ,drop:float=0.,attn_drop:float=0.):
        """
        mlp_ratio : nn.Linearでemb_dimの何倍に埋め込むか
        attn_drop : Attn用のdropout率
        drop      : Block用のdropout率
        """
        super(VitEncoderBlock,self).__init__()
        self.dropout=nn.Dropout(drop)
        self.ln1=nn.LayerNorm(emb_dim)
        self.mhsa=MHSA(emb_dim=emb_dim,num_heads=num_heads,dropout=attn_drop)
        self.ln2=nn.LayerNorm(emb_dim)
        self.mlp=nn.Sequential(
            nn.Linear(emb_dim,emb_dim*mlp_ratio),
            nn.GELU(),
            nn.Dropout(drop),
            nn.Linear(emb_dim*mlp_ratio,emb_dim),
            nn.Dropout(drop))
    
    def forward(self,z:torch.Tensor)->torch.Tensor:
        out1=self.mhsa(self.ln1(z))+z
        out2=self.mlp(self.ln2(out1))+out1    
        return out2

class testModel(nn.Module):
    def __init__(self,
                num_classes:int=10,
                emb_dim:int=128,
                num_blocks:int=7,
                head_num:int=2,
                kernel_size=(2,8,8),
                mlp_ratio:int=3,
                drop:float=0.,
                attn_drop:float=0.):
        super().__init__()
        self.emb_layer=VitembLayer(kernel_size=kernel_size)
        self.vit_encoder=nn.Sequential(
            *[VitEncoderBlock(emb_dim,head_num,mlp_ratio,drop,attn_drop) 
             for _ in range(num_blocks)])
        self.mlp_head=nn.Sequential(
            nn.LayerNorm(emb_dim),
            nn.Linear(emb_dim,num_classes))
    def forward(self,x:torch.Tensor)->torch.Tensor:
        input_x=self.emb_layer(x)
        out=self.vit_encoder(input_x)
        cls_token=out[:,0]
        pred=self.mlp_head(cls_token)
        return pred

model=testModel()


In [4]:
def Train_model(model,dataloaders_dict,criterion,optimizer,args):
    start_time=time.time()
    
    train_acc_history=[]
    val_acc_history=[]
    train_loss_history=[]
    val_loss_history=[]
    train_f1_history=[]
    val_f1_history=[]
    plot_epoch=[]
    
    best_model_wts=copy.deepcopy(model.state_dict())
    best_acc=0.0
    best_epoch=0
    model.to(device)
    num_epoch=args.num_epochs
    for epoch in range(1,num_epoch+1):
        for phase in ['train','val']:
            epoch_time=time.time()
            if phase == 'train':
                model.train()
                train_pred_class=[]
                train_ground_truths=[]
            else:
                model.eval()
                val_pred_class=[]
                val_ground_truths=[]
            running_loss=0.0
            running_corrects=0
            train_n_total=1
            
            pbar=tqdm(dataloaders_dict[phase])
            i=0
            for sample in pbar:
                #動画のみなら 'video'  joints なら 'joints' 
                inputs=sample['video'].to(device)
                #inputs=sample['joints'].to(device)
                
                labels=sample['action'].to(device)
                
                optimizer.zero_grad()
                
                with torch.set_grad_enabled(phase=='train'):
                    outputs=model(inputs)
                    loss=criterion(outputs,torch.max(labels,1)[1])
                    
                    _,preds=torch.max(outputs,1)
                    
                    if phase=='train':
                        train_pred_class.extend(preds.detach().cpu().numpy())
                        train_ground_truths.extend(torch.max(labels,1)[1].detach().cpu().numpy())
                    else:
                        val_pred_class.extend(preds.detach().cpu().numpy())
                        val_ground_truths.extend(torch.max(labels,1)[1].detach().cpu().numpy())
                    if phase=='train':
                        loss.backward()
                        optimizer.step()
                    
                running_loss+=loss.item()*inputs.size(0)
                running_corrects+=torch.sum(preds==torch.max(labels,1)[1])
                
                pbar.set_description('Phase: {} || Epoch: {} || Loss{:.5f}'.format(phase,epoch,running_loss/train_n_total))
                train_n_total+=1
            
            epoch_loss=running_loss/len(dataloaders_dict[phase].dataset)
            epoch_acc=running_corrects/len(dataloaders_dict[phase].dataset)
            
            #Train and Val have done
            epoch_end_time=time.time()-epoch_time
            if phase=='train':
                print('Training Complete in {:.0f}m {:.0f}s '.format(epoch_end_time//60,epoch_end_time%60))
            else:
                print('Validation Complete in {:.0f}m {:.0f}s '.format(epoch_end_time//60,epoch_end_time%60))
            #print('{} Loss: {:.4f} Acc:{:.4f} % '.format(phase,epoch_loss,epoch_acc))
            
            if phase =='train':
                train_acc_history.append(epoch_acc)
                train_loss_history.append(epoch_loss)
                train_pred_classes=np.asarray(train_pred_class)
                train_ground_truths=np.asarray(train_ground_truths)
                
                train_accuracy,train_precision,train_recall,train_f1=utils.Get_scores(
                    train_pred_classes , train_ground_truths)
                
                train_f1_history.append(train_f1)
                train_confusion_matrix=np.array_str(confusion_matrix(train_ground_truths,train_pred_class,labels=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
                print('Epoch: {} || Train_Acc: {} || Train_Loss: {}'.format(
                    epoch, train_accuracy, epoch_loss
                ))
                print(f'train: \n{train_confusion_matrix}')
                plot_epoch.append(epoch)
                
                train_loss=epoch_loss
            
            # For Checkpointing and Confusion Matrix
            if phase == 'val':
                val_acc_history.append(epoch_acc)
                val_loss_history.append(epoch_loss)
                val_pred_classes = np.asarray(val_pred_class)
                val_ground_truths = np.asarray(val_ground_truths)
                
                val_accuracy,val_precision,val_recall,val_f1 = utils.Get_scores(
                    val_pred_classes, val_ground_truths
                )
                
                val_f1_history.append(val_f1)
                val_confusion_matrix = np.array_str(confusion_matrix(val_ground_truths, val_pred_classes, labels=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
                print('Epoch: {} || Val_Acc: {} || Val_Loss: {}'.format(
                    epoch, val_accuracy, epoch_loss
                ))
                print(f'val: \n{val_confusion_matrix}')

                # Deep Copy Model if best accuracy
                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())
                    best_epoch=epoch+1
                    
                # set current loss to val loss for write history
                val_loss = epoch_loss
            
            
        #モデル/optimizerを保存
        model_name= utils.save_weights(model, args, epoch, optimizer)
            
        #writehistory
        # Write History after train and validation phase
        utils.write_history(
                    args,
                    epoch,
                    train_loss,train_accuracy,train_f1,train_precision,train_recall,train_confusion_matrix,
                    val_loss,val_accuracy,val_f1,val_precision,val_recall,val_confusion_matrix,
                    best_acc)
        
        
    end=time.time()-start_time
    print('all done in {:.0f}m {:.0f}s'.format(end//60,end%60))
    print('Best val Acc {:.4f}'.format(best_acc))
    history_path=args.history_path+args.base_model+'.txt'
    with open(history_path,'a') as file:
        f.write('All Done in {:.0f}m {:.0f}s'.format(end//60,end%60))
    #load best model
    model.load_state_dict(best_model_wts)
    save_path=os.path.join(args.model_path,'best_weights_{}.pth'.format(best_epoch))
    torch.save(model.state_dict(),save_path)
    return model, train_loss_history, val_loss_history, train_acc_history, val_acc_history, train_f1, val_f1, plot_epoch


In [7]:
dataset=Dataset.BasketballDataset(annotation_dict='../transformermodels/dataset/annotation_dict.json',
                                    augmented_dict='../transformermodels/dataset/augmented_annotation_dict.json',
                                    video_dir='../transformermodels/dataset/examples/',
                                    augmented_video_dir='../transformermodels/dataset/augmented-examples/',
                                    augment=True,poseData=False,joints_to_numpy=False)


train_dataset_size=dataset.__len__()
train_num=int(train_dataset_size*0.7)
val_num=train_dataset_size-train_num
### ver 2.
train_dataset,val_dataset=torch.utils.data.random_split(dataset,[train_num,val_num])
#test用のsubset
#train_subset=Subset(train_dataset,list(range(0,70)))
#val_subset=Subset(val_dataset,list(range(0,30)))

train_loader=DataLoader(train_dataset,shuffle=True,batch_size=4)
val_loader=DataLoader(val_dataset,shuffle=False,batch_size=4)
dataloaders_dict={'train':train_loader,'val':val_loader}


In [None]:
model=testModel()
optimizer=optim.Adam(model.parameters(),lr=args.lr)
criterion=nn.CrossEntropyLoss()
device=torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model, train_loss_history, val_loss_history,train_acc_history, val_acc_history, train_f1_score, val_f1_score, plot_epoch = Train_model(model,dataloaders_dict,criterion,optimizer,args)


Phase: train || Epoch: 1 || Loss9.66084:   2%|█▋                                                                        | 205/8733 [00:28<20:00,  7.11it/s]