In [1]:
import random
import numpy as np
import torch
import learn2learn as l2l

from torch import nn, optim
import os
import pandas as pd
from skimage import io, transform
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import warnings
warnings.filterwarnings("ignore")
import matplotlib.pyplot as plt
from torch.nn import functional as F

from sklearn.preprocessing import normalize


# Root folder of the NVIDIA dataset (placeholder - replace with the real path).
directory = "Path to NVIDIA dataset"

# Path of the frame CSV inside `directory`. Built with os.path.join so the
# result is valid whether or not `directory` ends with a path separator.
# Presumably this is the CSV handed to the `auto` dataset - confirm.
fm = os.path.join(directory, 'frame.csv')


# Integer class id assigned to each distortion name.
_DISTORTION_LABELS = {
    'N': 0,
    'sine': 1,
    'gauss': 2,
    'poisson': 3,
    'speckle': 4,
    's-p': 5,
    'add_white': 6,
    'add_black': 7,
}


def convertt(strs):
    """Map a distortion name to its integer class id.

    Args:
        strs: one of ``'N'``, ``'sine'``, ``'gauss'``, ``'poisson'``,
            ``'speckle'``, ``'s-p'``, ``'add_white'`` or ``'add_black'``.

    Returns:
        The integer id (0-7) associated with ``strs``.

    Raises:
        ValueError: if ``strs`` is not a known distortion name (for example a
            NaN coming from an empty CSV cell).
    """
    try:
        return _DISTORTION_LABELS[strs]
    except (KeyError, TypeError):
        # KeyError: unknown name; TypeError: unhashable value.
        raise ValueError(
            "unknown distortion type: {!r} (expected one of {})".format(
                strs, sorted(_DISTORTION_LABELS))
        ) from None

def normalize(x):
    """Return ``x`` cast to float with 100 subtracted from every element.

    No scaling is applied (the ``/ 128`` step is disabled).

    Note: defining this function rebinds the module-level name ``normalize``
    that was imported from ``sklearn.preprocessing`` earlier in the notebook.

    Args:
        x: object exposing ``astype`` (e.g. a NumPy array of pixel values).

    Returns:
        A float copy of ``x`` shifted by -100.
    """
    as_float = x.astype(float)
    shifted = as_float - 100    # / 128
    return shifted



# Custom PyTorch Dataset built from the frame CSV.
class auto(Dataset):
    """Dataset yielding ``(image, distortion_label)`` pairs listed in a CSV.

    The CSV is read with pandas. Columns 1 and 2 (zero-based) are kept: the
    first is used as the image path, the second as the distortion name.
    """

    def __init__(self, csv_file, transform=None):
        """Load the frame CSV.

        Args:
            csv_file: path of the CSV file to read.
            transform: stored on the instance; it is not applied anywhere in
                this class.
        """
        frame = pd.read_csv(csv_file)
        # NOTE(review): the first data row is discarded here even though
        # read_csv already consumes the header line - confirm this is intended.
        frame = frame.iloc[1:, :]
        self.dist_type_arr = frame.iloc[:, 1:3]
        self.transform = transform

    def __len__(self):
        """Number of rows kept from the CSV."""
        return len(self.dist_type_arr)

    @staticmethod
    def _to_channel_first(image):
        """Convert an image array to a channel-first tensor.

        ``skimage.io.imread`` returns colour images as (H, W, 3). Reshaping
        such an array straight to (3, 50, 50) reinterprets the memory and mixes
        pixels of different channels, so the channel axis is moved with
        ``permute`` instead. Any other layout falls back to the previous
        ``reshape`` to (3, 50, 50).
        """
        tensor = torch.tensor(image)
        if tensor.dim() == 3 and tensor.shape[-1] == 3:
            return tensor.permute(2, 0, 1).contiguous()
        return torch.reshape(tensor, (3, 50, 50))

    def __getitem__(self, idx):
        """Read one image and return it with its integer distortion label.

        Returns:
            ``(image, distortion)``: ``image`` is a float tensor (pixel values
            shifted by ``normalize``) in channel-first layout; ``distortion``
            is a scalar tensor holding the id produced by ``convertt``.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = self.dist_type_arr.iloc[idx, 0]
        distortion_name = self.dist_type_arr.iloc[idx, 1]

        image = normalize(io.imread(img_name))
        distortion = torch.tensor(convertt(distortion_name))

        return self._to_channel_first(image), distortion

    
    

In [None]:
class Net(nn.Module):
    """Small CNN classifier: three conv/pool stages followed by four linear layers.

    For a 3x50x50 input the conv/pool stack produces 50 feature maps of size
    2x2, which is the ``50 * 2 * 2`` width expected by ``fc1``.

    Args:
        ways: number of output classes.
    """

    def __init__(self, ways=3):
        super().__init__()

        # Convolutional feature extractor (5x5 kernels, stride 1, no padding).
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=20, kernel_size=5, stride=1)
        self.conv2 = nn.Conv2d(in_channels=20, out_channels=50, kernel_size=5, stride=1)
        self.conv3 = nn.Conv2d(in_channels=50, out_channels=50, kernel_size=5, stride=1)

        # Classifier head. Attribute names and creation order are kept so
        # state-dict keys and seeded initialisation stay the same.
        self.fc1 = nn.Linear(50 * 2 * 2, 100)
        self.fc2 = nn.Linear(100, 100)
        self.fc4 = nn.Linear(100, 100)
        self.fc3 = nn.Linear(100, ways)

    def forward(self, x):
        """Return per-class log-probabilities for a batch ``x``.

        The input is cast to float32 before the first convolution.
        """
        features = x.float()
        for conv in (self.conv1, self.conv2, self.conv3):
            features = F.max_pool2d(F.relu(conv(features)), 2, 2)

        flat = features.view(-1, 50 * 2 * 2)

        hidden = F.relu(self.fc1(flat))
        # fc2 -> fc4 -> fc3 are applied back to back with no activation between them.
        logits = self.fc3(self.fc4(self.fc2(hidden)))

        return F.log_softmax(logits, dim=1)
    


    
class MobileNet(nn.Module):
    """Stack of one standard conv block and 13 depthwise-separable blocks.

    The blocks are followed by adaptive average pooling to 1x1 and a linear
    classifier over the resulting 1024 features.

    Args:
        ch_in: number of channels of the input images.
        n_classes: number of outputs of the final linear layer.
    """

    def __init__(self, ch_in, n_classes):
        super().__init__()

        def conv_block(in_ch, out_ch, stride):
            """3x3 convolution -> batch norm -> ReLU."""
            layers = [
                nn.Conv2d(in_ch, out_ch, 3, stride, 1, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
            ]
            return nn.Sequential(*layers)

        def separable_block(in_ch, out_ch, stride):
            """Depthwise 3x3 conv then pointwise 1x1 conv, each with BN + ReLU."""
            layers = [
                # groups == in_ch: one filter per input channel (depthwise).
                nn.Conv2d(in_ch, in_ch, 3, stride, 1, groups=in_ch, bias=False),
                nn.BatchNorm2d(in_ch),
                nn.ReLU(inplace=True),
                # groups == 1 with a 1x1 kernel: pointwise channel mixing.
                nn.Conv2d(in_ch, out_ch, 1, 1, 0, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
            ]
            return nn.Sequential(*layers)

        # (in_channels, out_channels, stride) of every separable block, in order.
        separable_specs = [
            (32, 64, 1),
            (64, 128, 2),
            (128, 128, 1),
            (128, 256, 2),
            (256, 256, 1),
            (256, 512, 2),
            (512, 512, 1),
            (512, 512, 1),
            (512, 512, 1),
            (512, 512, 1),
            (512, 512, 1),
            (512, 1024, 2),
            (1024, 1024, 1),
        ]

        stages = [conv_block(ch_in, 32, 2)]
        for in_ch, out_ch, stride in separable_specs:
            stages.append(separable_block(in_ch, out_ch, stride))
        stages.append(nn.AdaptiveAvgPool2d(1))

        self.model = nn.Sequential(*stages)
        self.fc = nn.Linear(1024, n_classes)

    def forward(self, x):
        """Run the backbone, flatten to 1024 features and apply the classifier."""
        pooled = self.model(x)
        flat = pooled.view(-1, 1024)
        return self.fc(flat)
    
    
    
import gc



def fast_adapt(batch, learner, loss, adaptation_steps, shots, ways, device):
    """Adapt ``learner`` on half of a task batch and score it on the other half.

    Args:
        batch: ``(data, labels)`` pair for one task.
        learner: callable model exposing ``adapt(error)``.
        loss: callable ``loss(predictions, labels)``.
        adaptation_steps: number of ``learner.adapt`` calls.
        shots, ways: ``shots * ways`` samples at even positions
            ``0, 2, 4, ...`` form the adaptation set; all remaining samples
            form the evaluation set.
        device: device the batch is moved to.

    Returns:
        ``(evaluation_error, evaluation_accuracy)``: the evaluation loss
        divided by the number of evaluation samples, and the value returned
        by ``accuracy`` for the evaluation predictions.
    """
    inputs, targets = batch
    inputs = inputs.to(device)
    targets = targets.to(device)

    # Boolean masks selecting the adaptation / evaluation samples.
    support_mask = torch.zeros(inputs.size(0), dtype=torch.bool)
    support_mask[torch.arange(shots * ways) * 2] = True
    query_mask = ~support_mask

    support_inputs, support_targets = inputs[support_mask], targets[support_mask]
    query_inputs, query_targets = inputs[query_mask], targets[query_mask]

    # Adapt the model on the adaptation set.
    for _ in range(adaptation_steps):
        support_error = loss(learner(support_inputs), support_targets) / len(support_inputs)
        learner.adapt(support_error)

    # Evaluate the adapted model on the held-out half of the batch.
    query_predictions = learner(query_inputs)
    query_error = loss(query_predictions, query_targets) / len(query_inputs)
    query_accuracy = accuracy(query_predictions, query_targets)
    return query_error, query_accuracy









def accuracy(predictions, targets, what='no'):
    """Fraction of rows of ``predictions`` whose argmax equals the target.

    Args:
        predictions: 2-D tensor of per-class scores (argmax is taken over dim 1).
        targets: tensor of integer class labels.
        what: when equal to ``'what'`` the correctly predicted labels and the
            targets are returned as well.

    Returns:
        The accuracy as a Python float, or - when ``what == 'what'`` - the
        tuple ``(accuracy, correctly_predicted_targets, targets)``.
    """
    hits = predictions.argmax(dim=1) == targets
    score = (hits.sum().float() / len(targets)).item()

    if what != 'what':
        return score

    # Labels of the samples that were classified correctly.
    return score, targets[hits], targets


def _split_adaptation_evaluation(data, labels, shots, ways):
    """Split one task batch into adaptation and evaluation subsets.

    The ``shots * ways`` samples at even positions ``0, 2, 4, ...`` are used
    for adaptation; every remaining sample is used for evaluation.

    Returns:
        ``(adaptation_data, adaptation_labels, evaluation_data, evaluation_labels)``.
    """
    adaptation_mask = np.zeros(data.size(0), dtype=bool)
    adaptation_mask[np.arange(shots * ways) * 2] = True
    evaluation_mask = torch.from_numpy(~adaptation_mask)
    adaptation_mask = torch.from_numpy(adaptation_mask)
    return (data[adaptation_mask], labels[adaptation_mask],
            data[evaluation_mask], labels[evaluation_mask])


def _adapt_and_evaluate(meta_model, task, loss_func, fas, shots, ways, device):
    """Adapt a clone of ``meta_model`` on one task and score the clone.

    Returns:
        ``(evaluation_error, predictions, evaluation_labels)`` where
        ``evaluation_error`` is ``loss_func`` on the evaluation subset divided
        by the number of evaluation samples.
    """
    learner = meta_model.clone()
    data, labels = task
    data, labels = data.to(device), labels.to(device)
    (adaptation_data, adaptation_labels,
     evaluation_data, evaluation_labels) = _split_adaptation_evaluation(data, labels, shots, ways)

    # Fast adaptation: `fas` calls to learner.adapt on the adaptation subset.
    for _ in range(fas):
        adaptation_error = loss_func(learner(adaptation_data), adaptation_labels)
        learner.adapt(adaptation_error)

    predictions = learner(evaluation_data)
    evaluation_error = loss_func(predictions, evaluation_labels)
    evaluation_error /= len(evaluation_data)
    return evaluation_error, predictions, evaluation_labels


def main(lr=0.0005, maml_lr=0.0001, iterations=1000, ways= 3 , shots=3 , tps= 32 , fas=5, device=torch.device("cuda"),
         download_location='~/data'):
    """Meta-train ``Net`` with learn2learn's MAML wrapper and print progress.

    Each iteration adapts a clone of the meta-model on ``tps`` sampled tasks,
    averages the evaluation losses, takes one Adam step on that average, and
    then repeats the adapt/evaluate pass without an optimiser step to print a
    second loss/accuracy line.

    Args:
        lr: learning rate of the Adam optimiser over the meta-model parameters.
        maml_lr: ``lr`` passed to ``l2l.algorithms.MAML``.
        iterations: number of meta-iterations.
        ways: number of classes per task (``NWays``) and outputs of ``Net``.
        shots: ``2 * shots`` samples per class are drawn (``KShots``); half of
            each task is used for adaptation, half for evaluation.
        tps: tasks sampled per iteration (for each of the two passes).
        fas: number of ``learner.adapt`` steps per task.
        device: device for the model and the task tensors.
        download_location: unused; kept so existing calls keep working.

    Returns:
        None. Progress is reported through ``print``.
    """
    # NOTE(review): `auto_meta` is read as a global but is not defined in the
    # visible notebook cells. Presumably it is something like
    # `l2l.data.MetaDataset(auto(fm))` - confirm and define it before calling main().
    train_tasks = l2l.data.TaskDataset(auto_meta,
                                       task_transforms=[
                                            l2l.data.transforms.NWays(auto_meta, ways),
                                            l2l.data.transforms.KShots(auto_meta, 2*shots),
                                            l2l.data.transforms.LoadData(auto_meta),
                                            l2l.data.transforms.RemapLabels(auto_meta),
                                            l2l.data.transforms.ConsecutiveLabels(auto_meta),
                                       ],
                                       num_tasks=1000)

    model = Net(ways)
    model.to(device)

    meta_model = l2l.algorithms.MAML(model, lr=maml_lr)
    opt = optim.Adam(meta_model.parameters(), lr=lr)
    loss_func = nn.NLLLoss(reduction='mean')

    for iteration in range(iterations):
        # ---- meta-training pass -------------------------------------------
        iteration_error = 0.0
        iteration_acc = 0.0

        for _ in range(tps):
            valid_error, predictions, evaluation_labels = _adapt_and_evaluate(
                meta_model, train_tasks.sample(), loss_func, fas, shots, ways, device)
            iteration_error += valid_error
            iteration_acc += accuracy(predictions, evaluation_labels)

        iteration_error /= tps
        iteration_acc /= tps
        print('Loss : {:.3f} Acc : {:.3f}'.format(iteration_error.item(), iteration_acc))

        # Meta-update on the averaged evaluation loss.
        opt.zero_grad()
        iteration_error.backward()
        opt.step()

        # ---- evaluation pass (no optimiser step) --------------------------
        # Tasks are drawn from `train_tasks` again, i.e. from the same task
        # pool used for training, not from a separate held-out set.
        test_iteration_error_noupdate = 0.0
        test_iteration_acc_noupdate = 0.0

        # Per-class count of correct predictions. Sized by `ways`: labels are
        # expected to lie in [0, ways) after the RemapLabels transform.
        # NOTE(review): this statistic is computed but never printed or returned.
        precise_acc = np.zeros(ways)

        for _ in range(tps):
            test_valid_error, predictions, test_evaluation_labels = _adapt_and_evaluate(
                meta_model, train_tasks.sample(), loss_func, fas, shots, ways, device)

            # Accumulate a plain float so the autograd graph of each task can be freed.
            test_iteration_error_noupdate += test_valid_error.item()

            test_valid_accuracy, correct_labels, _ = accuracy(
                predictions, test_evaluation_labels, "what")
            # One increment per correctly classified sample, indexed by its label.
            np.add.at(precise_acc, correct_labels.detach().cpu().numpy(), 1)

            test_iteration_acc_noupdate += test_valid_accuracy

        test_iteration_error_noupdate /= tps
        test_iteration_acc_noupdate /= tps
        precise_acc /= tps

        print('test_Loss : {:.3f} test_Acc : {:.3f}'.format(test_iteration_error_noupdate, test_iteration_acc_noupdate))
                
            
            
            

        
# Run meta-training with main()'s default arguments (the default device is CUDA).
main()
        

In [None]:

import torch

# Check which device is currently set up.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print ('Available devices ', torch.cuda.device_count())
if device.type == 'cuda':
    # These queries raise on a machine without a usable CUDA device, so they
    # are only issued when CUDA was actually selected above.
    print ('Current cuda device ', torch.cuda.current_device())
    print(torch.cuda.get_device_name(device))
else:
    print('CUDA is not available; using CPU')


