# 데이터 다운로드

In [None]:
# Download the Titanic train/test CSV files into the current working directory
# (IPython shell escapes; the dataset class below reads them by file name).
!wget https://raw.githubusercontent.com/TeamAIoT/deep-learning/main/Dataset/titanic_train.csv
!wget https://raw.githubusercontent.com/TeamAIoT/deep-learning/main/Dataset/titanic_test.csv

# 모듈 임포트

In [None]:
import torch
import torchvision
import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms

from torch.utils.data import Dataset,random_split,DataLoader
from torchvision.datasets import MNIST
from torch.cuda import is_available

%matplotlib inline

# GPU 사용을 위한 Device 세팅

In [None]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
if is_available():
    device = 'cuda'
else:
    device = 'cpu'
# Bare expression so the notebook displays the selected device.
device

# Titanic Dataset

## Titanic Dataset 정의

In [None]:
class TitanicDataset(Dataset):
    """Titanic passengers loaded from a CSV file in the working directory.

    The first CSV column is used as the target and every remaining column as a
    feature. Each item is a ``(features, target)`` pair of float32 tensors;
    ``target`` keeps a trailing dimension of size 1.

    The CSV is converted to tensors once at construction time, so later edits
    to ``self.data`` are not reflected in the items returned.

    Args:
        mode: ``'train'`` reads ``titanic_train.csv``; ``'test'`` reads
            ``titanic_test.csv``.

    Raises:
        ValueError: If ``mode`` is neither ``'train'`` nor ``'test'``.
        FileNotFoundError: If the CSV file for ``mode`` does not exist.
    """

    # CSV file expected in the working directory for each supported mode.
    _CSV_BY_MODE = {'train': 'titanic_train.csv', 'test': 'titanic_test.csv'}

    def __init__(self, mode='train'):
        super().__init__()
        self.mode = mode
        csv_name = self._CSV_BY_MODE.get(mode) if isinstance(mode, str) else None
        if csv_name is None:
            raise ValueError("Invalid argument at 'mode'. expected 'train' or 'test'")
        if not os.path.exists(csv_name):
            raise FileNotFoundError(
                '{} not found in {}'.format(csv_name, os.getcwd()))
        self.data = pd.read_csv(csv_name)
        # Convert once up front so __getitem__ is a cheap tensor index instead
        # of a per-sample DataFrame slice plus conversion.
        values = torch.tensor(self.data.to_numpy(dtype='float32'))
        self._targets = values[:, :1]
        self._features = values[:, 1:]

    def __getitem__(self, idx):
        # Clone so callers get independent tensors, never views into the cache.
        return self._features[idx].clone(), self._targets[idx].clone()

    def __len__(self):
        return len(self.data)

## 학습을 위한 데이터 준비

In [None]:
batch_size = 32
train_ratio = 0.8

all_data = TitanicDataset(mode='train')

# Split sizes follow train_ratio; the validation split takes the remainder so
# the two lengths always add up to len(all_data).
train_data_len = int(len(all_data) * train_ratio)
valid_data_len = len(all_data) - train_data_len

train_data, valid_data = random_split(all_data, [train_data_len, valid_data_len])

# Only the training batches are shuffled; validation order stays fixed.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_data, batch_size=batch_size, shuffle=False)

## TitanicModel 구현

In [None]:
class TitanicModel(nn.Module):
    """Two-layer perceptron that emits one raw logit per sample.

    The output is not passed through a sigmoid, so it is meant to be paired
    with a logits-based loss such as ``nn.BCEWithLogitsLoss``.

    Args:
        in_features: Number of input features per sample (default 6).
        hidden_features: Width of the hidden layer (default 10).
    """

    def __init__(self, in_features=6, hidden_features=10):
        super().__init__()
        self.layer1 = nn.Linear(in_features, hidden_features)
        self.layer2 = nn.Linear(hidden_features, 1)
        self.activation = nn.Sigmoid()

    def forward(self, x):
        """Map inputs of shape ``(..., in_features)`` to logits ``(..., 1)``."""
        hidden = self.activation(self.layer1(x))
        return self.layer2(hidden)

## 모델, 손실 함수, 최적화 알고리즘 정의

In [None]:
lr = 0.001

# Build the network and the loss directly on the selected device.
model = TitanicModel().to(device)
# BCEWithLogitsLoss combines the sigmoid with binary cross-entropy, so the
# model is expected to output raw logits.
criterion = nn.BCEWithLogitsLoss().to(device)
optimizer = optim.SGD(model.parameters(), lr=lr)

## 학습

In [None]:
def train(model, criterion, optimizer, train_loader, valid_loader,
          num_epochs=10, print_every=1, early_stop=None,
          model_path='titanic.pth'):
    """Train a binary classifier that outputs one logit per sample.

    A prediction counts as correct when the logit is >= 0 and the target is 1,
    or the logit is < 0 and the target is 0. Loss and accuracy are averaged
    over the samples actually yielded by each loader. Batches are moved to the
    device of the model's first parameter (CPU if it has none).

    Args:
        model: Network to optimise.
        criterion: Loss called as ``criterion(pred, target)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        train_loader: Iterable of ``(data, target)`` training batches.
        valid_loader: Iterable of ``(data, target)`` validation batches.
        num_epochs: Maximum number of epochs to run.
        print_every: Print metrics every this many epochs. Epoch numbers in
            the log are 1-based so they line up with this interval.
        early_stop: Stop after this many consecutive epochs without a new
            best validation accuracy; ``None`` disables early stopping.
        model_path: Where ``model.state_dict()`` is saved each time the
            validation accuracy exceeds the best seen so far.

    Returns:
        ``(train_logs, valid_logs)``: dicts with per-epoch ``'Loss'`` and
        ``'Accuracy'`` lists.
    """
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    def run_epoch(loader, training):
        """Run one pass over ``loader`` and return (mean loss, accuracy)."""
        loss_sum = 0.0
        correct = 0
        seen = 0
        model.train(training)
        with torch.set_grad_enabled(training):
            for data, target in loader:
                data, target = data.to(run_device), target.to(run_device)
                if training:
                    optimizer.zero_grad()
                pred = model(data)
                loss = criterion(pred, target)
                if training:
                    loss.backward()
                    optimizer.step()
                batch = data.size(0)
                # criterion returns a batch mean; re-weight by batch size so
                # a smaller final batch does not skew the epoch average.
                loss_sum += loss.item() * batch
                hit = ((pred >= 0) & (target == 1)) | ((pred < 0) & (target == 0))
                correct += int(hit.sum().item())
                seen += batch
        if seen == 0:
            # An empty loader yields zero metrics instead of a ZeroDivisionError.
            return 0.0, 0.0
        return loss_sum / seen, correct / seen

    train_logs = {'Loss': [], 'Accuracy': []}
    valid_logs = {'Loss': [], 'Accuracy': []}
    patience = 0
    best_acc = float('-inf')
    for epoch in range(num_epochs):
        should_print = (epoch + 1) % print_every == 0

        train_loss, train_acc = run_epoch(train_loader, training=True)
        train_logs['Loss'].append(train_loss)
        train_logs['Accuracy'].append(train_acc)
        if should_print:
            print('Training   Epoch {} - Loss : {:.8f} Accuracy : {:.4f}%'.format(epoch + 1, train_loss, train_acc * 100))

        valid_loss, valid_acc = run_epoch(valid_loader, training=False)
        valid_logs['Loss'].append(valid_loss)
        valid_logs['Accuracy'].append(valid_acc)
        if should_print:
            print('Validation Epoch {} - Loss : {:.8f} Accuracy : {:.4f}%'.format(epoch + 1, valid_loss, valid_acc * 100))

        if valid_acc > best_acc:
            # New best: checkpoint the weights and reset the patience counter.
            best_acc = valid_acc
            torch.save(model.state_dict(), model_path)
            patience = 0
        elif early_stop is not None:
            patience += 1
            if patience >= early_stop:
                print('Training finished by early stopping')
                break
    return train_logs, valid_logs

In [None]:
# Train the Titanic model for 100 epochs, logging every 10th epoch, with early
# stopping disabled; the best weights are written to 'titanic.pth'.
train_logs, valid_logs = train(
    model, criterion, optimizer, train_loader, valid_loader,
    num_epochs=100, print_every=10, early_stop=None, model_path='titanic.pth',
)

In [None]:
# Training vs. validation loss, one point per epoch.
plt.plot(range(len(train_logs['Loss'])), train_logs['Loss'], label='train_loss')
plt.plot(range(len(valid_logs['Loss'])), valid_logs['Loss'], label='valid_loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# Training vs. validation accuracy, one point per epoch. The legend entries
# name the accuracy curves (they are not loss values).
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.plot(range(len(train_logs['Accuracy'])), train_logs['Accuracy'], label='train_acc')
plt.plot(range(len(valid_logs['Accuracy'])), valid_logs['Accuracy'], label='valid_acc')
plt.legend()
plt.show()

## 평가

In [None]:
test_data = TitanicDataset(mode='test')
test_loader = DataLoader(test_data, batch_size=1, shuffle=False)

# Restore the checkpoint saved during training. map_location remaps the stored
# tensors onto the current device, so a checkpoint written on a GPU still loads
# on a CPU-only machine.
model.load_state_dict(torch.load('titanic.pth', map_location=device))
model = model.to(device)

In [None]:
def test(model,criterion,test_loader):
    test_loss=0
    test_acc=0
    result_table=pd.DataFrame(columns=['Prediction','Target'])
    with torch.no_grad():
        model.eval()
        for data,target in test_loader:
            data,target=data.to(device),target.to(device)
            pred=model(data)
            loss=criterion(pred,target)
            test_loss+=loss.item()*data.size(0)
            for p,t in zip(pred.view(-1),target.view(-1)):
                if p>=0 and t==1:
                    test_acc+=1
                elif p<0 and t==0:
                    test_acc+=1
                result_table=result_table.append({'Prediction':1 if p>=0 else 0,'Target':t.item()},ignore_index=True)
        test_loss/=len(test_data)
        test_acc/=len(test_data)
    return test_loss,test_acc,result_table

In [None]:
# Evaluate the restored model on the held-out Titanic test set.
test_loss, test_acc, result_table = test(model, criterion, test_loader)

In [None]:
# Report the test metrics; accuracy is shown as a percentage.
report = 'Test Loss : {:.8f} Test Accuracy : {:.4f}%'
print(report.format(test_loss, test_acc * 100))

In [None]:
# Bare expression so the notebook renders the prediction/target table.
result_table

In [None]:
# Print every prediction next to its target, one sample per line.
for row_idx, row in enumerate(result_table.values):
    predicted, actual = row
    print('{} - Pred: {} Target: {}'.format(row_idx, predicted, actual))

# MNIST Dataset

## MNIST Dataset 불러오기

In [None]:
# Convert each image to a tensor when it is read from the dataset.
transform = transforms.ToTensor()

# MNIST training split, downloaded into the current directory if missing.
all_data = MNIST('.', train=True, transform=transform, download=True)

In [None]:
batch_size = 128
train_ratio = 0.8

# Split sizes follow train_ratio; the validation split takes the remainder so
# the two lengths always add up to len(all_data).
train_data_len = int(len(all_data) * train_ratio)
valid_data_len = len(all_data) - train_data_len

train_data, valid_data = random_split(all_data, [train_data_len, valid_data_len])

# Only the training batches are shuffled; validation order stays fixed.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_data, batch_size=batch_size, shuffle=False)

## MNISTModel 구현

In [None]:
class MNISTModel(nn.Module):
    """Two-layer perceptron that flattens its input and emits class logits.

    The output is raw logits (no softmax), as expected by
    ``nn.CrossEntropyLoss``.

    Args:
        in_features: Flattened size of one sample (default ``28 * 28``).
        hidden_features: Width of the hidden layer (default 1000).
        num_classes: Number of output logits (default 10).
    """

    def __init__(self, in_features=28 * 28, hidden_features=1000, num_classes=10):
        super().__init__()
        self.layer1 = nn.Linear(in_features, hidden_features)
        self.layer2 = nn.Linear(hidden_features, num_classes)
        self.activation = nn.Sigmoid()

    def forward(self, x):
        """Flatten ``x`` to ``(-1, in_features)`` and return the logits."""
        # reshape (unlike view) also accepts non-contiguous inputs.
        flat = x.reshape(-1, self.layer1.in_features)
        hidden = self.activation(self.layer1(flat))
        return self.layer2(hidden)

## 모델, 손실 함수, 최적화 알고리즘 정의

In [None]:
lr = 0.001

# Build the network and the loss directly on the selected device.
model = MNISTModel().to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.SGD(model.parameters(), lr=lr)

## 학습

In [None]:
def train(model, criterion, optimizer, train_loader, valid_loader,
          num_epochs=10, print_every=1, early_stop=None,
          model_path='mnist.pth'):
    """Train a multi-class classifier that outputs one logit per class.

    The predicted class is the argmax over dimension 1 of the model output.
    Loss and accuracy are averaged over the samples actually yielded by each
    loader. Batches are moved to the device of the model's first parameter
    (CPU if it has none).

    Args:
        model: Network to optimise.
        criterion: Loss called as ``criterion(pred, target)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        train_loader: Iterable of ``(data, target)`` training batches.
        valid_loader: Iterable of ``(data, target)`` validation batches.
        num_epochs: Maximum number of epochs to run.
        print_every: Print metrics every this many epochs. Epoch numbers in
            the log are 1-based so they line up with this interval.
        early_stop: Stop after this many consecutive epochs without a new
            best validation accuracy; ``None`` disables early stopping.
        model_path: Where ``model.state_dict()`` is saved each time the
            validation accuracy exceeds the best seen so far.

    Returns:
        ``(train_logs, valid_logs)``: dicts with per-epoch ``'Loss'`` and
        ``'Accuracy'`` lists.
    """
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    def run_epoch(loader, training):
        """Run one pass over ``loader`` and return (mean loss, accuracy)."""
        loss_sum = 0.0
        correct = 0
        seen = 0
        model.train(training)
        with torch.set_grad_enabled(training):
            for data, target in loader:
                data, target = data.to(run_device), target.to(run_device)
                if training:
                    optimizer.zero_grad()
                pred = model(data)
                loss = criterion(pred, target)
                if training:
                    loss.backward()
                    optimizer.step()
                batch = data.size(0)
                # criterion returns a batch mean; re-weight by batch size so
                # a smaller final batch does not skew the epoch average.
                loss_sum += loss.item() * batch
                correct += int((pred.argmax(1) == target).sum().item())
                seen += batch
        if seen == 0:
            # An empty loader yields zero metrics instead of a ZeroDivisionError.
            return 0.0, 0.0
        return loss_sum / seen, correct / seen

    train_logs = {'Loss': [], 'Accuracy': []}
    valid_logs = {'Loss': [], 'Accuracy': []}
    patience = 0
    best_acc = float('-inf')
    for epoch in range(num_epochs):
        should_print = (epoch + 1) % print_every == 0

        train_loss, train_acc = run_epoch(train_loader, training=True)
        train_logs['Loss'].append(train_loss)
        train_logs['Accuracy'].append(train_acc)
        if should_print:
            print('Training   Epoch {} - Loss : {:.8f} Accuracy : {:.4f}%'.format(epoch + 1, train_loss, train_acc * 100))

        valid_loss, valid_acc = run_epoch(valid_loader, training=False)
        valid_logs['Loss'].append(valid_loss)
        valid_logs['Accuracy'].append(valid_acc)
        if should_print:
            print('Validation Epoch {} - Loss : {:.8f} Accuracy : {:.4f}%'.format(epoch + 1, valid_loss, valid_acc * 100))

        if valid_acc > best_acc:
            # New best: checkpoint the weights and reset the patience counter.
            best_acc = valid_acc
            torch.save(model.state_dict(), model_path)
            patience = 0
        elif early_stop is not None:
            patience += 1
            if patience >= early_stop:
                print('Training finished by early stopping')
                break
    return train_logs, valid_logs

In [None]:
# Train the MNIST model for 10 epochs, logging every epoch, with early
# stopping disabled; the best weights are written to 'mnist.pth'.
train_logs, valid_logs = train(
    model, criterion, optimizer, train_loader, valid_loader,
    num_epochs=10, print_every=1, early_stop=None, model_path='mnist.pth',
)

In [None]:
# Training vs. validation loss, one point per epoch.
plt.plot(range(len(train_logs['Loss'])), train_logs['Loss'], label='train_loss')
plt.plot(range(len(valid_logs['Loss'])), valid_logs['Loss'], label='valid_loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# Training vs. validation accuracy, one point per epoch. The legend entries
# name the accuracy curves (they are not loss values).
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.plot(range(len(train_logs['Accuracy'])), train_logs['Accuracy'], label='train_acc')
plt.plot(range(len(valid_logs['Accuracy'])), valid_logs['Accuracy'], label='valid_acc')
plt.legend()
plt.show()

## 평가

In [None]:
test_data = MNIST(root='.', train=False, download=True, transform=transform)
test_loader = DataLoader(test_data, batch_size=128, shuffle=False)

# Restore the checkpoint saved during training. map_location remaps the stored
# tensors onto the current device, so a checkpoint written on a GPU still loads
# on a CPU-only machine.
model.load_state_dict(torch.load('mnist.pth', map_location=device))
model = model.to(device)

In [None]:
def test(model,criterion,test_loader):
    test_loss=0
    test_acc=0
    result_table=pd.DataFrame(columns=['Prediction','Target'])
    with torch.no_grad():
        model.eval()
        for data,target in test_loader:
            data,target=data.to(device),target.to(device)
            pred=model(data)
            loss=criterion(pred,target)
            test_loss+=loss.item()*data.size(0)
            test_acc+=torch.sum(pred.argmax(1)==target).item()
            for p,t in zip(pred.argmax(1),target):
                result_table=result_table.append({'Prediction':p.item(),'Target':t.item()},ignore_index=True)
        test_loss/=len(test_data)
        test_acc/=len(test_data)
    return test_loss,test_acc,result_table

In [None]:
# Evaluate the restored model on the MNIST test split.
test_loss, test_acc, result_table = test(model, criterion, test_loader)

In [None]:
# Report the test metrics; accuracy is shown as a percentage.
report = 'Test Loss : {:.8f} Test Accuracy : {:.4f}%'
print(report.format(test_loss, test_acc * 100))

In [None]:
# Bare expression so the notebook renders the prediction/target table.
result_table

In [None]:
# Print every prediction next to its target, one sample per line.
for row_idx, row in enumerate(result_table.values):
    predicted, actual = row
    print('{} - Pred: {} Target: {}'.format(row_idx, predicted, actual))

## 시각화

In [None]:
tensorToImage = transforms.ToPILImage()
fig = plt.figure(figsize=(10, 12))
cols = 4
rows = 4
# Random test-set indices, one per grid cell (sampled with replacement).
sample_index = np.random.randint(0, len(test_data), size=(cols * rows))

# Inference only: make eval mode explicit and skip autograd bookkeeping.
model.eval()
with torch.no_grad():
    for position, index in enumerate(sample_index, start=1):
        # Read each sample once instead of indexing the dataset three times.
        image, label = test_data[index]
        pred = model(image.unsqueeze(0).to(device))
        fig.add_subplot(rows, cols, position)
        plt.title('pred:{} label:{}'.format(pred.argmax().cpu().item(), label))
        plt.imshow(tensorToImage(image), cmap='gray')
plt.show()