# Die Hard

Two functions are defined on the input: an easy one and a hard one.

On the training data the easy and hard functions produce the same result;
on the test data you need to predict the hard function.

# Setup

Copy auxiliary files from GitHub 

In [0]:
!rm gridsearch.py solutionmanager.py speedtest.py
!wget https://raw.githubusercontent.com/VVKot/mlinseconds-die-hard/master/mlis/utils/gridsearch.py -q
!wget https://raw.githubusercontent.com/VVKot/mlinseconds-die-hard/master/mlis/utils/solutionmanager.py -q
!wget https://raw.githubusercontent.com/VVKot/mlinseconds-die-hard/master/mlis/utils/speedtest.py -q

Import libraries and utils

In [0]:
# Install TensorBoard and tensorboardX into the runtime.
# NOTE(review): neither package is imported by the cells visible in this notebook —
# confirm the install is still needed.
!pip3 install tensorboard tensorboardX

In [0]:
import time
import random
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import solutionmanager as sm
from gridsearch import GridSearch

Check whether CUDA is available

In [0]:
# Use the GPU when PyTorch reports CUDA as available; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Bare expression so the notebook displays which device was selected.
device

Create neural network

In [0]:
class SolutionModel(nn.Module):
    """Fully connected network: Linear -> sigmoid -> Linear -> sigmoid.

    NOTE(review): the ``solution`` constructor argument is accepted but never
    read, so hyper-parameters stored on it do not influence this model —
    confirm whether that is intended.
    """

    def __init__(self, input_size, output_size, solution):
        """Create the two linear layers with a fixed hidden width of 10."""
        super().__init__()
        self.input_size = input_size
        sm.SolutionManager.print_hint("Hint[1]: NN usually learn easiest function, you need to learn hard one")
        self.hidden_size = 10
        # Input-side layer is created first, then the output-side layer.
        self.linear1 = nn.Linear(self.input_size, self.hidden_size)
        self.linear2 = nn.Linear(self.hidden_size, output_size)

    def forward(self, x):
        """Return the sigmoid output of the network for input batch ``x``."""
        hidden = torch.sigmoid(self.linear1(x))
        return torch.sigmoid(self.linear2(hidden))

    def calc_loss(self, output, target):
        """Return the summed squared error between ``output`` and ``target``."""
        residual = output - target
        return residual.pow(2).sum()

    def calc_predict(self, output):
        """Round the network output to the nearest integer."""
        return torch.round(output)

Create a class to store the hyperparameters and implement grid search.

In [0]:
class Solution():
    """Hyper-parameter holder plus the training loop driven by the solution manager.

    Attributes ending in ``_grid`` list candidate values for the attribute of
    the same name without the suffix.
    NOTE(review): presumably ``GridSearch`` discovers these by name — confirm
    in gridsearch.py before renaming anything.
    """

    def __init__(self):
        self.best_step = 1000
        # Activation modules addressable by the names used in the hyper-parameters.
        self.activations = {
            'sigmoid': nn.Sigmoid(),
            'relu': nn.ReLU(),
            'rrelu0103': nn.RReLU(0.1, 0.3),
            'elu': nn.ELU(),
            'selu': nn.SELU(),
            'leakyrelu01': nn.LeakyReLU(0.1)
        }
        # Current hyper-parameter values.
        self.learning_rate = 0.8
        self.momentum = 0.9
        self.hidden_size = 45
        self.layers_number = 5
        self.activation_hidden = 'relu'
        self.activation_output = 'sigmoid'
        self.do_batch_norm = True
        # Per-configuration bookkeeping keyed by get_key(); a value of -1 in
        # ``sols`` makes train_model skip that configuration.
        self.sols = {}
        self.solsSum = {}
        self.random = 0
        # Candidate values for grid search.
        self.random_grid = list(range(10))
        self.layers_number_grid = [5, 6, 7, 8]
        self.hidden_size_grid = [20, 25, 28, 30, 32, 35, 38, 40, 45]
#         self.momentum_grid = [0.0, 0.3, 0.5, 0.8, 0.9]
        self.learning_rate_grid = [0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1, 1.2, 1.5]
        self.activation_hidden_grid = list(self.activations.keys())
#         self.activation_output_grid = list(self.activations.keys())
        self.grid_search = GridSearch(self)
        self.grid_search.set_enabled(False)

    def create_model(self, input_size, output_size):
        """Return a new SolutionModel for the given input and output sizes."""
        return SolutionModel(input_size, output_size, self)

    def get_key(self):
        """Return a string identifying the current hyper-parameter combination."""
        return "{}_{}_{}_{}_{}_{}_{}".format(self.learning_rate, self.momentum, self.hidden_size, self.activation_hidden, self.activation_output, self.do_batch_norm, "{0:03d}".format(self.layers_number))

    # Return number of steps used
    def train_model(self, model, train_data, train_target, context):
        """Train ``model`` with full-batch SGD until the time budget is nearly spent.

        Args:
            model: network exposing ``calc_loss(output, target)``.
            train_data: input batch, used in full on every step.
            train_target: targets matching ``train_data``.
            context: object whose ``get_timer().get_time_left()`` reports the
                remaining time.

        Returns:
            The number of optimisation steps performed; 0 when the current
            configuration is marked as skipped (``self.sols[key] == -1``).
        """
        key = self.get_key()
        if self.sols.get(key) == -1:
            # Previously this path returned None; keep the return type an int.
            return 0
        step = 0
        model.to(device)
        # Put model in train mode
        model.train()
        optimizer = optim.SGD(model.parameters(), lr=self.learning_rate, momentum=self.momentum)
        # There is no accuracy-based early stop: the loop only ends when the
        # timer reports less than 0.1 remaining.
        while True:
            time_left = context.get_timer().get_time_left()
            # No more time left, stop training
            if time_left < 0.1:
                break
            # model.parameters()...gradient set to zero
            optimizer.zero_grad()
            # evaluate model => model.forward(data)
            output = model(train_data)
            # calculate loss
            loss = model.calc_loss(output, train_target)
            # calculate derivative of model.forward() and put it in model.parameters()...gradient
            loss.backward()
            # update model: model.parameters() -= lr * gradient
            optimizer.step()
            step += 1
        return step

    def print_stats(self, step, loss, correct, total, model):
        """Print the hyper-parameters together with the current training progress.

        Values are taken from ``model`` when it exposes an attribute of the
        same name and from this Solution otherwise, so the call works even if
        the model does not carry the hyper-parameters itself.

        Args:
            step: number of steps performed so far.
            loss: scalar tensor; its ``item()`` is printed as the error.
            correct: number of correct predictions.
            total: number of predictions.
            model: the model being trained.
        """
        def setting(name):
            return getattr(model, name, getattr(self, name))

        print("LR={}, Momentum={}, HS={}, Number of layers={}, ActivHidden={}, Step = {} Prediction = {}/{} Error = {}".format(
            self.learning_rate, self.momentum, setting('hidden_size'), setting('layers_number'),
            setting('activation_hidden'), step, correct, total, loss.item()))


Create class for data generation

In [0]:
class Limits:
    """Limits attached to every generated case: time, size and test thresholds."""

    def __init__(self):
        self.test_limit = 0.75
        self.size_limit = 1_000_000
        self.time_limit = 2.0

class DataProvider:
    """Generates train/test cases built from a random easy and a random hard boolean function."""

    def __init__(self):
        self.number_of_cases = 20

    def full_func(self, input_size):
        """Return a random truth table that depends on every one of its inputs.

        Random 0/1 tables of length ``2**input_size`` are drawn until one is
        found where, for each input bit, flipping that bit changes the value
        for at least one index.

        Args:
            input_size: number of boolean inputs of the function.

        Returns:
            A ByteTensor of length ``2**input_size`` holding 0/1 values.
        """
        table_size = 1 << input_size
        while True:
            table = torch.ByteTensor(table_size).random_(0, 2)
            # Convert once so the scan below compares plain Python ints.
            values = table.tolist()
            depends_on_every_input = all(
                any(values[ind] != values[ind ^ (1 << bit)] for ind in range(table_size))
                for bit in range(input_size)
            )
            if depends_on_every_input:
                return table

    def tensor_to_int(self, tensor):
        """Pack the flattened tensor into an int, first element as the most significant bit.

        Note that this is the opposite bit order to ``int_to_tensor``, which
        writes the least significant bit first.
        """
        res = 0
        for bit in tensor.view(-1).tolist():
            res = (res << 1) + bit
        return res

    def int_to_tensor(self, ind, tensor):
        """Write the bits of ``ind`` into ``tensor`` in place, least significant bit first."""
        for i in range(tensor.size(0)):
            tensor[i] = (ind >> i) & 1

    def create_data(self, seed, easy_table, hard_table, easy_input_size, hard_input_size, easy_correct):
        """Enumerate all inputs and label each with the hard function's value.

        The low ``easy_input_size`` bits of an index feed the easy function
        and the remaining bits feed the hard function.

        Args:
            seed: unused by this method.
            easy_table: truth table of the easy function.
            hard_table: truth table of the hard function.
            easy_input_size: number of inputs of the easy function.
            hard_input_size: number of inputs of the hard function.
            easy_correct: when true, keep only rows where the easy and hard
                values agree; otherwise keep every row.

        Returns:
            ``(data, target)`` as float tensors on ``device``, with the rows
            shuffled by one random permutation.
        """
        input_size = easy_input_size + hard_input_size
        data_size = 1 << input_size
        easy_mask = (1 << easy_input_size) - 1
        data = torch.ByteTensor(data_size, input_size)
        target = torch.ByteTensor(data_size, 1)
        count = 0
        for ind in range(data_size):
            # The row is written at position ``count`` and only kept (by
            # advancing ``count``) when it passes the filter below.
            self.int_to_tensor(ind, data[count])
            easy_value = easy_table[ind & easy_mask].item()
            hard_value = hard_table[ind >> easy_input_size].item()
            target[count, 0] = hard_value
            if not easy_correct or easy_value == hard_value:
                count += 1
        data = data[:count, :]
        target = target[:count, :]
        perm = torch.randperm(count)
        data = data[perm]
        target = target[perm]
        return (data.float().to(device), target.float().to(device))

    def create_case_data(self, case):
        """Build the CaseData for ``case``, seeding both RNGs with the case number.

        Training data contains only rows where the easy and hard functions
        agree; test data contains every row. The same random column
        permutation is applied to both.
        """
        easy_input_size = 2
        hard_input_size = 6

        random.seed(case)
        torch.manual_seed(case)
        easy_table = self.full_func(easy_input_size)
        hard_table = self.full_func(hard_input_size)
        train_data, train_target = self.create_data(case, easy_table, hard_table, easy_input_size, hard_input_size, True)
        test_data, test_target = self.create_data(case, easy_table, hard_table, easy_input_size, hard_input_size, False)
        perm = torch.randperm(train_data.size(1))
        train_data = train_data[:, perm]
        test_data = test_data[:, perm]
        return sm.CaseData(case, Limits(), (train_data, train_target), (test_data, test_target)).set_description("Easy {} inputs and hard {} inputs".format(easy_input_size, hard_input_size))
      
class Config:
    """Entry-point configuration handed to the solution manager."""

    def __init__(self):
        self.max_samples = 10_000

    def get_data_provider(self):
        """Return a fresh DataProvider instance."""
        provider = DataProvider()
        return provider

    def get_solution(self):
        """Return a fresh Solution instance."""
        solution = Solution()
        return solution

Evaluate the model

In [0]:
# Run the solution manager with this notebook's Config.
# NOTE(review): case_number=-1 presumably selects every case — confirm in solutionmanager.py.
sm.SolutionManager(Config()).run(case_number=-1)
