In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as Data
from torchvision import transforms
import pandas as pd
from PIL import Image
import math
import pickle

ModuleNotFoundError: No module named 'torch'

In [None]:
LR = 0.05
BATCH_SIZE = 64
EPOCH = 100
DATA_FOLDER = 'ele'

In [None]:
Train_Loss = []
Test_Loss = []

In [None]:
def getData(mode):
    if mode == 'train':
        
        img = pd.read_csv(DATA_FOLDER + '_train_img.csv')
        label = pd.read_csv(DATA_FOLDER + '_train_label.csv')
        return np.squeeze(img.values), np.squeeze(label.values)
    else:
        img = pd.read_csv(DATA_FOLDER + '_test_img.csv')
        label = pd.read_csv(DATA_FOLDER + '_test_label.csv')
        return np.squeeze(img.values), np.squeeze(label.values)


class Loader(Data.Dataset):
    def __init__(self, root, mode):
        """
        Args:
            root (string): Root path of the dataset.
            mode : Indicate procedure status(training or testing)

            self.img_name (string list): String list that store all image names.
            self.label (int or float list): Numerical list that store all ground truth label values.
        """
        self.root = root
        self.img_name, self.label = getData(mode)
        self.mode = mode
        print("> Found %d images..." % (len(self.img_name)))
        

    def __len__(self):
        #------------return the size of dataset
        return len(self.img_name)

    def __getitem__(self, index):
        #-------------Get the image path from 'self.img_name' and load it.
                  
        path = self.root + str(self.img_name[index]) + '.png'
        img_as_img = Image.open(path)
        #img = img.resize(,Image.ANTIALIAS)
        #box=(0,240,640,480)
        #img=img.crop(box)
        
        #-------------Get the ground truth label from self.label"""
        label = torch.from_numpy(self.label)[index]
        
        #-------------Transform the .jpeg rgb images
        if self.mode == 'train':
            transform1 = transforms.Compose([
#                 transforms.RandomHorizontalFlip(p=0.5),
#                 transforms.RandomVerticalFlip(p=0.5),
#                 transforms.RandomRotation(degrees=(-45,45), resample=False, expand=False, center=None),
#                 transforms.ColorJitter(contrast=(0,1)),
                transforms.Resize(( 280, 210)),
                transforms.ToTensor(), # range [0, 255] -> [0.0,1.0]
                #transforms.Normalize(mean = (0.5, 0.5, 0.5), std = (0.5, 0.5, 0.5)),#range [0, 255] ->[-1.0,1.0]
                
                ]
            )
        else:
            transform1 = transforms.Compose([
                transforms.Resize(( 280, 210)),
                transforms.ToTensor(), # range [0, 255] -> [0.0,1.0]
                #transforms.Normalize(mean = (0.5, 0.5, 0.5), std = (0.5, 0.5, 0.5)),
             
                ]
            )
        img_tran = transform1(img_as_img)
                
        #-------------Return processed image and label
        return img_tran, label

In [None]:
# Dataloader

train_data=Loader('./'+ DATA_FOLDER +'/','train')
test_data=Loader('./'+ DATA_FOLDER +'/','test')
train_loader = Data.DataLoader(dataset=train_data, batch_size=BATCH_SIZE)
test_loader = Data.DataLoader(dataset=test_data, batch_size=BATCH_SIZE)
print (train_loader)
print (test_loader)

In [None]:

class DeepConvNet(nn.Module):
    def __init__(self):
        super(DeepConvNet, self).__init__()
    
        #####---1
        self.conv1 = nn.Conv2d(3, 32, kernel_size=4, stride=1, bias=False)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        #####
        
        #####---2
        self.conv2 = nn.Conv2d(32, 32, kernel_size=4, stride=1 ,bias=False)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        #####
        
        #####---3
        self.conv3 = nn.Conv2d(32, 32, kernel_size=4, stride=1, bias=False)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        #####
        
        #####---3
        self.conv4 = nn.Conv2d(32, 32, kernel_size=4, padding=1, bias=False)
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        #####
        
        self.fc5 = nn.Linear(in_features=8960, out_features=200, bias=True)
        self.fc6 = nn.Linear(in_features=200, out_features=15, bias=True)

        
    def forward(self, x):
        
        ###-----1------
        out = self.conv1(x)
        out = self.pool1(out)        
        
        ###-----2------
        out = self.conv2(out)
        out = self.pool2(out)
       
        ###-----3------
        out = self.conv3(out)
        out = self.pool3(out)
        
        ###-----4------
        out = self.conv4(out)
        out = self.pool4(out)
        
        out = self.conv5(out)
        out = F.relu(out)
        out = self.pool5(out)
        
        out = out.view(out.size(0), -1) #flatten
        out = self.fc5(out)
        out = self.fc6(out)
        out = torch.Softmax(out)
        #out = torch.tanh(out)
        #out = F.softsign(out)
        
        
        return out

In [None]:
def train_DeepConv(epoch):
    model_DeepConvNet.train()
    correct = 0
    for batch_idx, (data, target) in enumerate(train_loader):
        if torch.cuda.is_available():
            data, target = data.to(device=device, dtype=torch.float), target.to(device)

        optimizer.zero_grad()
        output = model_DeepConvNet(data)
        target = target.float()
        loss = Loss(output, target)
        loss.backward()
        optimizer.step()
#         print("output" ,output.cpu().detach().numpy().squeeze()[0:3])
#         print("ground truth" ,target.cpu().detach().numpy().squeeze()[0:3])
    Train_Loss.append(round((loss.data).cpu().numpy().tolist(),6))
    
   
    print('Train Epoch: {} \t Loss_d: {} Loss_phi:{}'.format(
            epoch, loss_d.data , loss_phi.data))

In [None]:
def test_DeepConv(epoch):
    model_DeepConvNet.eval()
    test_loss_d = 0.0
    test_loss_phi = 0.0
    
    correct = 0
    for batch_idx, (data, target) in enumerate(test_loader):
        if torch.cuda.is_available():
            data, target = data.to(device=device, dtype=torch.float), target.to(device)
        with torch.no_grad():
            output = model_DeepConvNet(data)
        target = target.float()
        test_loss = Loss(output, target)
    
    
    print(" ground truth\n" ,target.cpu().detach().numpy().squeeze()[0:8])
    print(output.cpu().detach().numpy().squeeze()[0:8])
    
    Test_Loss.append(round((test_loss.data).cpu().numpy().tolist(),6))
        
    print('========Test set: Average loss:{}\t========\n'
          .format(test_loss))

In [None]:
model_DeepConvNet = DeepConvNet().cuda(0)

if torch.cuda.is_available():
    device = torch.device('cuda')
    model_DeepConvNet.to(device) 
#------------define optimizer/loss function
Loss = nn.CrossEntropyLoss()    
optimizer = torch.optim.SGD(model_DeepConvNet.parameters(), lr=LR, momentum=0.9, 
                            dampening=0, weight_decay=0.0005, nesterov=False)
print(model_DeepConvNet)

In [None]:
def saveloss():
    
    file = open('Train_loss.pickle', 'wb')
    pickle.dump(Train_Loss, file)
    file.close()
    # pickle a variable to a file
    file = open('Test_loss.pickle', 'wb')
    pickle.dump(Test_Loss, file)
    file.close()

In [None]:
#---------------Train-----------------
for epoch in range(1,EPOCH+1):
    train_DeepConv(epoch)
    test_DeepConv(epoch)
    if (epoch%10==0):
        PATH ='./model/'+str(epoch)+'-linear.pkl'
        torch.save(model_DeepConvNet.state_dict(), PATH)
    
