In [None]:
# Load the two data tables used throughout this file: one CSV backing the
# meta-training tasks and one backing the held-out evaluation tasks.
# Code further down reads a "Path" column and a "Label" column from these
# frames and slices them in runs of 20 consecutive rows per class.
trainframe = pd.read_csv("train_data.csv")
testframe = pd.read_csv("test_data.csv")

In [None]:
"""
 Meta Learner를 위한
 Meta-train / meta-test용 
 Dataset을 준비하는 Class
"""
class Task(object):
    """One N-way K-shot episode sampled for meta-training.

    ``num_classes`` class ids are drawn without replacement from
    ``all_classes`` and re-labelled ``0 .. num_classes - 1`` in draw order.
    For every drawn class, ``num_instances`` image paths go to the support
    split (``train_roots`` / ``train_labels``) and a disjoint set of
    ``num_instances`` paths goes to the query split (``meta_roots`` /
    ``meta_labels``), so both splits always cover the same classes.

    The source frame must have a ``"Path"`` column and store classes
    contiguously: rows ``c * samples_per_class`` up to (but excluding)
    ``(c + 1) * samples_per_class`` belong to class id ``c``.

    Args:
        all_classes: list of candidate class ids.
        num_classes: number of classes per episode (N-way).
        num_instances: images per class in each split (K-shot).
        samples_per_class: number of consecutive rows stored per class.
        frame: DataFrame to read paths from; defaults to the module-level
            ``trainframe``.

    Raises:
        ValueError: if a class does not hold enough rows for two disjoint
            splits of ``num_instances`` images.
    """

    def __init__(self, all_classes, num_classes, num_instances,
                 samples_per_class=20, frame=None):
        # Example: task = Task(meta_train_classes, 5, 3)
        if frame is None:
            frame = trainframe
        if 2 * num_instances > samples_per_class:
            # Without this check the query split would silently come out
            # shorter than the support split.
            raise ValueError(
                "num_instances={} needs {} images per class, but only {} are "
                "available".format(num_instances, 2 * num_instances,
                                   samples_per_class))

        self.all_classes = all_classes       # every class id that may be drawn
        self.num_classes = num_classes       # N-way
        self.num_instances = num_instances   # K-shot
        self.train_roots = []                # support-split image paths
        self.meta_roots = []                 # query-split image paths
        self.train_labels = []               # support-split labels (0..N-1)
        self.meta_labels = []                # query-split labels (0..N-1)

        # Classes taking part in this episode, e.g. [14, 17, 5].
        sampled_classes = random.sample(all_classes, num_classes)

        for label, c in enumerate(sampled_classes):
            # Images are stored class by class, so a class is one row range.
            start = c * samples_per_class
            paths = frame["Path"].iloc[start:start + samples_per_class].to_numpy()

            # Shuffle the within-class indices once and cut two disjoint
            # slices from the permutation: support first, query second.
            sample_idxs = np.random.choice(samples_per_class, samples_per_class,
                                           replace=False)
            train_idxs = sample_idxs[:num_instances]
            meta_idxs = sample_idxs[num_instances:(num_instances * 2)]

            self.train_roots.extend(paths[train_idxs].tolist())
            self.train_labels.extend([label] * len(train_idxs))
            self.meta_roots.extend(paths[meta_idxs].tolist())
            self.meta_labels.extend([label] * len(meta_idxs))

"""
 실제 Train을 위한 
 Train / Test용 
 Dataset을 준비하는 Class
"""
class TestTask(object):
    """One N-way episode sampled from the held-out frame for evaluation.

    ``num_classes`` class ids are drawn without replacement from
    ``all_classes`` and re-labelled ``0 .. num_classes - 1`` in draw order.
    For every drawn class, ``num_instances`` image paths go to the adaptation
    split (``train_roots`` / ``train_labels``) and a disjoint set of
    ``num_test_instances`` paths goes to the scoring split (``test_roots`` /
    ``test_labels``).

    The source frame must have a ``"Path"`` column and store classes
    contiguously, ``samples_per_class`` rows each, starting with class id
    ``class_offset`` at row 0.

    Args:
        all_classes: list of candidate class ids.
        num_classes: number of classes per episode (N-way).
        num_instances: adaptation images per class (K-shot).
        num_test_instances: scoring images per class.
        class_offset: class id stored in the first row block of the frame.
        samples_per_class: number of consecutive rows stored per class.
        frame: DataFrame to read paths from; defaults to the module-level
            ``testframe``.

    Raises:
        ValueError: if a class does not hold enough rows for both splits.
    """

    def __init__(self, all_classes, num_classes, num_instances,
                 num_test_instances, class_offset=964, samples_per_class=20,
                 frame=None):
        if frame is None:
            frame = testframe
        if num_instances + num_test_instances > samples_per_class:
            # Without this check the scoring split would silently shrink.
            raise ValueError(
                "num_instances + num_test_instances = {} exceeds the {} images "
                "available per class".format(
                    num_instances + num_test_instances, samples_per_class))

        self.all_classes = all_classes
        self.num_classes = num_classes
        self.num_instances = num_instances
        self.num_test_instances = num_test_instances  # scoring images per class
        self.test_roots = []
        self.train_roots = []
        self.test_labels = []
        self.train_labels = []

        sampled_classes = random.sample(all_classes, num_classes)

        for label, c in enumerate(sampled_classes):
            # Shift the class id so the first stored class maps to row 0.
            start = (c - class_offset) * samples_per_class
            paths = frame["Path"].iloc[start:start + samples_per_class].to_numpy()

            # One permutation per class; adaptation and scoring images are
            # consecutive, non-overlapping slices of it.
            sample_idxs = np.random.choice(samples_per_class, samples_per_class,
                                           replace=False)
            train_idxs = sample_idxs[:num_instances]
            test_idxs = sample_idxs[num_instances:(num_instances + num_test_instances)]

            self.test_roots.extend(paths[test_idxs].tolist())
            self.test_labels.extend([label] * len(test_idxs))
            self.train_roots.extend(paths[train_idxs].tolist())
            self.train_labels.extend([label] * len(train_idxs))
        
"""
 Image DataLoading을 위한
 Util성 Data format용 Class
"""        
class MiniSet(Dataset):
    """Dataset that pairs image file paths with their labels.

    Indexing opens the image at the stored path, passes it through
    ``transform`` and returns ``(transformed_image, label)``.
    """

    def __init__(self, fileroots, labels, transform):
        # fileroots[i] and labels[i] describe the same sample.
        self.fileroots, self.labels, self.transform = fileroots, labels, transform

    def __len__(self):
        # One sample per stored path.
        return len(self.fileroots)

    def __getitem__(self, idx):
        picture = self.transform(Image.open(self.fileroots[idx]))
        return picture, self.labels[idx]

In [None]:

# Shared image preprocessing: resize every image to 28x28, then convert it to
# a tensor. The loader helpers below hand this object to every MiniSet.
transform = transforms.Compose([  transforms.Resize((28,28))
                                , transforms.ToTensor()      ])

"""
 앞선, meta learner용 Task Class와
 Image load용 Class를 토대로 
 Meta Learner용 (inner loop)
 Dataset loader
"""
def get_loaders(task):
    """Build the two data loaders of one meta-training task.

    Each loader uses a batch size equal to the number of images in its split,
    so a single draw yields the whole (shuffled) split.

    Args:
        task: object exposing ``train_roots``/``train_labels`` (support split)
            and ``meta_roots``/``meta_labels`` (query split).

    Returns:
        dict with key ``"train"`` (support loader) and ``"meta"`` (query loader).
    """
    def _whole_split_loader(roots, targets):
        # batch_size == split size: one batch is the full split.
        return DataLoader(MiniSet(roots, targets, transform),
                          batch_size=len(roots),
                          shuffle=True)

    return {
        "train": _whole_split_loader(task.train_roots, task.train_labels),  # support
        "meta": _whole_split_loader(task.meta_roots, task.meta_labels),     # query
    }

def get_test_loaders(task):
    """Build the two data loaders of one evaluation task.

    Each loader uses a batch size equal to the number of images in its split,
    so a single draw yields the whole (shuffled) split.

    Args:
        task: object exposing ``train_roots``/``train_labels`` and
            ``test_roots``/``test_labels``.

    Returns:
        dict with key ``"train"`` (adaptation loader) and ``"test"``
        (scoring loader).
    """
    def _whole_split_loader(roots, targets):
        # batch_size == split size: one batch is the full split.
        return DataLoader(MiniSet(roots, targets, transform),
                          batch_size=len(roots),
                          shuffle=True)

    # The scoring loader is created first, the adaptation loader second.
    scoring = _whole_split_loader(task.test_roots, task.test_labels)
    adaptation = _whole_split_loader(task.train_roots, task.train_labels)
    return {"train": adaptation, "test": scoring}

In [None]:
class BaseNet(nn.Module):
    """Small convolutional classifier that can also run on external weights.

    The network is three stages of 3x3 convolution (64 channels, no padding),
    batch normalisation, ReLU and 2x2 max-pooling, followed by a linear layer
    with ``num_classes`` outputs. ``forward`` returns log-probabilities.

    The first convolution takes single-channel input, and the flattening step
    expects 64 features per sample, which holds for 28x28 images (the three
    stages shrink them to 1x1).
    """

    def __init__(self, num_classes):
        super(BaseNet, self).__init__()
        self.features = nn.Sequential(OrderedDict([
                ('conv1', nn.Conv2d(1, 64, 3)),
                ('bn1', nn.BatchNorm2d(64, momentum=1, affine=True)),
                ('relu1', nn.ReLU(inplace=True)),
                ('pool1', nn.MaxPool2d(2, 2)),
                ('conv2', nn.Conv2d(64, 64, 3)),
                ('bn2', nn.BatchNorm2d(64, momentum=1, affine=True)),
                ('relu2', nn.ReLU(inplace=True)),
                ('pool2', nn.MaxPool2d(2, 2)),
                ('conv3', nn.Conv2d(64, 64, 3)),
                ('bn3', nn.BatchNorm2d(64, momentum=1, affine=True)),
                ('relu3', nn.ReLU(inplace=True)),
                ('pool3', nn.MaxPool2d(2, 2))]))
        self.add_module('fc', nn.Linear(64, num_classes))

    def forward(self, x, weights=None):
        """Return log-softmax class scores for ``x``.

        Args:
            x: input image batch.
            weights: optional mapping from parameter/buffer names to tensors.
                Keys carry a ``'meta_learner.'`` prefix (for example
                ``'meta_learner.features.conv1.weight'``). When given, the
                forward pass is computed functionally from these tensors
                instead of the module's own parameters, so gradients can flow
                through them.

        Returns:
            Tensor of log-probabilities, one row per input sample.
        """
        if weights is None:
            # Regular path: use the parameters registered on this module.
            output = self.features(x)
            output = output.view(-1, 64)
            output = self.fc(output)
        else:
            # Functional path: same architecture, tensors taken from `weights`.
            # The whole path must stay inside this branch; otherwise the
            # regular path would fall through and index `weights` (None).
            for stage in (1, 2, 3):
                conv = 'meta_learner.features.conv{}'.format(stage)
                bn = 'meta_learner.features.bn{}'.format(stage)
                x = F.conv2d(x, weights[conv + '.weight'], weights[conv + '.bias'])
                x = F.batch_norm(x,
                                 weights[bn + '.running_mean'],
                                 weights[bn + '.running_var'],
                                 weights[bn + '.weight'],
                                 weights[bn + '.bias'],
                                 momentum=1, training=True)
                x = F.relu(x)
                x = F.max_pool2d(x, kernel_size=2, stride=2)
            x = x.view(x.size(0), 64)
            output = F.linear(x, weights['meta_learner.fc.weight'],
                              weights['meta_learner.fc.bias'])
        return F.log_softmax(output, dim=1)


In [None]:

class MetaLearner(nn.Module):
    """Wrapper that holds the base network under the attribute ``meta_learner``.

    Because the network is registered under that attribute, every key of this
    module's ``state_dict`` starts with ``'meta_learner.'``.
    """

    def __init__(self, num_classes):
        super(MetaLearner, self).__init__()
        self.meta_learner = BaseNet(num_classes)

    def forward(self, x, mod_weights=None):
        """Run the wrapped network on ``x``.

        Args:
            x: input batch.
            mod_weights: optional mapping of tensors forwarded to the wrapped
                network as its second argument; when omitted, the wrapped
                network is called with ``x`` only.
        """
        # Identity check: `== None` would invoke the argument's own __eq__.
        if mod_weights is None:
            return self.meta_learner(x)
        return self.meta_learner(x, mod_weights)

    def clone_state_dict(self):
        """Return a dict mapping each state_dict key to a clone of its tensor."""
        return {key: val.clone() for key, val in self.state_dict().items()}

In [None]:

"""
    Meta Learner 학습
"""
def train_single_task(net, lr, loaders, num_updates, loss_metric):
    """Adapt ``net`` to one task and return the adapted ("fast") weights.

    One batch is drawn from ``loaders["train"]`` and ``num_updates`` manual
    gradient-descent steps are taken on it. The network's own parameters are
    not modified: the updated tensors are written into a copy of the state
    dict. All gradients are computed with ``create_graph=True`` so the result
    stays differentiable with respect to the original parameters.

    Args:
        net: module offering ``clone_state_dict()`` and a forward that accepts
            an optional weight mapping as second argument.
        lr: step size of the inner-loop updates.
        loaders: mapping whose ``"train"`` entry yields ``(x, y)`` batches.
        num_updates: total number of gradient steps (the first one uses the
            network's own parameters, the rest use the adapted weights).
        loss_metric: callable ``loss_metric(output, y)`` returning a scalar.

    Returns:
        dict keyed like ``net.state_dict()`` in which every parameter entry is
        replaced by its adapted value.
    """
    net.train()  # training-mode behaviour for layers such as batch norm

    # Draw one batch (inputs, labels) from the support split.
    x, y = next(iter(loaders["train"]))

    # Tensor.to() returns a new tensor, so the result must be kept. The batch
    # is moved to wherever the network's parameters live.
    first_param = next(net.parameters(), None)
    if first_param is not None:
        x = x.to(first_param.device)
        y = y.to(first_param.device)

    output = net(x)
    loss = loss_metric(output, y)

    # The network object is reused across tasks, so clear any gradients left
    # on its parameters by hand.
    params = list(net.parameters())
    for p in params:
        if p.grad is not None:
            p.grad.zero_()

    # First step: gradient with respect to the network's own parameters.
    grads = torch.autograd.grad(loss, params, create_graph=True)

    mod_state_dict = net.clone_state_dict()
    mod_weights = OrderedDict()
    for (k, v), g in zip(net.named_parameters(), grads):
        mod_weights[k] = v - lr * g          # manual gradient-descent update
        mod_state_dict[k] = mod_weights[k]

    # Remaining steps run the network on the adapted weights. The fast weights
    # are non-leaf tensors that autograd.grad never accumulates into, so there
    # is nothing to zero here.
    for _ in range(1, num_updates):
        output = net(x, mod_state_dict)
        loss = loss_metric(output, y)
        grads = torch.autograd.grad(loss, list(mod_weights.values()),
                                    create_graph=True)
        for (k, v), g in zip(list(mod_weights.items()), grads):
            mod_weights[k] = v - lr * g
            mod_state_dict[k] = mod_weights[k]

    return mod_state_dict

<h1>[Train handling]</h1>

In [None]:

def train( net                   # network to meta-train
         , meta_train_classes    # list of candidate class ids
         , meta_optimiser        # optimiser over net's parameters
         , loss_metric           # loss callable, e.g. nn.NLLLoss()
         , num_classes           # classes per task (N-way)
         , num_instances         # images per class and split (K-shot)
         , num_tasks             # tasks sampled per meta-update
         , lr                    # inner-loop step size
         , meta_lr               # not used inside this function
         , num_inner_updates     # inner-loop gradient steps per task
         , num_epochs ):         # number of meta-updates
    """Meta-train ``net`` and return the recorded losses together with it.

    Every epoch samples ``num_tasks`` tasks. For each task the network is
    adapted on the support split with ``train_single_task``; the adapted
    weights are then scored on that task's query split. The query losses are
    averaged over the tasks, back-propagated, and ``meta_optimiser`` takes one
    step.

    Returns:
        ``(meta_losses, net)`` where ``meta_losses`` holds one value per
        window of ``plot_every`` (2) epochs: the mean meta-loss of that window.
    """
    total_loss = 0
    print_every = 100
    plot_every = 2
    meta_losses = []

    for epoch in range(1, num_epochs + 1):

        # Per-task adapted weights and loaders gathered during this epoch.
        state_dicts = []
        loaders_list = []

        # Inner loop: sample a task, then adapt to its support split.
        for _ in range(num_tasks):
            task = Task(meta_train_classes, num_classes, num_instances)
            loaders = get_loaders(task)
            adapted = train_single_task(net, lr, loaders, num_inner_updates,
                                        loss_metric)
            state_dicts.append(adapted)
            loaders_list.append(loaders)

        # Batches are moved to the device that holds the network parameters.
        first_param = next(net.parameters(), None)

        # Outer objective: query loss of each task under its adapted weights.
        metaloss = 0
        for adapted, loaders in zip(state_dicts, loaders_list):
            x, y = next(iter(loaders["meta"]))
            if first_param is not None:
                # Tensor.to() is not in-place; keep the returned tensors.
                x = x.to(first_param.device)
                y = y.to(first_param.device)
            output = net(x, adapted)
            metaloss += loss_metric(output, y)

        # Average over tasks so the update reflects all sampled class
        # combinations equally, then take one optimiser step.
        metaloss /= float(num_tasks)
        meta_optimiser.zero_grad()
        total_loss += metaloss.item()
        metaloss.backward()
        meta_optimiser.step()

        if epoch % print_every == 0:
            print("{}/{}. loss: {}".format(epoch, num_epochs, total_loss / plot_every))
        if epoch % plot_every == 0:
            meta_losses.append(total_loss / plot_every)
            total_loss = 0
        if (epoch % 20) == 0:
            print("Epoch "+str(epoch)+" completed.")
    return meta_losses, net

<h1> [Learning - main] </h1>

In [None]:
"""
 실제 train을 실행
"""
# Negative log-likelihood loss; the network output is already log-softmax.
loss_metric = nn.NLLLoss()

# Episode shape.
num_classes = 5            # 5-way
num_instances = 3          # 3-shot
num_tasks = 10             # tasks sampled per meta-update

# Optimisation settings.
lr = 1e-1                  # inner-loop step size
meta_lr = 1e-3             # step size of the meta-optimiser
num_inner_updates = 1      # inner-loop gradient steps per task
num_epochs = 1000          # number of meta-updates

net = MetaLearner(num_classes)
meta_optimizer = torch.optim.Adam(net.parameters(), lr=meta_lr)

# Candidate class ids: 0 .. max(Label) - 1.
# NOTE(review): if the labels are 0-based this leaves out the class with the
# largest label (the evaluation cell uses max + 1) -- confirm against the CSV.
train_classes = list(np.arange(np.max(trainframe['Label'])))

metalosses, net = train(
    net=net,
    meta_train_classes=train_classes,
    meta_optimiser=meta_optimizer,
    loss_metric=loss_metric,
    num_classes=num_classes,
    num_instances=num_instances,
    num_tasks=num_tasks,
    lr=lr,
    meta_lr=meta_lr,
    num_inner_updates=num_inner_updates,
    num_epochs=num_epochs,
)

In [None]:
def accuracy(outputs, labels):
    """Return the fraction of rows whose highest-scoring column equals the label.

    Args:
        outputs: 2-D array of per-class scores, one row per sample.
        labels: array of class indices, one per sample.
    """
    predicted = np.argmax(outputs, axis=1)
    num_correct = np.sum(predicted == labels)
    return num_correct / float(labels.size)

In [None]:
def evaluate( net
            , test_classes
            , task_lr
            , num_classes = 5
            , num_steps = 100
            , num_eval_updates = 3):
    """Measure few-shot performance of ``net`` on freshly sampled test tasks.

    For each of ``num_steps`` tasks a deep copy of ``net`` is fine-tuned with
    plain SGD for ``num_eval_updates`` steps on the task's adaptation batch
    and then scored on the task's test batch. ``net`` itself is never changed.

    Uses the module-level ``loss_metric`` as the loss function.

    Args:
        net: trained network to evaluate.
        test_classes: list of candidate class ids for the test tasks.
        task_lr: SGD learning rate used for the per-task fine-tuning.
        num_classes: classes per task.
        num_steps: number of tasks to sample and evaluate.
        num_eval_updates: fine-tuning steps per task.

    Returns:
        ``(acc_list, losses)`` with one accuracy and one (detached) loss
        tensor per evaluated task.
    """
    import copy  # local import: the file-level one sits below this definition

    losses = []
    acc_list = []

    for _ in range(num_steps):
        task = TestTask(test_classes, num_classes=num_classes,
                        num_instances=3, num_test_instances=10)
        loaders = get_test_loaders(task)
        x_train, y_train = next(iter(loaders["train"]))
        x_test, y_test = next(iter(loaders["test"]))

        # Fine-tune a copy so every task starts from the same trained weights.
        cloned_net = copy.deepcopy(net)

        # Tensor.to() is not in-place; keep the returned tensors. Batches go
        # to the device that holds the network parameters.
        first_param = next(cloned_net.parameters(), None)
        if first_param is not None:
            x_train = x_train.to(first_param.device)
            y_train = y_train.to(first_param.device)
            x_test = x_test.to(first_param.device)
            y_test = y_test.to(first_param.device)

        optim = torch.optim.SGD(cloned_net.parameters(), lr=task_lr)
        for _ in range(num_eval_updates):
            y_train_pred = cloned_net(x_train)
            loss = loss_metric(y_train_pred, y_train)
            optim.zero_grad()
            loss.backward()
            optim.step()

        # Scoring needs no gradients; this also keeps the stored losses from
        # holding on to an autograd graph.
        with torch.no_grad():
            y_test_pred = cloned_net(x_test)
            loss = loss_metric(y_test_pred, y_test)
        losses.append(loss)
        acc_list.append(accuracy(y_test_pred.cpu().numpy(),
                                 y_test.cpu().numpy()))

    return acc_list, losses

In [None]:
import copy
# Candidate test classes: every integer from the smallest to the largest
# label found in the test frame, both ends included.
test_classes = list(np.arange(testframe['Label'].min(), testframe['Label'].max() + 1))
acc_list, losses = evaluate(net, test_classes, task_lr=1e-1)