<a href="https://colab.research.google.com/github/a9e68ce5/Machine-Learning/blob/main/CNN_to_recognize_handwritten_digits.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

import os
import random
import glob
import csv
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf

from torch.optim import Adam
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from torch.optim.lr_scheduler import MultiStepLR as MultiStepLR
from torch.optim.lr_scheduler import StepLR as StepLR

!pip install --upgrade --no-cache-dir gdown

!gdown 1-8MWBqXSnMe5EpVZ7KQMxBKLKDIufM0Y

!unzip 'ds2022fallhw6.zip'

"""# ***Set arguments and random seed***"""

TRA_PATH = 'digit/digit/train/'
VAL_PATH = 'digit/digit/valid/'
TST_PATH = 'digit/digit/test/'
LABEL_TRAIN_PATH = 'digit/digit/train.csv'
LABEL_VALID_PATH = 'digit/digit/valid.csv'
LABEL_SAMPLE_PATH = 'digit/digit/sample.csv'
BATCH = 256
DEVICE_ID = 0
SEED = 5566
NUM_ECPOCH = 200

torch.cuda.set_device(DEVICE_ID)
use_gpu = torch.cuda.is_available()
device = torch.device("cuda" if use_gpu else "cpu")

torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
random.seed(SEED)
np.random.seed(SEED)

"""# ***Process data***"""

transform = transforms.Compose([
                 transforms.ToTensor(),
                 #transforms.RandomCrop(64,padding=2),#以圖片(PIL Image)中隨機裁減一塊圖像出來
                 #transforms.RandomAutocontrast(),
                 transforms.RandomVerticalFlip(p=0.5),
                 transforms.ToPILImage(mode=None)]   #轉成tensor之後一定要轉回PIL才可以讀
                 ) #https://pytorch.org/vision/stable/transforms.html

from torchvision.datasets import MNIST
from torch.utils.data import Dataset, DataLoader


from torchvision.datasets import CIFAR10
from torch.utils.data import Dataset, DataLoader

import torch
import numpy as np
import pandas as pd
from torchvision.io import read_image
import os
from PIL import Image

class DataPreparation(Dataset):
    def __init__(self,  data_path=None, label_path=None,
                 transform=None, target_transform=None):

        self.data_path = data_path
        self.label_path = label_path

        self.transform = transform
        self.target_transform = target_transform

        ## preprocess files
        self.preprocess(self.data_path, self.label_path)


    def __len__(self):
        return len(self.data_files)

    def __getitem__(self, idx):
        data_file = self.data_files[idx]
        img_path = os.path.join(self.data_path, data_file)
        image = Image.open(img_path) # plt.imread(img_path)

        if self.transform:
            image = self.transform(image)

        if self.label_path is None:
            return image, -1, data_file

        label = self.file_labels['label'][self.file_labels['image_name'] == data_file].iloc[0]

        if self.target_transform:
            label = self.target_transform(label)

        return image, label, data_file

    def preprocess(self, data_path, label_path):
        self.data_files = os.listdir(data_path)
        self.data_files.sort()

        if label_path is not None:
            self.file_labels = pd.read_csv(label_path)


class Data:
    def __init__(self, data_path, label_path):


        transform = transforms.Compose([
            transforms.Resize((28, 28)),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.5), std=(0.5))
        ])

        # Train Data
        train_dataset = DataPreparation(
                                        data_path=data_path,
                                        label_path=label_path,
                                        transform=transform)


        self.loader_train = DataLoader(
            train_dataset, batch_size=BATCH, shuffle=True,
            num_workers=2
            )

        # Validation Data
        valid_data_path = data_path.replace('train', 'valid')
        valid_label_path = label_path.replace('train', 'valid')

        valid_dataset = DataPreparation(
                                       data_path=valid_data_path,
                                       label_path=valid_label_path,
                                       transform=transform)

        self.loader_valid = DataLoader(
            valid_dataset, batch_size=BATCH , shuffle=False,
            num_workers=2
            )

        # Test Data (No label, only image)
        test_dataset = DataPreparation(
                                       data_path=data_path,
                                       transform=transform)


        self.loader_test = DataLoader(
            test_dataset, batch_size=BATCH, shuffle=False,
            num_workers=2
            )

loader = Data( data_path=TRA_PATH , label_path=LABEL_TRAIN_PATH )
train_loader = loader.loader_train
valid_loader = loader.loader_valid

"""# *** Test Data ***"""

class Data2:
    def __init__(self, data_path, label_path):


        transform = transforms.Compose([
            transforms.Resize((28, 28)),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.5), std=(0.5))
        ])

        # Test Data
        test_dataset = DataPreparation(
                                        data_path=data_path,
                                        label_path=label_path,
                                        transform=transform)


        self.loader_test = DataLoader(
            test_dataset, batch_size=BATCH, shuffle=False,
            num_workers=2
            )

loader1 = Data2( data_path=TST_PATH , label_path=None)
test_loader = loader1.loader_test

"""# ***Define module class***"""

class FaceExpressionNet(nn.Module):
    def __init__(self):
        super(FaceExpressionNet, self).__init__()
        self.conv = nn.Sequential(
            #torch.nn.Dropout(0.5),
            nn.Conv2d(3,32, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(32, eps=1e-05, affine=True),
            nn.LeakyReLU(negative_slope=0.05),
            #nn.Sigmoid(),
            #nn.ReLU(),


            nn.Conv2d(32,64, kernel_size=3, stride=1,padding=1),
            nn.BatchNorm2d(64, eps=1e-05, affine=True),
            nn.LeakyReLU(negative_slope=0.05),
            #nn.ReLU(),



            nn.Conv2d(64,128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128, eps=1e-05, affine=True),
            nn.LeakyReLU(negative_slope=0.05),
            #nn.Sigmoid(),
            #nn.ReLU(),



            nn.Conv2d(128,256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256, eps=1e-05, affine=True),
            nn.LeakyReLU(negative_slope=0.05),
            #nn.ReLU(),


            nn.Conv2d(256,512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512, eps=1e-05, affine=True),
            nn.LeakyReLU(negative_slope=0.05),
            #nn.ReLU(),


            nn.Conv2d(512,512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512, eps=1e-05, affine=True),
            nn.LeakyReLU(negative_slope=0.05),
            #nn.ReLU(),


            nn.Conv2d(512,512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512, eps=1e-05, affine=True),
            nn.LeakyReLU(negative_slope=0.05),
            #nn.ReLU(),


            nn.Conv2d(512,512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512, eps=1e-05, affine=True),
            nn.LeakyReLU(negative_slope=0.05),
            #nn.ReLU(),

            nn.Conv2d(512,512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512, eps=1e-05, affine=True),
            nn.LeakyReLU(negative_slope=0.05),
            #nn.ReLU(),


            nn.Conv2d(512,512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512, eps=1e-05, affine=True),
            nn.LeakyReLU(negative_slope=0.05),
            #nn.ReLU(),


            nn.AdaptiveMaxPool2d((1,1)),
        )
        self.fc = nn.Sequential(
            nn.Linear(512 ,256),
            nn.LeakyReLU(negative_slope=0.05),
            nn.Linear(256,256),
            nn.LeakyReLU(negative_slope=0.05),
            nn.Linear(256,256),
            nn.LeakyReLU(negative_slope=0.05),
            nn.Linear(256,10),

        )

    def forward(self, x):
        #image size (28,28)
        #print(x.shape)
        x = self.conv(x) #(14*14)
        #print("x",x.shape)
        x = x.flatten(start_dim=1)
        #print("x",x.shape)
        x = self.fc(x)
        #print("x",x.shape)
        return x

"""# ***Define training and testing process***"""

def train(train_loader, model, loss_fn, use_gpu=True):
    model.train()
    train_loss = []
    train_acc = []
    train_result=[]
    for (img, label, _) in train_loader:
        if use_gpu:
            img = img.to(device)
            label = label.to(device)
        optimizer.zero_grad()
        output = model(img)
        loss = loss_fn(output, label)
        loss.backward()
        optimizer.step()
        with torch.no_grad():#停止对梯度的计算和存储 ，从而减少对内存的消耗，不会进行反向传播。
            predict = torch.argmax(output, dim=-1)#輸出每一行的最大值
            acc = np.mean((label == predict).cpu().numpy())
            train_acc.append(acc)
            train_loss.append(loss.item())
            predict=predict.flatten()
    train_acc = np.mean(train_acc)
    train_loss = np.mean(train_loss)
    print("Epoch: {}, train Loss: {:.4f}, train Acc: {:.4f}".format(epoch + 1, train_loss, train_acc))
    return train_acc , train_loss
def valid(valid_loader, model, loss_fn, use_gpu=True):
    model.eval()
    with torch.no_grad():#停止对梯度的计算和存储 ，从而减少对内存的消耗，不会进行反向传播。
        valid_loss = []
        valid_acc = []
        for idx, (img, label,_) in enumerate(valid_loader):
            if use_gpu:
                img = img.to(device)
                label = label.to(device)
            output = model(img)
            loss = loss_fn(output, label)
            predict = torch.argmax(output, dim=-1)
            acc = (label == predict).cpu().tolist()
            valid_loss.append(loss.item())
            valid_acc += acc

        valid_acc = np.mean(valid_acc)
        valid_loss = np.mean(valid_loss)
        print("Epoch: {}, valid Loss: {:.4f}, valid Acc: {:.4f}".format(epoch + 1, valid_loss, valid_acc))#{:.4f}:輸出到小數點後四位
    return valid_acc , valid_loss

def save_checkpoint(valid_acc, acc_record, epoch, prefix='model'):
    # you can define the condition to save model :)
    if valid_acc >= np.mean(acc_record[-5:]):
        checkpoint_path = f'{prefix}.pth'
        torch.save(model.state_dict(), checkpoint_path)
        print('model saved to %s' % checkpoint_path)
    return valid_acc
def better(acc_record):
    if max(acc_record) == acc_record[-1]: return True
    return False

"""# ***實作Early Stopping***"""

class EarlyStopping(object):
    def __init__(self, mode='max', min_delta=0, patience=10, percentage=False):
        self.mode = mode
        self.min_delta = min_delta
        self.patience = patience
        self.best = None
        self.num_bad_epochs = 0
        self.is_better = None
        self._init_is_better(mode, min_delta, percentage)

        if patience == 0:
            self.is_better = lambda a, b: True #表示暱名函式（ Anonymous Function ）。
            self.step = lambda a: False

    def step(self, metrics):
        if self.best is None:
            self.best = metrics
            return False

        if np.isnan(metrics):#判斷是否是空值
            return True

        if self.is_better(metrics, self.best):
            self.num_bad_epochs = 0
            self.best = metrics
        else:
            self.num_bad_epochs += 1

        if self.num_bad_epochs >= self.patience:
            return True

        return False

    def _init_is_better(self, mode, min_delta, percentage):
        if mode not in {'min', 'max'}:
            raise ValueError('mode ' + mode + ' is unknown!')
        if not percentage:
            if mode == 'min':
                self.is_better = lambda a, best: a < best - min_delta
            if mode == 'max':
                self.is_better = lambda a, best: a > best + min_delta
        else:
            if mode == 'min':
                self.is_better = lambda a, best: a < best - (
                            best * min_delta / 100)
            if mode == 'max':
                self.is_better = lambda a, best: a > best + (
                            best * min_delta / 100)

"""# ***Training***"""

es = EarlyStopping(patience = 10)
if __name__ == '__main__':
    model = FaceExpressionNet()
    if use_gpu:
        model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = StepLR(optimizer, 20, gamma = 0.1)

    loss_fn = nn.CrossEntropyLoss()

    acc_record = []
    train_acc = []
    train_Loss = []
    valid_Loss = []
    for epoch in range(NUM_ECPOCH):
        scheduler.step(epoch)
        result , train_loss = train(train_loader, model, loss_fn, use_gpu)
        train_acc.append(result)
        valid_acc , valid_loss = valid(valid_loader, model, loss_fn, use_gpu=True)
        acc_record.append(valid_acc)
        train_Loss.append(train_loss)
        valid_Loss.append(valid_loss)
        if better(acc_record):
            best_acc=save_checkpoint(valid_acc, acc_record, epoch, prefix='model')
        if es.step(valid_acc):
          print("Early stopping with best_acc: {:.4f} at Epoch: {}". format(best_acc,epoch+1))
          break
        print('########################################################')

"""# ***cross entropy loss***"""

# Commented out IPython magic to ensure Python compatibility.
# %matplotlib inline
fig = plt.figure() #定義一個圖像窗口
plt.title("CrossEntropy Loss")
plt.xlabel("Epoch")
plt.ylabel("accuracy")
line1,=plt.plot(valid_Loss , '-x',color='red',label='valid learning curve') #定義x,y和圖的樣式
line2,=plt.plot(train_Loss, '-x',color='blue',label='training learning curve') #定義x,y和圖的樣式
plt.legend(handles = [line1, line2])
plt.savefig('/content/CrossEntropy Loss.jpg', bbox_inches='tight')

"""# ***accuracy loss***"""

# Commented out IPython magic to ensure Python compatibility.
# %matplotlib inline
fig = plt.figure() #定義一個圖像窗口
plt.title("valid and training learning curve")
plt.xlabel("Epoch")
plt.ylabel("accuracy")
line1,=plt.plot(acc_record, '-x',color='red',label='valid learning curve') #定義x,y和圖的樣式
line2,=plt.plot(train_acc, '-x',color='blue',label='training learning curve') #定義x,y和圖的樣式
plt.legend(handles = [line1, line2])
plt.savefig('/content/ with valid and training learning curve.jpg', bbox_inches='tight')

"""# ***Testing***"""

def test2(loader_test, model, output_file_name):
    outputs = []
    datafiles = []
    # switch to eval mode
    model.eval()

    with torch.no_grad():
        for i, (inputs, targets, datafile) in enumerate(loader_test, 1):
            inputs = inputs.to(device)
            targets = targets.to(device)


            preds = model(inputs)

            _, output = preds.topk(1, 1, True, True)

            outputs.extend(list(output.reshape(-1).cpu().detach().numpy()))

            datafiles.extend(list(datafile))



    output_file = dict()
    output_file['image_name'] = datafiles
    output_file['label'] = outputs

    output_file = pd.DataFrame.from_dict(output_file)
    output_file.to_csv(output_file_name, index = False)

del model
model = FaceExpressionNet()
model.load_state_dict(torch.load('model.pth'))
model = model.cuda()
test2(test_loader, model , 'predict2.csv')
test2(valid_loader, model , 'valid_pred.csv')

print(model)

"""# ***confusion matrix and 熱點圖***"""

valid_set=pd.read_csv(LABEL_VALID_PATH)
valid_pred = pd.read_csv('/content/valid_pred.csv')

from sklearn.metrics import confusion_matrix
cm=confusion_matrix(valid_pred['label'], valid_set['label'])

import matplotlib.pyplot as plt
import seaborn as sns
sns.heatmap(cm,square= True, annot=True, cbar= True)
plt.xlabel("predicted value")
plt.ylabel("true value")
plt.title("confusion matrix")
plt.show()

"""# ***Model***"""

!gdown --id "1t7TlJB6VRhXP1lfx0DCC-nTI2jXNtI7Z" --output "model.pth"