In [1]:
%load_ext autoreload
%autoreload 2
from typing import List

from copy import copy
from itertools import combinations, permutations
import pickle
from random import choice, random

import pandas as pd
import numpy as np

import sys
sys.path.append('../src')

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim

As an entry exercise, we will try to train a simple neural network that sorts a 1D array step by step. At each step the model should output the best permutation reachable through the allowed actions (we will use transpositions of neighboring elements).   

## Define the target for the greedy algorithm
The 'sort loss' function we use counts the out-of-order pairs (inversions) in the array:

$$
\text{SL} = \sum_{i=1}^{n-1} \sum_{j=i+1}^{n} [x_i > x_j]
$$

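This way we get a function that is zero exactly when the array is sorted and that has no local minima with respect to neighbor transpositions: any unsorted array contains an adjacent out-of-order pair, and swapping that pair lowers the loss by one. This makes sure a greedy search can always optimize it.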
Below is the implementation:

In [3]:
def sort_loss(input: List[int]) -> float:
    """Count the inversions in ``input``.

    An inversion is a pair of positions ``(i, j)`` with ``i < j`` whose values
    are out of order, i.e. ``input[i] > input[j]``.

    Args:
        input: Sequence of mutually comparable values.

    Returns:
        The number of inverted pairs; 0 for an empty, single-element or
        non-decreasing sequence.
    """
    size = len(input)
    # A position can never be inverted with itself, so the partner index
    # starts one past the current position.
    return sum(
        1
        for left in range(size)
        for right in range(left + 1, size)
        if input[left] > input[right]
    )


Example:

In [4]:
# The out-of-order pairs are (5,4), (5,3), (5,2), (4,3), (4,2) and (3,2), so the loss is 6.
sort_loss([1,5,4,3,2])

6

The available actions are narrowed down to transpositions of two neighboring elements only:

In [5]:
def get_actions_possible(input: List[int]) -> List[List[int]]:
    """Enumerate every sequence reachable from ``input`` by one adjacent swap.

    Args:
        input: The current sequence. It is not modified; every result is built
            on a shallow copy.

    Returns:
        ``len(input) - 1`` new sequences, where the k-th one has the elements
        at positions ``k`` and ``k + 1`` exchanged. Empty when ``input`` has
        fewer than two elements.
    """
    def _with_neighbors_swapped(left: int) -> List[int]:
        # Swap positions `left` and `left + 1` on a fresh copy.
        candidate = copy(input)
        right = left + 1
        candidate[left], candidate[right] = candidate[right], candidate[left]
        return candidate

    return [_with_neighbors_swapped(left) for left in range(len(input) - 1)]

For a fixed sequence length the training data is easy to generate: each input is labeled with the greedy choice, i.e. the available action that yields the lowest value of the previously defined 'sort loss':

In [6]:
# generate simple data
SOURCE = list(range(7))
DATASET_SIZE = 500
SEQ_LEN = 4

# Each entry is (input sequence, best sequence reachable by one adjacent swap).
DATA = []

SAMPLES = list(permutations(SOURCE, SEQ_LEN))
_picked = set()

# Draw distinct inputs only: a sequence that appears twice in DATA could end up
# in both the train and the test split and make the test split meaningless.
# Rejection sampling can only terminate if we ask for no more samples than
# there are distinct permutations, hence the cap.
_n_unique = min(DATASET_SIZE, len(SAMPLES))
while len(_picked) < _n_unique:
    v = choice(SAMPLES)
    if v in _picked:
        continue
    _picked.add(v)

    # Greedy label: the swap with the lowest sort loss. `min` returns the first
    # minimal element, so ties are resolved in favor of the leftmost swap.
    # `default=None` covers sequences too short to have any swap.
    swap_chosen = min(get_actions_possible(list(v)), key=sort_loss, default=None)

    DATA.append((list(v), swap_chosen))

Simple sanity check

In [7]:
# Show the first ten (input, label) pairs, each followed by every candidate
# swap and its sort loss, so the stored label can be checked by eye.
for sample_row in DATA[:10]:
    print('------------------\n')
    print(sample_row)
    print('------------------\n')
    source_sequence = sample_row[0]
    for candidate in get_actions_possible(source_sequence):
        print(candidate, sort_loss(candidate))
    print('-------------------\n')

------------------

([1, 3, 6, 5], [1, 3, 5, 6])
------------------

[3, 1, 6, 5] 2
[1, 6, 3, 5] 2
[1, 3, 5, 6] 0
-------------------

------------------

([2, 3, 4, 1], [2, 3, 1, 4])
------------------

[3, 2, 4, 1] 4
[2, 4, 3, 1] 4
[2, 3, 1, 4] 2
-------------------

------------------

([5, 3, 1, 2], [3, 5, 1, 2])
------------------

[3, 5, 1, 2] 4
[5, 1, 3, 2] 4
[5, 3, 2, 1] 6
-------------------

------------------

([4, 6, 0, 2], [4, 0, 6, 2])
------------------

[6, 4, 0, 2] 5
[4, 0, 6, 2] 3
[4, 6, 2, 0] 5
-------------------

------------------

([0, 4, 5, 3], [0, 4, 3, 5])
------------------

[4, 0, 5, 3] 3
[0, 5, 4, 3] 3
[0, 4, 3, 5] 1
-------------------

------------------

([5, 6, 3, 4], [5, 3, 6, 4])
------------------

[6, 5, 3, 4] 5
[5, 3, 6, 4] 3
[5, 6, 4, 3] 5
-------------------

------------------

([0, 3, 6, 2], [0, 3, 2, 6])
------------------

[3, 0, 6, 2] 3
[0, 6, 3, 2] 3
[0, 3, 2, 6] 1
-------------------

------------------

([1, 4, 5, 0], [1, 4, 0, 5])
------

### Split data

In [8]:
# Expected fraction of rows sent to the training split.
SPLIT_RATE = 0.9
train_data, test_data = [], []
for row in DATA:
    # One independent random draw per row decides its destination, so the
    # actual split sizes vary from run to run.
    destination = train_data if random() < SPLIT_RATE else test_data
    destination.append(row)

In [9]:
# Number of rows that landed in the training split.
len(train_data)

461

### Dump data

In [10]:
# Write each split to its own pickle file, training split first.
for pickle_path, split_rows in (('../data/train.pkl', train_data),
                                ('../data/test.pkl', test_data)):
    with open(pickle_path, 'wb') as f:
        pickle.dump(split_rows, f)

### Load data

In [11]:
# Read both splits back from disk.
# NOTE: unpickling can execute arbitrary code contained in the file; only load
# pickle files that come from a trusted source.
with open('../data/train.pkl', 'rb') as train_file:
    train_data = pickle.load(train_file)

with open('../data/test.pkl', 'rb') as test_file:
    test_data = pickle.load(test_file)

### Define a simple sorter neural network

In [12]:
class SorterNN(nn.Module):
    """Two-layer fully connected network: Linear -> ReLU -> Linear.

    Args:
        input_size: Number of features in each input row.
        hidden_size: Width of the hidden layer.
        output_size: Number of values produced for each input row.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # The attribute names below become the prefixes of the state_dict keys.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Apply the hidden layer with ReLU, then the output layer, to ``x``."""
        hidden = self.relu(self.fc1(x))
        return self.fc2(hidden)


In [13]:
def _column_as_float_tensor(pairs, column):
    """Stack element ``column`` of every pair in ``pairs`` into a float32 tensor."""
    return torch.tensor([pair[column] for pair in pairs], dtype=torch.float32)


# Element 0 of each pair is the model input, element 1 the target it should produce.
input_train_data = _column_as_float_tensor(train_data, 0)
output_train_data = _column_as_float_tensor(train_data, 1)

input_test_data = _column_as_float_tensor(test_data, 0)
output_test_data = _column_as_float_tensor(test_data, 1)

In [14]:
# The input and output widths are read from the second dimension of the
# training tensors; only the hidden width is chosen by hand.
hidden_size = 64
input_size = input_train_data.size(1)
output_size = output_train_data.size(1)

model = SorterNN(input_size, hidden_size, output_size)
# The target sequence is regressed element-wise with a mean squared error.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [15]:
num_epochs = 5000
for epoch in range(num_epochs):
    # Full-batch training: every epoch is a single optimizer step on all rows.
    # Gradients are cleared before the new backward pass.
    optimizer.zero_grad()
    loss = criterion(model(input_train_data), output_train_data)
    loss.backward()
    optimizer.step()

    # Report the loss computed before this epoch's update, once every 1000 epochs.
    if (epoch+1) % 1000 != 0:
        continue
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

Epoch [1000/5000], Loss: 0.1667
Epoch [2000/5000], Loss: 0.0852
Epoch [3000/5000], Loss: 0.0471
Epoch [4000/5000], Loss: 0.0326
Epoch [5000/5000], Loss: 0.0281


In [16]:
# First (input, label) pair of the test split.
test_data[0]

([1, 3, 6, 5], [1, 3, 5, 6])

In [17]:
# The network outputs real numbers; rounding maps them to the nearest integers
# so the prediction can be read as a sequence.
test_input = torch.tensor([[5, 3, 1, 4]], dtype=torch.float32)
predicted_output = torch.round(model(test_input))
print("Predicted Output:", predicted_output.detach())

Predicted Output: tensor([[3., 5., 1., 4.]])


In [18]:
# List every adjacent swap of [3, 5, 1, 4] together with its sort loss.
for candidate in get_actions_possible([3, 5, 1, 4]):
    print(candidate, sort_loss(candidate))

[5, 3, 1, 4] 4
[3, 1, 5, 4] 2
[3, 5, 4, 1] 4


In [19]:
test_input = torch.tensor([[3, 5, 1, 4]], dtype=torch.float32)
# Rounding maps the real-valued network output to the nearest integers.
predicted_output = model(test_input).round()
# The tensor is already rounded, so it is printed as-is; a second numpy
# rounding pass on a torch tensor would add nothing.
print("Predicted Output:", predicted_output.detach())

Predicted Output: tensor([[3., 1., 5., 4.]])


Let's test the sorting

In [20]:
sample = test_data[0][0]
v = sort_loss(sample)

print(f'Input:\t {sample}')
# The network contains only linear layers and a ReLU, so the same state always
# yields the same prediction: revisiting a state means the loop would cycle
# forever. Remember every state seen so far to detect that.
visited = {tuple(sample)}
i = 1
while v > 0:
    test_input = torch.tensor([sample], dtype=torch.float32)
    predicted_output = model(test_input).round()
    candidate = predicted_output.detach().numpy().astype(int).tolist()[0]

    # The model regresses values, so the rounded output is not guaranteed to
    # contain the same elements as the input. Accepting it would report a
    # "sorted" result made of different numbers.
    if sorted(candidate) != sorted(sample):
        print(f'Step {i}:\t', candidate, '\tNot a permutation of the input, stopping')
        break
    if tuple(candidate) in visited:
        print(f'Step {i}:\t', candidate, '\tState already visited, stopping')
        break
    visited.add(tuple(candidate))

    sample = candidate
    v = sort_loss(sample)
    print(f'Step {i}:\t', sample, f'\tSort loss: {v}')
    i += 1

Input:	 [1, 3, 6, 5]
Step 1:	 [1, 3, 5, 6] 	Sort loss: 0


So it looks like we now have a model that has learned the best moves for the given permutations. 

Things to consider:
1. **what if the input consists of random unexpected values?**
   <br>_If the elements live in a metric space we assume them as sortable and thus, they can be indexed with known/expected intervals and sorted after that._ 
2. **what if the input is very large?**
   <br>_This can be handled by eating the elephant one bite at a time. We can break the input into parts of acceptable size and sort them independently. Once all parts are sorted we apply the merge-sort strategy. If the chunks are still very big we can apply the same algorithm recursively. Yet the solution is designed for a fixed input length only, which is already a limitation._
3. **how to define sort loss and possible actions for a more sophisticated group?**