In [1]:
from PIL import Image, ImageOps
from numpy import asarray
from random import shuffle
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm



##### 0, Define the hyper parameters

In [2]:
"TODO: change hyper parameters"
n_epochs = 100
batch_size_train = 128
batch_size_test = 1000
learning_rate = 0.001
momentum = 0

##### 1, Loading training and testing data

In [3]:
TRAIN_DIR ='training'
TEST_DIR ='testing_toy'
TRAIN_LABEL = 'training/labels.txt'
TEST_LABEL = 'testing_toy/labels.txt'

#### 1.1 load data from google drive, If you use the data from google drive, you need to define a torch.utils.data.Dataset

In [4]:
# create train_x, train_yimport os
# step 1: load image to numpy array
import os
import numpy
data_list = os.listdir(TRAIN_DIR)
#print(data_list)

def read_image(name):
    # use PIL to load '.png' format, the output should be numpy array
    data=[]
    if name.endswith(".png"):
        image=Image.open(os.path.join(TRAIN_DIR, name))#.convert('RGB')
        #gray_image = ImageOps.grayscale(image)
        data = asarray(image)
        #print(data.shape)

        #print('datashape', data.shape)
        #print(data)
    return data

# make a dictionary, read from labels.txt
def label_dict(name):
    d = {}
    with open(name,'r') as f:
        for line in f.readlines():
            lst = line.split()
            
            lst[1] = int(lst[1])
            d[lst[0]] = lst[1]
    return d

# label_dict = {'0267.png': 4, '0267.png':5}

train_x, train_y = [], []
labelList = label_dict(TRAIN_LABEL)
for name in data_list:
    if name.endswith(".png"):
        #print(name)
        image = read_image(name) 
        #print(image)
        # shape should be (num_channels, H, W)
        label = labelList[name]
        #print(image)
        #print(label)
        train_x.append(image)
        train_y.append(label)

train_x = numpy.stack(train_x) # (N, num_channels, H, W)
train_y = numpy.stack(train_y)

#print(train_x.mean())
# print(np.std(train_x)
#print(len(train_x.shape))
# assert len(train_x.shape) == 4

In [5]:
data_list_test = os.listdir(TEST_DIR)
#print(data_list)

def read_image_test(name):
    # use PIL to load '.png' format, the output should be numpy array
    data=[]
    if name.endswith(".png"):
        image=Image.open(os.path.join(TEST_DIR, name))#.convert('RGB')
        #gray_image = ImageOps.grayscale(image)
        data = asarray(image)
        #print('datashape', data.shape)
        #print(data)
    return data

test_x, test_y = [], []
labelList = label_dict(TEST_LABEL)
for name in data_list_test:
    if name.endswith(".png"):
        #print(name)
        image = read_image_test(name) 
        #print(image)
        # shape should be (num_channels, H, W)
        label = labelList[name]
        #print(image)
        #print(label)
        test_x.append(image)
        test_y.append(label)

test_x = numpy.stack(test_x) # (N, num_channels, H, W)
test_y = numpy.stack(test_y)

In [6]:
mean_train = train_x.mean()
std_train = (np.std(train_y))

mean_test = test_x.mean()
std_test = (np.std(test_y))

In [7]:
                           
class EquationDataset(Dataset):

    def __init__(self, x, y, transform=None):
        ### x.shape = (N, num_channels, H, W)
        ### y.shape = (N)
        self.x = x
        self.y = y
        self.transform = transform
        #mean = statistics.mean(x), std = np.std(y)
        
        
    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        # apply transformer: ToTensor and Normalize
        img = self.x[idx]
        # print(img.shape)
        if self.transform != None:
            img = self.transform(img)
            
        # convert numpy to torch tensor
        # normalize to [0, 1]
            
        label = self.y[idx]-1
        return (img, label)

#### 1.2 Now lets load data from torchvision instead

In [8]:

"TODO: change the augmentation method"
transform_norm = transforms.Compose([
    lambda x: Image.fromarray(x),
    transforms.Resize([28,28]),
    transforms.ToTensor(),
    # transforms.Normalize([0.5],[0.5]),
    transforms.Normalize([mean_train/255], [std_train/255])
])
#transform = transforms.Compose([transforms.ToTensor(),
                              #transforms.Normalize((0.5,), (0.5,)),])

##### 1.3 First, check the documentation of dataloader

In [None]:
# trainloader = torch.utils.data.DataLoader(trainset0, batch_size=batch_size_train, shuffle=True)
# testloader = torch.utils.data.DataLoader(testset0, batch_size=batch_size_test, shuffle=True)

In [9]:
# trainset = EmnistDataset(train_x, train_y)
# testset = EmnistDataset(test_x, test_y)
trainset = EquationDataset(train_x, train_y, transform = transform_norm)
testset = EquationDataset(test_x, test_y, transform = transform_norm)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size_train, shuffle=False)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size_test, shuffle=False)
# print(trainset.x)

##### 2 Define the model

In [10]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2)

        # self.conv2_drop = nn.Dropout2d()
        
        self.fc1 = nn.Linear(32*7*7, 10)
        #self.fc2 = nn.Linear(50, 10) # TODO change label+1

    def forward(self, x):
        #print("feature" , x.shape)
        x = F.relu(F.max_pool2d(self.conv1(x), 2))

        x = F.relu(F.max_pool2d(self.conv2(x), 2))
        
        x = x.view(x.size(0), -1)
        x = self.fc1(x)
        # x = F.relu(self.fc1(x))
        #x = F.dropout(x, training=self.training)
        #x = self.fc2(x)
        return F.log_softmax(x, -1)


In [None]:
"Read the documentation of torchvision.models to try more cnn models"

##### 3 Write the training function and the testing function

In [11]:
def test(model, test_loader, device):
    # evaluation, freeze 
    model.eval()
    total_num = 0
    total_correct = 0
    with torch.no_grad():
        for _, (data, target) in enumerate(test_loader):
            
            data = data.to(device)
            target = target.to(device)
            
            predict_one_hot = model(data)
            #print(predict_one_hot)
            _, predict_label = torch.max(predict_one_hot, 1)
            #print("label", predict_label, "target", target)
            total_correct += (predict_label == target).sum().item()
            total_num += target.size(0)
            if total_num == 100:
                with open('predict_label.txt', 'w') as f:
                    for i in range(100):
                        if data_list_test[i].endswith(".png"):
                        
                            f.write(data_list_test[i]+' ')
                            f.write(str(predict_label.numpy()[i]))
                            f.write('\n')
                        #print(data_list_test[i], predict_label.numpy()[i])
            #print ('total_num', total_num)
        
    return (total_correct / total_num)
            

In [None]:
def train(model, train_loader, test_loader, num_epoch, learning_rate, momentum, device):
    train_losses = []
    
    # 1, define optimizer
    
    "TODO: try different optimizer"
    
    
    # optimizer = optim.SGD(network.parameters(), lr=learning_rate,
    #                   momentum=momentum)
    optimizer = optim.Adam(network.parameters(), lr=learning_rate)

    
    for epoch in tqdm(range(num_epoch)):
        # train the model
        model.train()
        
        for i, (data, target) in enumerate(train_loader):
            
            data = data.to(device)
            # print('*****', data.shape)
            target = target.to(device)

            optimizer.zero_grad()
            
            # 2, forward
            output = network(data)
            #print('output', output)
            
            # 3, calculate the loss
            
                    
            "TODO: try use cross entropy loss instead "
            
            loss = F.nll_loss(output, target)
            
            #loss = nn.CrossEntropyLoss()
            #input = torch.randn(3, 10, requires_grad=True)
            #target = torch.empty(3, dtype=torch.long).random_(10)
            #output = loss(input, target)
            #output.backward()
            
            # 4, backward
            loss.backward()
            
            
            optimizer.step()
            
        # evaluate the accuracy on test data for each epoch
        accuracy_train = test(model, train_loader, device)
        accuracy_test = test(model, test_loader, device)
        with open('result.txt', 'a') as f:
            f.write('accuracy_train'+str(accuracy_train))
            f.write('\n')
            f.write('accuracy_test'+str(accuracy_test))
            f.write('\n')
        #print('accuracy_train', accuracy_train)
        #print('accuracy_test', accuracy_test)
        
    # 5, save model
    
    "TODO: change the number of epochs save the model with the best prediction accuracy"
    

In [None]:
device0 = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# use cpu if you do not have gpu installed in your computer
network = Net().to(device0)
train(model=network, train_loader=trainloader, test_loader=testloader, num_epoch=n_epochs, learning_rate=learning_rate, momentum=momentum, device=device0)

In [None]:
torch.save(network.state_dict(), 'final.pth')

In [12]:
device0 = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# use cpu if you do not have gpu installed in your computer
network = Net().to(device0)

#### 4 Calculate the accuracy

In [13]:
"TODO: load your saved model and calculated the accuracy, you can use the test function provided above"

'TODO: load your saved model and calculated the accuracy, you can use the test function provided above'

In [14]:
model = Net()
model.load_state_dict(torch.load('final.pth'))
model.eval()
accuracy_test = test(model, testloader, device=device0)
print('accuracy_test', accuracy_test)

accuracy_test 0.95
