In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import cv2
from torchvision import transforms
from torch.utils.data import Dataset,DataLoader,random_split
from PIL import Image
import os
import time
import copy
import torch.nn.utils.rnn as rnn_utils
import numpy as np

In [2]:
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class CNN_LSTM_Model(nn.Module):
  """Clip classifier: per-frame ResNet-18 features followed by an LSTM.

  Every frame is encoded by a pretrained ResNet-18 whose final fully-connected
  layer has been removed. The sequence of frame features is run through an
  LSTM and the final hidden state of the top LSTM layer is mapped to class
  logits by a linear layer.

  Args:
    num_classes: Number of output classes.
    hidden_size: Size of the LSTM hidden state.
    num_layers: Number of stacked LSTM layers.
  """
  def __init__(self,num_classes,hidden_size,num_layers):
    super(CNN_LSTM_Model,self).__init__()

    resnet = models.resnet18(pretrained=True)
    # Keep every ResNet child except the last one (the `fc` classifier), so
    # the backbone emits one feature vector of size `fc.in_features` per frame.
    self.cnn = nn.Sequential(*list(resnet.children())[:-1])

    self.lstm = nn.LSTM(input_size=resnet.fc.in_features,hidden_size=hidden_size,num_layers=num_layers,batch_first=True)

    self.fc = nn.Linear(hidden_size,num_classes)

  def forward(self,inputs,lengths=None):
    """Classify a batch of clips.

    Args:
      inputs: Tensor of shape (batch, timesteps, C, H, W).
      lengths: Optional number of valid (non-padded) frames per clip, as a
        1-D tensor or sequence of positive ints of size `batch`. When given,
        the LSTM stops at each clip's last valid frame instead of consuming
        the padding. When omitted, every timestep is treated as valid.

    Returns:
      A tuple `(output, c_out)` where `output` holds the class logits with
      shape (batch, num_classes) and `c_out` holds the per-frame CNN features
      with shape (batch, timesteps, feature_dim).
    """
    batch_size,timesteps,C,H,W = inputs.size()
    c_out = []
    for t in range(timesteps):
      cnn_out = self.cnn(inputs[:,t,:,:,:])
      c_out.append(cnn_out.view(batch_size,-1))
    c_out = torch.stack(c_out,dim=1)

    if lengths is None:
      _,(hn,_) = self.lstm(c_out)
    else:
      # pack_padded_sequence needs the lengths as a CPU int64 tensor;
      # enforce_sorted=False lets callers pass clips in any order.
      lengths = torch.as_tensor(lengths,dtype=torch.int64).cpu()
      packed = pack_padded_sequence(c_out,lengths,batch_first=True,enforce_sorted=False)
      _,(hn,_) = self.lstm(packed)

    # hn[-1] is the top layer's hidden state at the last processed step of
    # each clip; without packing this equals the LSTM output at timestep -1.
    output = self.fc(hn[-1])

    return output,c_out

In [3]:
def movie2frame(video_path,max_frames,transform):
  """Decode a video into a fixed-length stack of transformed frames plus a validity mask.

  At most `max_frames` frames are read. Shorter videos are padded at the end
  with all-zero frames so the result always has exactly `max_frames` entries.

  Args:
    video_path: Path passed to `cv2.VideoCapture`.
    max_frames: Number of frames in the returned stack; must be positive.
    transform: Callable applied to each RGB frame; it must return a tensor,
      and all returned tensors must share one shape so they can be stacked.

  Returns:
    A tuple `(frames, mask)`: `frames` is the stacked tensor of `max_frames`
    transformed frames, `mask` is a 1-D tensor holding 1 for real frames and
    0 for padding.

  Raises:
    ValueError: If `max_frames` is not positive or no frame could be read.
  """
  if max_frames <= 0:
    raise ValueError("max_frames must be positive, got {}".format(max_frames))

  cap = cv2.VideoCapture(video_path)
  frames = []
  try:
    # Stop as soon as max_frames frames are collected so longer videos are
    # truncated instead of producing an over-long stack.
    while len(frames) < max_frames:
      ret,frame = cap.read()
      if not ret:
        break
      # The frame is converted from BGR to RGB channel order before the transform.
      frame = cv2.cvtColor(frame,cv2.COLOR_BGR2RGB)
      frames.append(transform(frame))
  finally:
    # Always free the capture handle, even if the transform raises.
    cap.release()

  if not frames:
    # Without at least one real frame the padding shape is unknown.
    raise ValueError("No frames could be read from {!r}".format(video_path))

  num_real = len(frames)
  num_pad = max_frames - num_real
  frames.extend(torch.zeros_like(frames[0]) for _ in range(num_pad))
  mask = [1] * num_real + [0] * num_pad
  return torch.stack(frames),torch.tensor(mask)

In [4]:
def frame2torch(dir_path,transform):
    """Load every frame image in a directory, in sorted filename order, as one stacked tensor.

    Args:
        dir_path: Directory containing one image file per frame.
        transform: Callable applied to each image (given as a NumPy array);
            it must return tensors of one common shape so they can be stacked.

    Returns:
        The transformed frames stacked along a new leading dimension.

    Raises:
        ValueError: If the directory contains no (non-hidden) entries.
    """
    # Hidden entries such as macOS ".DS_Store" are not frames and would make
    # Image.open fail, so they are skipped.
    files = sorted(name for name in os.listdir(dir_path) if not name.startswith("."))
    if not files:
        raise ValueError("No frame images found in {!r}".format(dir_path))

    frames = []
    for file in files:
        # The context manager closes the file handle as soon as the pixels
        # have been copied into the array.
        with Image.open(os.path.join(dir_path,file)) as img:
            image = np.array(img)
        frames.append(transform(image))
    return torch.stack(frames)

In [5]:
# import re
# import pandas as pd
# class CustomImageDataset(Dataset):
#   def __init__(self,img_dir,transform):
#     self.img_dir = img_dir
#     self.transform = transform
#     self.img_labels = []
#     self.folder_name2label = {}
#     target = "Train" if re.search("Train",img_dir) else "Test"
#     df = pd.read_csv(img_dir+"_csv"+"/"+target+".csv")
#     for index,row in df.iterrows():
#       self.folder_name2label[row["video_id"]] = row["label_id"]
#     for folder in os.listdir(img_dir):
#       if folder != ".DS_Store":
#         self.img_labels.append((self.img_dir+"/"+folder,self.folder_name2label[int(folder)]))      

#   def __len__(self):
#     return len(self.img_labels)
  
#   def __getitem__(self,idx):
#     img_path,label = self.img_labels[idx]
#     frames = frame2torch(img_path,self.transform)
#     return frames,label

In [21]:
from CustomDataset.Dataset import CustomImageDataset

# Per-split preprocessing. Both splits convert to a tensor, resize to 96x96
# and normalise per channel; only the training split adds colour jitter and a
# random translation in between.
train_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((96,96)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225]),
])
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((96,96)),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225]),
])
data_transforms = {"Train": train_transform, "Test": test_transform}

# The "Train" and "Test" folders are resolved relative to this directory.
data_dir = "./"

# Build, for each split, the dataset, a shuffled loader with batch size 2,
# and the sample count.
image_datasets = {}
img_dataloaders = {}
dataset_sizes = {}
for split in ["Train", "Test"]:
    split_dataset = CustomImageDataset(os.path.join(data_dir, split), data_transforms[split])
    image_datasets[split] = split_dataset
    img_dataloaders[split] = torch.utils.data.DataLoader(
        split_dataset, batch_size=2, shuffle=True, num_workers=1
    )
    dataset_sizes[split] = len(split_dataset)

print(dataset_sizes)


{'Train': 13302, 'Test': 1936}


In [22]:
# Sanity check: print the frame-batch and label-batch shapes of the first four
# training batches.
dataloader = img_dataloaders["Train"]
for i, data in enumerate(dataloader):
    print(data[0].shape)
    print(data[1].shape)
    # enumerate() supplies the counter, so no manual increment is needed;
    # stop after the batch with index 3.
    if i == 3:
        break


torch.Size([2, 37, 3, 96, 96])
torch.Size([2])
torch.Size([2, 37, 3, 96, 96])
torch.Size([2])
torch.Size([2, 37, 3, 96, 96])
torch.Size([2])
torch.Size([2, 37, 3, 96, 96])
torch.Size([2])


In [16]:
# Model with 5 output classes, an LSTM hidden size of 128 and one LSTM layer.
net = CNN_LSTM_Model(num_classes=5, hidden_size=128, num_layers=1)

# Parameter for OS regularization
# NOTE(review): not referenced by any cell visible here — confirm it is still needed.
regularization_param = 0.01

# Cross-entropy loss, optimised with SGD plus momentum.
loss_function = nn.CrossEntropyLoss()
optimizer = optim.SGD(params=net.parameters(), lr=0.0001, momentum=0.9)



In [8]:
import time
import copy
import torch.nn.utils.rnn as rnn_utils
def train_model(model, criterion, optimizer, scheduler, num_epochs, dataloaders=None):
    """Train `model` and return it loaded with the weights that scored best on the 'Test' split.

    Each epoch runs a 'Train' phase (gradients enabled, optimizer stepped per
    batch) followed by a 'Test' phase (evaluation mode, no gradients). The
    weights with the highest 'Test' accuracy seen so far are kept and
    restored into the model before it is returned.

    Args:
        model: Module called as `model(frames)`; element 0 of its return value
            is used as the class logits.
        criterion: Loss called as `criterion(logits, labels)`.
        optimizer: Optimizer over the model's parameters.
        scheduler: Learning-rate scheduler stepped once after every 'Train'
            phase, or None to train without one.
        num_epochs: Number of epochs to run.
        dataloaders: Optional mapping with 'Train' and 'Test' keys whose
            values yield `(frames, labels)` batches. Defaults to the
            notebook-level `img_dataloaders`.

    Returns:
        `model`, moved to the selected device and loaded with the best weights.
    """
    since = time.time()

    if dataloaders is None:
        # Fall back to the loaders defined at notebook level.
        dataloaders = img_dataloaders

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    model.to(device)

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Every epoch has a training phase and an evaluation phase.
        for phase in ['Train', 'Test']:
            is_train = phase == 'Train'
            if is_train:
                model.train()  # put the model in training mode
            else:
                model.eval()   # put the model in evaluation mode

            running_loss = 0.0
            running_corrects = 0
            num_samples = 0

            # Iterate over the batches of this phase.
            for frames, labels in dataloaders[phase]:
                frames = frames.to(device)
                # as_tensor accepts labels that already are a tensor without
                # the copy-construct warning torch.tensor() would emit.
                labels = torch.as_tensor(labels).to(device)

                # Reset the parameter gradients.
                optimizer.zero_grad()

                # Forward pass; gradient history is tracked only while training.
                with torch.set_grad_enabled(is_train):
                    outputs = model(frames)
                    logits = outputs[0]
                    _, preds = torch.max(logits, 1)
                    loss = criterion(logits, labels)

                    # Backward pass and optimizer step only while training.
                    if is_train:
                        loss.backward()
                        optimizer.step()

                # Accumulate statistics as plain Python numbers, weighted by
                # the actual batch size so a smaller last batch is handled.
                batch_size = frames.size(0)
                running_loss += loss.item() * batch_size
                running_corrects += (preds == labels).sum().item()
                num_samples += batch_size

            if is_train and scheduler is not None:
                scheduler.step()

            # Average over the samples actually seen; guard against an empty loader.
            denom = max(num_samples, 1)
            epoch_loss = running_loss / denom
            epoch_acc = running_corrects / denom

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # Keep a deep copy of the best-performing weights.
            if phase == 'Test' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best test Acc: {:4f}'.format(best_acc))

    # Restore the best weights before returning.
    model.load_state_dict(best_model_wts)
    return model



In [None]:
# Save the trained weights, then export the network to ONNX.
# NOTE(review): `model_ft` is not defined in any cell visible here; presumably
# it is the model returned by train_model(...) — confirm before running.
torch.save(model_ft.state_dict(), './gdrive/My Drive/CNN_LSTM_Model_weights_2024-10-17.pth')
# torch.save(model_ft.state_dict(), '$HOME/Desktop/CNN_LSTM_Model_weights_2024-10-17.pth')
import torch.onnx as onnx
# CNN_LSTM_Model.forward unpacks (batch, timesteps, C, H, W), so the tracing
# input has to be a 5-D clip rather than a single 4-D image. The shape mirrors
# the training batches printed above: 37 frames of 3x96x96.
# NOTE(review): forward() also declares a `lengths` argument; exporting with a
# single input only works if that argument is optional — confirm.
input_image = torch.zeros((1,37,3,96,96)).to(device)
onnx.export(model_ft, input_image, './gdrive/My Drive/CNN_LSTM_Model_2024-10-17.onnx')
# onnx.export(model_ft, input_image, '$HOME/CNN_LSTM_Model_2024-10-17.onnx')