<a href="https://colab.research.google.com/github/HunterInDarkness/Lip-Reading/blob/master/LipNetBASE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import os
import cv2
import torch
import numpy as np
import pandas as pd

import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torch.utils.data import Dataset

from collections import OrderedDict

import torchvision
import torchvision.transforms as transforms
from datetime import datetime

#from LipReadDataTrain import ReadData
#from LipNet import LipNet, LipSeqLoss
#from LipReadDataTest import ReadData as ReadDataTest

###  LipReadDataTrain.py

In [0]:
class ReadData(Dataset):

    def __init__(self, image_root, label_root, seq_max_lens):
        self.seq_max_lens = seq_max_lens
        self.data = []
        self.data_root = image_root
        with open(label_root, 'r', encoding='utf8') as f:
            lines = f.readlines()
            lines = [line.strip().split('\t') for line in lines]
            self.dictionary = sorted(np.unique([line[1] for line in lines])) 
            pic_path = [image_root + '/' + line[0] for line in lines] 
            self.lengths = [len(os.listdir(path)) for path in pic_path]
            
            save_dict = pd.DataFrame(self.dictionary, columns=['dict'])
            save_dict.to_csv('./dictionary/dictionary.csv', encoding='utf8', index=None)  #save dict

            self.data = [(line[0], self.dictionary.index(line[1]), length) for line, length in zip(lines, self.lengths)]
            self.data = list(filter(lambda sample: sample[-1] <= self.seq_max_lens, self.data))      


    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        (path, label, pic_nums) = self.data[idx]
        path = os.path.join(self.data_root, path)
        files = [os.path.join(path, ('{}' + '.png').format(i)) for i in range(1, pic_nums+1)]
        files = filter(lambda path: os.path.exists(path), files)
        frames = [cv2.imread(file) for file in files ] 
        frames_ = [cv2.cvtColor(img, cv2.COLOR_BGR2RGB) for img in frames]       
        length = len(frames_)
        channels = 3
        picture_h_w = 112
        vlm = torch.zeros((channels, self.seq_max_lens, picture_h_w, picture_h_w))
        
        for i in range(len(frames_)):
            result = transforms.Compose([
                transforms.ToPILImage(),
                transforms.Resize((picture_h_w, picture_h_w)),
                transforms.CenterCrop((picture_h_w, picture_h_w)),
                transforms.ToTensor(),
                transforms.Normalize([0, 0, 0], [1, 1, 1]) 
            ])(frames_[i])
            vlm[:, i] = result
        
        return {'volume': vlm, 'label': torch.LongTensor([label]), 'length': length}

### LipReadDataTest.py

In [0]:
class ReadData(Dataset):

    def __init__(self, image_root, seq_max_lens=15):
        self.seq_max_lens = seq_max_lens
        self.data_root = image_root
        self.data = []

        # linux: /   windows:\\
        pic_file_path = [root for root, dirs, files in os.walk(self.data_root) if root.split('/')[-1]!=self.data_root.split('/')[-1]]
        file_names = [i.split('/')[-1] for i in pic_file_path]

        self.lengths = [len(os.listdir(path)) for path in pic_file_path]
        self.data = [(file_name, length,) for file_name, length in zip(file_names, self.lengths)]        
        self.data = list(filter(lambda sample: sample[-1] <= self.seq_max_lens, self.data)) 
   
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):  
        (path, pic_nums) = self.data[idx]
        root_add_filename = os.path.join(self.data_root, path)
        files = [os.path.join(root_add_filename, ('{}' + '.png').format(i)) for i in range(1, pic_nums+1)]
        files = filter(lambda path: os.path.exists(path), files)
        frames = [cv2.imread(file) for file in files] 
        frames_ = [cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  for img in frames]       
        length = len(frames_)    
        channels=3
        picture_h_w=112
        vlm = torch.zeros((channels, self.seq_max_lens, picture_h_w, picture_h_w))
        for i in range(len(frames_)):
            result = transforms.Compose([
                transforms.ToPILImage(),
                transforms.Resize((picture_h_w, picture_h_w)),
                transforms.CenterCrop((picture_h_w, picture_h_w)),
                transforms.ToTensor(),
                transforms.Normalize([0, 0, 0], [1, 1, 1]) 
            ])(frames_[i])
            vlm[:, i] = result       
        return {'volume': vlm, 'length': length, 'key': path}

###  LipNet.py

In [0]:
class LipSeqLoss(nn.Module):
 
    def __init__(self):
        super(LipSeqLoss, self).__init__()
        self.criterion = nn.NLLLoss(reduction='none')

    def forward(self, input, length, target):
        loss = []
        transposed = input.transpose(0, 1).contiguous()
        for i in range(transposed.size(0)):
            loss.append(self.criterion(transposed[i, ], target.squeeze(1)).unsqueeze(1))
        loss = torch.cat(loss, 1)
        
        #GPU version
        mask = torch.zeros(loss.size(0), loss.size(1)).float().cuda()
        # Cpu version
#         mask = torch.zeros(loss.size(0), loss.size(1)).float()   

        for i in range(length.size(0)):
            L = min(mask.size(1), length[i])
            mask[i, L-1] = 1.0
        loss = (loss * mask).sum() / mask.sum()
        return loss
      
class LipNet(torch.nn.Module):
    def __init__(self, init_features_num=64, drop_rate=0.3, type_class=313):
        super(LipNet, self).__init__()
        self.drop_rate = drop_rate
        self.type_class = type_class 

        # Cnn
        self.features = nn.Sequential(OrderedDict([
            ('conv', nn.Conv3d(3, init_features_num, kernel_size=(5, 7, 7), stride=(1, 2, 2), padding=(2, 3, 3), bias=False)),
            ('norm', nn.BatchNorm3d(init_features_num)),
            ('relu', nn.ReLU(inplace=True)),
            ('pool', nn.MaxPool3d(kernel_size=(1, 3, 3), stride=(1, 2, 2), padding=(0, 1, 1))), ]))
     
        # Rnn
        self.gru1 = nn.GRU(64*28*28, 256, bidirectional=True, batch_first=True) 
        self.gru2 = nn.GRU(512, 256, bidirectional=True, batch_first=True)
        # Fc
        self.fc = nn.Sequential(
            nn.Dropout(self.drop_rate),
            nn.Linear(512, self.type_class) )
        
        
    def forward(self, x):
        self.gru1.flatten_parameters()
        self.gru2.flatten_parameters()
        # Cnn
        cnn = self.features(x)
        cnn = cnn.permute(0, 2, 1, 3, 4).contiguous()
        batch, seq, channel, high, width = cnn.size()
        cnn = cnn.view(batch, seq, -1)
        # Rnn
        rnn, _ = self.gru1(cnn)
        rnn, _ = self.gru2(rnn)
        # Fc
        fc = self.fc(rnn).log_softmax(-1)
        return fc

###LipNetTraining.ipynb

In [0]:
###=============================================== 1.Data ===============================================
train_image_file = os.path.join(os.path.abspath('.'), "F:/ML/Competition/train/lip_train")
train_label_file = os.path.join(os.path.abspath('.'), "F:/ML/Competition/train/lip_train.txt")
training_dataset = ReadData(train_image_file, train_label_file, seq_max_lens=24)
training_data_loader = DataLoader(training_dataset, batch_size=20, shuffle=True, num_workers=12, drop_last=True)

# GPU
device = torch.device('cuda:0')
# # CPU 
# device = torch.device('cpu')

model = LipNet().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fc = LipSeqLoss().to(device)

TypeError: ignored

In [0]:
###=============================================== 2.Training ===============================================
for epoch in range(1,1000):
    print(epoch)
    model.train()
    for i_batch, sample_batched in enumerate(training_data_loader):
        
        input_data = Variable(sample_batched['volume']).to(device) 
        labels = Variable(sample_batched['label']).to(device)
        length = Variable(sample_batched['length']).to(device)
        
        optimizer.zero_grad()
        result = model(input_data)  
        loss = loss_fc(result, length, labels)   
        loss.backward()
        optimizer.step()
        
    if epoch % 5 == 0:
        current_time = datetime.now()
        print("current time:", current_time)
        print("number of epoch:", epoch)
        print("current loss:", loss)
                   
        # save model
        torch.save(model.state_dict(), "./weight/demo_net_epoch_{}.pt".format(epoch))

###LipPredict.ipynb

In [0]:
###=============================================== 1.Predict ===============================================
test_image_file = os.path.join(os.path.abspath('.'), "data/lip_test")
test_dataset = ReadDataTest(test_image_file, seq_max_lens=24)
test_data_loader = DataLoader(test_dataset, batch_size=20, shuffle=True, num_workers=8, drop_last=False)

#GPU
device = torch.device('cuda:0')
# # CPU
# device = torch.device('cpu')

model = LipNet().to(device)
model.load_state_dict(torch.load("./weight/demo_net_epoch_2.pt"))   
model.eval()

with torch.no_grad():
    col_key = []
    col_pre = []
    for i_batch, sample_batched in enumerate(test_data_loader):
        
        input_data = Variable(sample_batched['volume']).to(device)
        length = Variable(sample_batched['length']).to(device)
        
        # linux
        keys =[i.split('/')[-1] for i in sample_batched['key']]
#         # windows 
#         keys =[i.split('\\')[-1] for i in sample_batched['key']]

        outputs = model(input_data)
        average_volumns = torch.sum(outputs.data, 1)
        for i in range(outputs.size(0)):
            average_volumns[i] = outputs[i, :length[i]].sum(0)
        _, max_indexs = torch.max(average_volumns, 1)
        max_indexs = max_indexs.cpu().numpy().tolist()
        
        col_key += keys
        col_pre += max_indexs

In [0]:
###=============================================== 2.file to submit ===============================================
dictionary = pd.read_csv('./dictionary/dictionary.csv', encoding='utf8')
word_list = dictionary.dict.tolist()
character_label = [word_list[i] for i in col_pre]
predict = pd.DataFrame([col_key, character_label]).T
predict.to_csv('预测结果.csv',encoding='utf8', index=None, header=None)