## CNN

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from random import randint
import time
import random
import torch.utils.data as data
from PIL import Image
import torchvision.transforms as transforms
import cv2
import sys, os

In [16]:
import torch
print(torch.__version__)
print(torch.cuda.is_available())

1.13.0
False


In [2]:
device= torch.device("cpu") #cuda
print(device)

root_dir = os.getcwd() + '/data_processing'

train_dir = root_dir + '/train_set/'
val_dir = root_dir + '/val_set/'
test_dir = root_dir + '/test_set/'

cpu


In [3]:
model_dir = os.getcwd() + '/model_CNN/'

In [4]:
class FaceSet(data.Dataset):
    def __init__(self,folder,labels,transform):
        self.labels = labels
        self.transform=transform
        imgs=[os.path.join(folder,f) for f in os.listdir(folder)]
        self.imgs = imgs
        
    def __getitem__(self,index):
        imgpath=self.imgs[index]
        im=Image.open(imgpath)
        if im.mode == "RGBA":
            r, g, b, a=im.split ()
            im=Image.merge ("RGB", (r, g, b))
        elif im.mode!="RGB":
            im=im.convert ("RGB")
        im=self.transform(im)
        label = self.labels[index]
        return im,label
    def __len__(self):
        return len(self.imgs)

In [5]:
imgsize=64
transform_train=transforms.Compose([ 
    transforms.Resize((64,64)),
    transforms.RandomCrop((imgsize,imgsize)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
])
transform_test=transforms.Compose([ 
    transforms.Resize((imgsize,imgsize)),
    transforms.ToTensor(),
])

for fn in os.listdir(train_dir):
    im=Image.open(train_dir + fn)
    print(transform_train(im).size())
    break

torch.Size([3, 64, 64])


In [6]:
train_labels = torch.load(root_dir + '/train_labels.pt')
test_labels = torch.load(root_dir + '/test_labels.pt')
val_labels = torch.load(root_dir + '/val_labels.pt')

In [8]:
# define dataset
trainset=FaceSet(train_dir,train_labels,transform=transform_train)
valset=FaceSet(val_dir,val_labels,transform=transform_test)
testset=FaceSet(test_dir,test_labels,transform=transform_test)

In [9]:
class cnn(nn.Module):
    def __init__(self):
        super(cnn, self).__init__()
        # 3 x 64 x 64 -> 64 x 32 x 32
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1 ),
            nn.MaxPool2d(2,2),
            nn.BatchNorm2d(64),
            nn.ReLU()
        )
        
        # 64 x 32 x 32 -> 128 x 16 x 16
        self.conv2 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding=1 ),
            nn.MaxPool2d(2,2),
            nn.BatchNorm2d(128),
            nn.ReLU()
        )
        
        # 128 x 16 x 16 -> 256 x 8 x 8
        self.conv3 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=3, padding=1 ),
            nn.MaxPool2d(2,2),
            nn.BatchNorm2d(256),
            nn.ReLU()
        )
        
        # 256 x 8 x 8 -> 512 x 4 x 4
        self.conv4 = nn.Sequential(
            nn.Conv2d(256, 512, kernel_size=3, padding=1 ),
            nn.MaxPool2d(2,2),
            nn.BatchNorm2d(512),
            nn.ReLU()
        )
        
        self.linear1 = nn.Linear(8192, 512)
        self.linear2 = nn.Linear(512, 2)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        
        x = x.view(-1, 8192)
        x = self.linear1(x)
        x = F.relu(x)
        x = self.linear2(x)
        return x
    
def get_error( scores , labels ):
    bs=scores.size(0)
    predicted_labels = scores.argmax(dim=1)
    indicator = (predicted_labels == labels)
    num_matches=indicator.sum()
    
    return 1-num_matches.float()/bs  

def eval_on_test_set(loader):
    running_error=0
    num_batches=0
    for batch_idx,(img,label) in enumerate(loader):
        img=img
        img=img.to(device)
        scores=net(img) 
        label=label.to(device) 
        error = get_error( scores , label)
        running_error += error.item()
        num_batches+=1
    total_error = running_error/num_batches
    print( 'test error  = ', total_error*100 ,'percent')


In [12]:
net = cnn()
net = net.to(device)
criterion = nn.CrossEntropyLoss()
bs = 10
trainloader=torch.utils.data.DataLoader(trainset,batch_size=bs,shuffle=True,num_workers=0)
valloader=torch.utils.data.DataLoader(valset,batch_size=bs,shuffle=False,num_workers=0)
testloader=torch.utils.data.DataLoader(testset,batch_size=bs,shuffle=False,num_workers=0)

In [13]:
start = time.time()
lr = 0.1
optimizer = torch.optim.Adam(net.parameters(), lr = 0.1)

for epoch in range(50):
#     if epoch % 10 == 0 and epoch > 10: 
#         lr = lr / 1.5
# #     if epoch == 10 or epoch == 14 or epoch == 18:
# #         lr = lr / 2
#     optimizer=torch.optim.SGD(net.parameters(), lr = lr)
    print(epoch)
    running_loss=0
    running_error=0
    num_batches=0
    for batch_idx,(img,label) in enumerate(trainloader):
        print("before")
        for p in net.parameters():
            if p.requires_grad:
                print(p.name, p.data)
                break
        inputs = img
#         inputs.requires_grad_()
#         inputs=inputs.to(device)
        scores=net(inputs) 
#         label=label.to(device)
        loss =  criterion( scores , label) 
        optimizer.zero_grad()   
        loss.backward()
        optimizer.step()
#         torch.cuda.empty_cache()
        running_loss += loss.detach().item()
        error = get_error( scores.detach() , label)
        running_error += error.item()
        num_batches+=1
        print("after")
        for p in net.parameters():
            if p.requires_grad:
                print(p.name, p.data)
                break
        import pdb
        pdb.set_trace()
    total_loss = running_loss/num_batches
    total_error = running_error/num_batches
    elapsed_time = time.time() - start
#     if epoch % 10 == 0 : 
    if True:
        print(' ')
        print('epoch=',epoch, ' time=', elapsed_time,
              ' loss=', total_loss , ' error=', total_error*100 ,'percent lr=', lr)
        eval_on_test_set(valloader)
#         state = {'net':net.state_dict(),'optimizer':optimizer.state_dict(),'epoch':epoch}
        torch.save(net.state_dict(), model_dir + str(epoch) + '.pth')

0
before
None tensor([[[[-0.0246, -0.1740,  0.0036],
          [-0.0339, -0.0967, -0.1110],
          [-0.1240, -0.1803,  0.0393]],

         [[-0.0360,  0.0760,  0.1860],
          [-0.1812, -0.0275,  0.0535],
          [-0.1886, -0.0501, -0.0741]],

         [[-0.0908, -0.1469, -0.0490],
          [-0.0054, -0.0565,  0.0363],
          [ 0.1269, -0.0756, -0.0654]]],


        [[[ 0.0171, -0.1361, -0.1221],
          [ 0.0828,  0.1699,  0.0022],
          [ 0.0162,  0.0744,  0.1060]],

         [[ 0.0285,  0.1253, -0.1256],
          [ 0.0264, -0.0877, -0.0615],
          [-0.1132,  0.1196,  0.1880]],

         [[-0.0448, -0.1337, -0.0800],
          [-0.1609,  0.0888,  0.1584],
          [-0.0826,  0.1668,  0.1280]]],


        [[[-0.1034, -0.1555, -0.0420],
          [-0.0130,  0.0362, -0.0055],
          [ 0.0965, -0.0408, -0.0849]],

         [[ 0.1129,  0.1828,  0.1702],
          [ 0.0382, -0.0462,  0.1378],
          [ 0.0058, -0.0526, -0.1743]],

         [[-0.0379, -0.1719,  

BdbQuit: 

In [None]:
eval_on_test_set(testloader)
for i in range(0, 50, 10):
    net.load_state_dict(torch.load(model_dir + str(i) + '.pth'))
    net.eval()
    eval_on_test_set(testloader)

## MLP

In [44]:
class mlp(nn.Module):
    def __init__(self, input_size, hidden_size1, hidden_size2, output_size):
        super(mlp , self).__init__()
        self.layer1 = nn.Linear( input_size, hidden_size1 , bias=False)
        self.layer2 = nn.Linear( hidden_size1, hidden_size2 , bias=False)
        self.layer3 = nn.Linear( hidden_size2, output_size , bias=False)
        
    def forward(self, x):
        x = self.layer1(x)
        x = F.relu(x)
        x = self.layer2(x)
        x = F.relu(x)
        x = self.layer3(x)
        return x

def get_error(scores , labels):
    bs=scores.size(0)
    predicted_labels = scores.argmax(dim=1)
    print(predicted_labels)
    import pdb
    pdb.set_trace()
    indicator = (predicted_labels == labels)
    num_matches=indicator.sum()
    
    return 1-num_matches.float()/bs  

def eval_on_test_set(loader):
    running_error=0
    num_batches=0
    for batch_idx,(img,label) in enumerate(loader):
        img=img.view(img.size()[0],-1)
        #img=img.to(device)
        scores=net(img) 
        label=label.to(device) 
        error = get_error( scores , label)
        running_error += error.item()
        num_batches+=1
    total_error = running_error/num_batches
    print( 'test error  = ', total_error*100 ,'percent')

In [41]:
net = mlp(imgsize**2*3, imgsize*4, imgsize, 2)
net = net.to(device)

In [42]:
criterion = nn.CrossEntropyLoss()
bs = 8
trainloader=torch.utils.data.DataLoader(trainset,batch_size=bs,shuffle=True,num_workers=0)
valloader=torch.utils.data.DataLoader(valset,batch_size=bs,shuffle=False,num_workers=0)
testloader=torch.utils.data.DataLoader(testset,batch_size=bs,shuffle=False,num_workers=0)

In [43]:
start = time.time()
lr = 0.05
optimizer = torch.optim.Adam(net.parameters(),lr = lr)
for epoch in range(50):
#     if epoch % 10 == 0 and epoch > 10: 
#         lr = lr / 1.5
#     optimizer=torch.optim.SGD(net.parameters(), lr = lr)
    running_loss=0
    running_error=0
    num_batches=0
    net.train()
    for batch_idx,(img,label) in enumerate(trainloader):
        
        inputs = img.view(img.size()[0],-1)
        # inputs.requires_grad_()
        # inputs=inputs.to(device)
        optimizer.zero_grad()
        scores=net(inputs) 
        # label=label.to(device)
        loss =  criterion(scores , label) 
        loss.backward()
        optimizer.step()
        torch.cuda.empty_cache()
        running_loss += loss.item()
        error = get_error(scores, label)
        running_error += error.item()
        num_batches+=1

    total_loss = running_loss/num_batches
    total_error = running_error/num_batches
    elapsed_time = time.time() - start
#     if epoch % 10 == 0 : 
    if True:
        print(' ')
        print('epoch=',epoch, ' time=', elapsed_time,
              ' loss=', total_loss , ' error=', total_error*100 ,'percent lr=', lr)
        eval_on_test_set(valloader)

 
epoch= 0  time= 13.031254053115845  loss= 4.2079029890605835  error= 32.02639751410404 percent lr= 0.05
test error  =  32.2265625 percent
 
epoch= 1  time= 32.6482880115509  loss= 0.6931471824645996  error= 31.49187768183424 percent lr= 0.05
test error  =  32.2265625 percent
 
epoch= 2  time= 53.30787396430969  loss= 0.6931471824645996  error= 31.47993311036789 percent lr= 0.05
test error  =  32.2265625 percent
 
epoch= 3  time= 73.26021409034729  loss= 0.6931471824645996  error= 31.47993311036789 percent lr= 0.05
test error  =  32.2265625 percent
 
epoch= 4  time= 93.52429795265198  loss= 0.6931471824645996  error= 31.48291925323448 percent lr= 0.05
test error  =  32.2265625 percent
 
epoch= 5  time= 114.07315993309021  loss= 0.6931471824645996  error= 31.485905396101067 percent lr= 0.05
test error  =  32.2265625 percent


KeyboardInterrupt: 