In [None]:
import random
from math import asin, cos, exp, pi, sin, sqrt

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
import torch.optim as optim
from celluloid import Camera
from keras.utils import to_categorical
from matplotlib import pyplot as plt
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score
from torch.autograd import Variable
from torch.optim.optimizer import Optimizer

# Task 1

## Step 1 Write Optimizer

In [None]:
class SimulatedAnnealing(Optimizer):
    def __init__(self,
                 params,
                 T=1.0,
                 anneal_rate=0.0003,
                 min_temp=1e-5,
                 anneal_every=100000):
        defaults = dict(T=T,
                        anneal_rate=anneal_rate,
                        min_temp=min_temp,
                        anneal_every=anneal_every,
                        iteration=0)
        super(SimulatedAnnealing, self).__init__(params, defaults)

    def step(self, closure=None):
        if closure is None:
            raise Exception("loss closure is required to do SA")

        loss = closure()

        for group in self.param_groups:
            # clone all of the params to keep in case we need to swap back
            cloned_params = [p.clone() for p in group['params']]

            for p in group['params']:
                # anneal temperature every 'anneal_every' iteration
                if group['iteration'] > 0 \
                   and group['iteration'] % group['anneal_every'] == 0:
                    group['T'] = np.maximum(group['T'] * group['anneal_rate'],
                                            group['min_temp'])

                # normalize weights
                p.data = p.data / torch.norm(p.data)

                # create randomizer
                rand_float = torch.FloatTensor

                # sample new weights with mu=old weights
                if len(list(p.data.shape)) == 2:
                    sampling_normal = []
                    for d in p.data:
                        temp = []
                        for i in d:
                            temp.append(rand_float(1).normal_(i, 2))
                        sampling_normal.append(temp)
                else:
                    sampling_normal = []
                    for d in p.data:
                        sampling_normal.append(rand_float(1).normal_(i, .5))

                p.data = torch.FloatTensor(sampling_normal)
                group['iteration'] += 1

            # re-evaluate the loss function with the perturbed params
            # if we didn't accept the new params swap back and return
            new_loss = closure()
            final_loss, is_accepted = self.anneal(loss, new_loss, group['T'])
            if not is_accepted:
                for p, pbkp in zip(group['params'], cloned_params):
                    p.data = pbkp.data

            return final_loss

    def anneal(self, loss, new_loss, temp):
        '''returns loss, is_new_loss'''
        def acceptance_prob(old, new, temp):
            return torch.exp((old - new) / temp)

        if new_loss.item() < loss.item():
            return new_loss, True
        else:
            ap = acceptance_prob(loss, new_loss, temp)
            u = np.random.rand()
            if ap.item() >= u:
                return new_loss, True

            return loss, False

## Step 2 Initialize parameters and load data

In [None]:
batch_size = 32
epochs = 1000

#loading iris data from sklearn
iris = load_iris()
x_data = iris.data
y_data = iris.target

#numpy to pytorch variable
x_data = Variable(torch.from_numpy(x_data))
y_data = Variable(torch.from_numpy(y_data))

## Step 3 Implement Model

In [None]:
class Model(torch.nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.l1 = torch.nn.Linear(4, 100)
        self.l2 = torch.nn.Linear(100, 100)
        self.l3 = torch.nn.Linear(100, 3)

        self.relu = torch.nn.ReLU()
        self.softmax = torch.nn.Softmax(dim=1)

    def forward(self, x):
        out1 = self.relu(self.l1(x))
        out2 = self.relu(self.l2(out1))
        out3 = self.l3(out2)
        y_pred = self.softmax(out3)
        return y_pred

## Step 4 Test Model with Default Optimizer - Stochastic Gradient Descent

In [None]:
model = Model()

criterion = torch.nn.CrossEntropyLoss(size_average=True)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
permutation = torch.randperm(x_data.size()[0])

for epoch in range(epochs):
    for i in range(0, x_data.size()[0], batch_size):
        indices = permutation[i:i + batch_size]
        batch_x, batch_y = x_data[indices], y_data[indices]

        optimizer.zero_grad()
        y_pred_val = model(batch_x.float())
        loss = criterion(y_pred_val, batch_y)
        loss.backward()
        optimizer.step()

        if epoch % 100 == 0:
            y_data_1 = batch_y.detach().numpy()
            y_pred_val = y_pred_val.detach().numpy()
            y_pred = [np.argmax(y) for y in y_pred_val]
            accuracy = accuracy_score(y_data_1, y_pred)
            print(
                f'Epoch: {str(epoch)}, batch: {str(i)}, loss: {loss}, accuracy: {accuracy}'
            )

y_pred = model(x_data.float())

y_pred = y_pred.detach().numpy()
y_data_1 = y_data.detach().numpy()

y_pred = [np.argmax(y) for y in y_pred]
print("accuracy: " + str(accuracy_score(y_data_1, y_pred)))

y_pred = model(x_data.float())
loss = loss_fun(y_pred, y_data)
loss

## Step 5 Test Model with Custom Optimizer

In [None]:
model = Model()

loss_fun = torch.nn.CrossEntropyLoss(size_average=True)

# Change parameters of optimizer
optimizer = SimulatedAnnealing(model.parameters(),
                               T=1.0,
                               anneal_rate=0.0001,
                               min_temp=1e-8,
                               anneal_every=1000)

permutation = torch.randperm(x_data.size()[0])

for epoch in range(1000):
    for i in range(0, x_data.size()[0], batch_size):

        # function for optimizer
        def closure():
            optimizer.zero_grad()
            output = model(batch_x.float())
            loss = loss_fun(output, batch_y)
            loss.backward()
            return loss

        indices = permutation[i:i + batch_size]
        batch_x, batch_y = x_data[indices], y_data[indices]
        y_pred_val = model(batch_x.float())
        loss = optimizer.step(closure)

        if epoch % 100 == 0:
            # calculates accuracy for epochs
            y_data_1 = batch_y.detach().numpy()
            y_pred_val = y_pred_val.detach().numpy()
            y_pred = [np.argmax(y) for y in y_pred_val]
            accuracy = accuracy_score(y_data_1, y_pred)
            print(
                f'Epoch: {str(epoch)}, batch: {str(i)}, loss: {loss}, accuracy: {accuracy}'
            )

y_pred = model(x_data.float())

y_pred = y_pred.detach().numpy()
y_data_1 = y_data.detach().numpy()

y_pred = [np.argmax(y) for y in y_pred]
print("accuracy: " + str(accuracy_score(y_data_1, y_pred)))

y_pred = model(x_data.float())
loss = loss_fun(y_pred, y_data)
loss

# Task 2

## Step 1 Prepare data for further algorithm

In [None]:
df = pd.read_csv('city.csv')

population = []
for num in df['population']:
    try:
        population.append(int(num))
    except ValueError:
        num = num[:-3]
        population.append(int(num))

df['population'] = population

df = df.sort_values(by=['population'], ascending=False, ignore_index=True)[:30]
df = df.drop([
    'postal_code', 'country', 'federal_district', 'region_type', 'region',
    'area_type', 'area', 'city_type', 'city', 'settlement_type', 'settlement',
    'kladr_id', 'fias_id', 'capital_marker', 'okato', 'oktmo', 'tax_office',
    'timezone', 'population', 'foundation_year', 'fias_level'
],
             axis=1)

df['address'] = [ad.split(' ')[-1] for ad in df['address']]
df = df.set_index('address')

## Step 2 Helping functions to calculate distance between two cities and in the whole route using Haversine formula

In [None]:
def distance(city1: pd.Series, city2: pd.Series):
    p = pi / 180

    lat1, lon1 = city1.geo_lat * p, city1.geo_lon * p
    lat2, lon2 = city2.geo_lat * p, city2.geo_lon * p

    dlat = lat2 - lat1
    dlon = lon2 - lon1

    r = 6371

    h = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    return 2 * r * asin(sqrt(h))

def get_whole_distance(tour, df):
    tour_distance = 0

    for i in range(len(tour)):
        from_city = tour[i]

        if i + 1 < len(tour):
            destination_city = tour[i + 1]
        else:
            destination_city = tour[0]

        tour_distance += distance(df.loc[from_city],
                                  df.loc[destination_city])

    return tour_distance

## Step 3 Simulated Annealing algorithm

In [None]:
def simulated_annealing(df, anneal_every=100, anneal_rate=.01):
    # generates initial tour randomly
    def generate_random_individual(destination_cities):
        tour = []
        for city in destination_cities:
            tour.append(city)
        # re-order tour
        random.shuffle(tour)
        return tour

    # return acceptance probability
    def acceptance_probability(energy, new_energy, temperature):
        return exp(
            (energy - new_energy) / temperature) if energy <= new_energy else 1

    # initial params
    temp = 10000
    iteration = 0

    # helping list
    destination_cities = list(df.index)
 
    # generate x0
    tours = []
    tour = generate_random_individual(destination_cities)
    tours.append(tour)

    print('initial solution distance: ', get_whole_distance(tour, df))

    # will save best tour
    best_tour = tour.copy()
    while temp > 1:
        new_solution = tour.copy()

        tour_pos1 = int(len(new_solution) * np.random.rand())
        tour_pos2 = int(len(new_solution) * np.random.rand())

        city_swap1 = new_solution[tour_pos1]
        city_swap2 = new_solution[tour_pos2]

        new_solution[tour_pos2] = city_swap1
        new_solution[tour_pos1] = city_swap2

        current_energy = get_whole_distance(tour, df)
        new_energy = get_whole_distance(new_solution, df)

        if acceptance_probability(current_energy, new_energy,
                                  temp) >= np.random.rand():
            tour = new_solution.copy()

        if get_whole_distance(tour, df) < get_whole_distance(best_tour, df):
            best_tour = tour.copy()

        if iteration > 0 \
                   and iteration % anneal_every == 0:
            temp *= anneal_rate
            
        if iteration % 100 == 0:
            print('iteration №', iteration, '| temp = ', temp,
                  '| current distance: ', get_whole_distance(tour, df))
            
        tours.append(tour)

        iteration += 1

    print('final solution distance: ', get_whole_distance(best_tour, df))
    tours.append(best_tour)
    return best_tour, tours

In [None]:
tour, tours = simulated_annealing(df, anneal_every=1000, anneal_rate=.001)

## Step 4 Visualization

In [None]:
fig = plt.figure()
camera = Camera(fig)

for i in range(len(tours)):
    x = [df.loc[city]['geo_lon'] for city in tours[i]]
    y = [df.loc[city]['geo_lat'] for city in tours[i]]
    plt.plot(x, y, '-o', color='blue')
    for j, city in enumerate(tours[i]):
        plt.annotate(city, (x[j], y[j]))
    camera.snap()

animation = camera.animate()
animation.save('tour_0.1.gif', writer='imagemagick')

In [None]:
x = [df.loc[city]['geo_lon'] for city in tour]
y = [df.loc[city]['geo_lat'] for city in tour]
plt.plot(x, y, '-o', color='blue')
for j, city in enumerate(tour):
    plt.annotate(city, (x[j], y[j]))
    
plt.show()