In [10]:
import numpy as np 
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torch import optim
import cv2
import csv
import sys
import os
from PIL import Image
import numpy as np
import torch
from torchvision.transforms import Compose, ToTensor
from tqdm.notebook import tqdm
from torch.utils.data.sampler import SubsetRandomSampler
import random
import torch.backends.cudnn as cudnn
import torch.optim as optim
import torch.utils.data
import PIL.Image
from torchvision import datasets
from torchvision import transforms


In [11]:
# Choose the compute device: the first CUDA GPU when one is available,
# otherwise fall back to the CPU. The chosen backend name is echoed.
use_gpu = torch.cuda.is_available()
print("cuda" if use_gpu else "cpu")
device = torch.device('cuda:0' if use_gpu else "cpu")
    

cpu


In [12]:
# Preprocessing applied to every PIL image: resize to 224x224, random
# horizontal flip (default probability), conversion to a tensor, then
# per-channel normalisation with the mean/std values listed below.
data_transforms = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)
    
class Image_data(Dataset):
    """Dataset backed by two ``.npy`` files: one of images, one of labels.

    Both arrays are read from disk once, at construction time, and kept on
    the instance. (Previously every ``__getitem__`` / ``__len__`` call
    re-read the full files, which made each sample access cost a complete
    file load.)

    Args:
        label: path to the ``.npy`` file holding the labels.
        data: path to the ``.npy`` file holding the images.
        data_transforms: callable applied to the PIL image built from each
            sample; when ``None`` the PIL image is returned untransformed.

    Note the argument order: the label path comes first, then the data path.
    """

    def __init__(self,label,data,data_transforms):
        self.data_path = data
        self.label_path = label
        self.data_transforms = data_transforms
        # Load once and cache; indexing the in-memory arrays is then cheap.
        self._images = np.load(self.data_path)
        self._labels = np.load(self.label_path)

    def __getitem__(self,index):
        """Return ``(transformed_image, label)`` for sample ``index``.

        The label is returned as a float32 tensor, matching what
        ``torch.Tensor(labels)[index]`` produced before.
        """
        # assumes each image entry is an array PIL can interpret after a
        # uint8 cast (e.g. H x W x 3) -- TODO confirm against the data files
        image = PIL.Image.fromarray(np.uint8(self._images[index]))
        target = torch.tensor(self._labels[index], dtype=torch.float32)

        # Use the transform handed to this instance rather than whatever
        # module-level name happens to be in scope.
        if self.data_transforms is not None:
            image = self.data_transforms(image)
        return image, target

    def __len__(self):
        """Number of samples, defined by the length of the label array."""
        return len(self._labels)

# Wrap the on-disk arrays in a Dataset. Argument order is (labels, images, transform).
train_data = Image_data('./trainY.npy','./trainX.npy',data_transforms)

# Batching and hold-out configuration.
batch_size = 32
validation_split = .1
shuffle_dataset = True
random_seed= 42

# Build the index list, optionally shuffle it with a fixed seed, and carve
# off the first `split` indices for validation; the rest are for training.
dataset_size = len(train_data)
split = int(np.floor(validation_split * dataset_size))
indices = list(range(dataset_size))
if shuffle_dataset :
    np.random.seed(random_seed)
    np.random.shuffle(indices)
val_indices = indices[:split]
train_indices = indices[split:]

# Each sampler draws (in random order) only from its own index subset.
train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(val_indices)

# Both loaders share the same underlying dataset and differ only in sampler.
train_loader = DataLoader(
    dataset=train_data,
    batch_size=batch_size,
    sampler=train_sampler,
)
val_loader = DataLoader(
    dataset=train_data,
    batch_size=batch_size,
    sampler=valid_sampler,
)


In [13]:
class LeNetEncoder(nn.Module):
    """Convolutional feature extractor (LeNet-style) used for ADDA.

    Three conv / max-pool / ReLU stages followed by a linear projection of
    the flattened activations (2048 values per sample) to a 512-d feature.
    For a 3 x 224 x 224 input the conv stack yields 128 x 4 x 4 = 2048 values.
    """

    def __init__(self):
        """Build the convolutional stack and the final projection layer."""
        super().__init__()

        self.restored = False

        # Layer order (and therefore the Sequential indices used in the
        # state_dict keys) is significant -- keep it stable.
        stages = [
            nn.Conv2d(3, 32, kernel_size=5, stride=2),
            nn.MaxPool2d(kernel_size=2),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=5, stride=2),
            nn.Dropout2d(),
            nn.MaxPool2d(kernel_size=2),
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=5),
            nn.MaxPool2d(kernel_size=2),
            nn.ReLU(),
        ]
        self.encoder = nn.Sequential(*stages)
        self.fc1 = nn.Linear(2048, 512)

    def forward(self, input):
        """Encode a batch of images into 512-d feature vectors."""
        flattened = self.encoder(input).view(-1, 2048)
        return self.fc1(flattened)


class LeNetClassifier(nn.Module):
    """Classification head for ADDA: maps 512-d features to 10 logits."""

    def __init__(self):
        """Create the single linear layer of the classifier."""
        super().__init__()
        self.fc2 = nn.Linear(512, 10)

    def forward(self, feat):
        """Apply ReLU, then dropout (active only in training mode), then the linear layer."""
        activated = F.relu(feat)
        regularised = F.dropout(activated, training=self.training)
        return self.fc2(regularised)

In [14]:

class Discriminator(nn.Module):
    """Three-layer MLP discriminator that outputs log-probabilities.

    Args:
        input_dims: size of each input feature vector.
        hidden_dims: width of the two hidden layers.
        output_dims: number of output classes.
    """

    def __init__(self, input_dims, hidden_dims, output_dims):
        """Init discriminator."""
        super(Discriminator, self).__init__()

        self.restored = False

        self.layer = nn.Sequential(
            nn.Linear(input_dims, hidden_dims),
            nn.ReLU(),
            nn.Linear(hidden_dims, hidden_dims),
            nn.ReLU(),
            nn.Linear(hidden_dims, output_dims),
            # Normalise over the class axis explicitly. Omitting `dim`
            # relies on deprecated implicit-dimension selection and emits a
            # warning on every forward pass. For (batch, features) inputs
            # the last axis is the same axis the implicit rule picked.
            nn.LogSoftmax(dim=-1)
        )

    def forward(self, input):
        """Return log-probabilities over ``output_dims`` classes for ``input``."""
        out = self.layer(input)
        return out

In [15]:
import torch.nn as nn
import torch.optim as optim


# Instantiate the source encoder/classifier and place them on the same
# device the batches are moved to (otherwise a GPU run fails with a device
# mismatch on the first forward pass).
encoder = LeNetEncoder().to(device)
classifier = LeNetClassifier().to(device)


def _default_optimizer(encoder, classifier):
    """Adam over the parameters of both networks (lr=1e-4, betas=(0.5, 0.9))."""
    return optim.Adam(
        list(encoder.parameters()) + list(classifier.parameters()),
        lr = 1e-4,
        betas=(0.5, 0.9))


# setup criterion and optimizer
optimizer = _default_optimizer(encoder, classifier)
criterion = nn.CrossEntropyLoss()


def eval_src(encoder, classifier, loader):
    """Evaluate classifier for source domain.

    Puts both networks in eval mode, runs them over ``loader`` without
    tracking gradients, prints the mean per-batch loss and the accuracy, and
    returns them as ``(loss, acc)``. Batches are moved to the module-level
    ``device``.

    Accuracy is computed over the samples actually yielded by ``loader``
    rather than over a fixed fraction of ``len(loader.dataset)``, so it is
    correct for any validation split size. The networks are left in eval
    mode; callers that continue training must switch back to train mode.
    """
    # set eval state for Dropout and BN layers
    encoder.eval()
    classifier.eval()

    # set loss function
    criterion = nn.CrossEntropyLoss()

    total_loss = 0.0
    num_correct = 0
    num_samples = 0

    # evaluate network; no autograd graph is needed here
    with torch.no_grad():
        for (images, labels) in loader:
            images = (images).to(device)
            labels = (labels).to(device).long()

            preds = classifier(encoder(images))
            total_loss += criterion(preds, labels).item()

            pred_cls = torch.max(preds, 1)[1]
            num_correct += (pred_cls == labels).sum().item()
            num_samples += labels.size(0)

    # Guard against an empty loader (e.g. a validation split of size zero).
    loss = total_loss / max(len(loader), 1)
    acc = num_correct / max(num_samples, 1)

    print("Avg Loss = {}, Avg Accuracy = {:2%}".format(loss, acc))
    return loss, acc


def train_src(encoder, classifier, train_loader, val_loader,
              optimizer=None, criterion=None, num_epochs=500):
    """Train classifier for source domain.

    Args:
        encoder: feature extractor to optimise.
        classifier: classification head to optimise.
        train_loader: iterable of ``(images, labels)`` training batches.
        val_loader: loader passed to ``eval_src`` after every epoch.
        optimizer: optimiser over both networks; when ``None`` an Adam
            optimiser with lr=1e-4 and betas=(0.5, 0.9) is created.
        criterion: loss function; defaults to ``nn.CrossEntropyLoss()``.
        num_epochs: number of passes over ``train_loader`` (default 500).

    Returns:
        The ``(encoder, classifier)`` pair that was trained in place.
    """
    if optimizer is None:
        optimizer = _default_optimizer(encoder, classifier)
    if criterion is None:
        criterion = nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        # eval_src() leaves the networks in eval mode, so train mode has to
        # be restored at the start of every epoch to keep dropout active.
        encoder.train()
        classifier.train()

        for step, (images, labels) in enumerate(train_loader):
            # move the batch to the compute device
            images = (images).to(device)
            labels = (labels).to(device).long()

            # zero gradients for optimizer
            optimizer.zero_grad()

            # compute classification loss
            preds = classifier(encoder(images))
            loss = criterion(preds, labels)

            # optimize source classifier
            loss.backward()
            optimizer.step()

            # print step info (carriage return keeps it on a single line)
            print("Epoch [{}/{}] Step [{}/{}]: loss={}"
                  .format(epoch + 1,
                          num_epochs,
                          step + 1,
                          len(train_loader),
                          loss.item()),end = '\r')

        # eval model on the validation split
        eval_src(encoder, classifier, val_loader)

    return encoder, classifier


train_src(encoder, classifier, train_loader, val_loader,
          optimizer=optimizer, criterion=criterion);

Epoch [1/500] Step [9/141]: loss=2.2763075828552246

KeyboardInterrupt: 

In [18]:
# Persist the trained networks. This pickles the entire module objects (not
# just their state_dict), so loading them later requires the class
# definitions to be importable.
torch.save(obj=encoder, f='./feature_exactor.pt')
torch.save(obj=classifier, f='./classifier.pt')


  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
