In [1]:
from sklearn.linear_model import LinearRegression, SGDClassifier, LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import math
from tqdm import tqdm
import time
import os
import IPython
from PIL import Image
import numpy as np

In [2]:
def get_data_for_ML():
    """
    Load every image listed in 'coordinates.txt' together with its coordinates.

    Each non-blank line is split on the first two commas into latitude,
    longitude and an image path.  The first character of the path field is
    dropped (presumably the space after the comma -- TODO confirm against the
    file that writes 'coordinates.txt') along with any trailing newline.

    Returns:
        tuple: ``(images, coords)`` where ``images`` is ``np.asarray`` of the
        per-image pixel arrays and ``coords`` is ``np.asarray`` of
        ``[lat, lon]`` float pairs, in file order.
    """
    train_images, train_coords = list(), list()
    with open('coordinates.txt', 'r') as f:
        for line in tqdm(f):
            # A blank line (e.g. at the end of the file) has nothing to unpack.
            if not line.strip():
                continue
            # maxsplit=2 keeps a path that itself contains a comma in one piece.
            lat, lon, image_path = line.split(',', 2)
            train_coords.append([float(lat), float(lon)])
            # Strip only the newline so a final line without one keeps its last character.
            image_path = image_path[1:].rstrip('\n')
            # Image.open is lazy and keeps the file open; the context manager
            # closes it as soon as the pixels have been copied into the array.
            with Image.open(image_path) as image:
                train_images.append(np.array(image))
    return np.asarray(train_images), np.asarray(train_coords)


def compare_results(actual_labels, predicted_labels):
    """
    Score predicted coordinates against the actual ones.

    For every index the absolute latitude and longitude differences (in
    degrees) are converted to miles with fixed factors and added together.
    That distance is mapped to a score of ``5000 * e ** (-miles / 4000)``, so
    a perfect guess is worth 5000 points.

    Args:
        actual_labels: indexable sequence of ``[lat, lon]`` pairs.
        predicted_labels: indexable sequence of ``[lat, lon]`` pairs, at least
            as long as ``actual_labels``.

    Returns:
        tuple: ``(accuracy, mean_points)`` where ``accuracy`` is the mean score
        as a rounded percentage of 5000 formatted like ``'83%'`` and
        ``mean_points`` is the mean score itself.
    """
    scores = []
    for idx in range(len(actual_labels)):
        actual, predicted = actual_labels[idx], predicted_labels[idx]
        lat_gap = abs(actual[0] - predicted[0])
        lon_gap = abs(actual[1] - predicted[1])
        # 1 degree of latitude ~ 69 mi; 1 degree of longitude ~ 54.6 mi
        miles_off = lat_gap * 69 + lon_gap * 54.6
        # Decay constant is 4000 mi (the original note cites Russia's span as 4500 mi).
        scores.append(5000 * (math.e ** (-1 * miles_off / 4000)))
    percentages = [score / 5000 * 100 for score in scores]
    mean_percentage = sum(percentages) / len(percentages)
    mean_score = sum(scores) / len(scores)
    return str(round(mean_percentage)) + '%', mean_score


def show_error_distribution(y_test, y_pred):
    """
    Draw a bar chart of how many predictions fall at each rounded error distance.

    The error is the straight-line distance between the actual and predicted
    ``[lat, lon]`` pair, computed directly on the coordinate values.  It is
    therefore measured in degrees, not miles, and the x-axis is labelled
    accordingly.

    Args:
        y_test: indexable sequence of actual ``[lat, lon]`` pairs.
        y_pred: indexable sequence of predicted ``[lat, lon]`` pairs, at least
            as long as ``y_test``.
    """
    from collections import Counter

    error_vals = list()
    for ind in range(len(y_test)):
        lat_error = y_test[ind][0] - y_pred[ind][0]
        lon_error = y_test[ind][1] - y_pred[ind][1]
        degrees_apart = (lat_error ** 2 + lon_error ** 2) ** (1/2)
        error_vals.append(round(degrees_apart))
    # Count each distinct distance once rather than calling list.count per
    # element, which is quadratic and draws identical bars on top of each other.
    counts = Counter(error_vals)
    plt.xlabel('Distance Apart (degrees)')
    plt.ylabel("Number of Guesses")
    plt.bar(list(counts.keys()), list(counts.values()))


def KNN_test(X_train, y_train, X_test, n_neighbors):
    """
    Fit a k-nearest-neighbours classifier on integer-cast data and predict X_test.

    If either feature array is not 2-D, both are flattened to one row per
    sample.  Features and labels are cast to ``int`` before fitting, so
    fractional label values are truncated.

    Author's notes: ~83%; runtime: <1 hr based on n_neighbors

    Args:
        X_train: training features (NumPy array, one sample per first-axis entry).
        y_train: training labels (NumPy array).
        X_test: features to predict for (NumPy array).
        n_neighbors: number of neighbours passed to ``KNeighborsClassifier``.

    Returns:
        The classifier's predictions for ``X_test``.
    """
    needs_flatten = X_train.ndim != 2 or X_test.ndim != 2
    if needs_flatten:
        X_train = X_train.reshape(X_train.shape[0], -1)
        X_test = X_test.reshape(X_test.shape[0], -1)
    train_features, train_labels, test_features = (
        arr.astype(int) for arr in (X_train, y_train, X_test)
    )
    classifier = KNeighborsClassifier(n_neighbors=n_neighbors)
    classifier.fit(train_features, train_labels)
    return classifier.predict(test_features)


def display_KNN_graph(X_train, y_train, X_test, y_true=None):
    """
    Plot KNN accuracy as a function of the number of neighbours.

    A KNN model is fitted for every neighbour count in
    ``range(5, len(X_train) // 10, 5)`` and scored with ``compare_results``.

    Args:
        X_train: training features (NumPy array).
        y_train: training labels.
        X_test: test features (NumPy array).
        y_true: actual labels for ``X_test``.  When omitted, the notebook-level
            ``y_test`` variable is used, which is what this function always
            read before the parameter existed.
    """
    if y_true is None:
        # Backward-compatible fallback to the notebook global; pass y_true
        # explicitly to avoid depending on hidden kernel state.
        y_true = y_test
    # Flatten once up front so KNN_test does not have to reshape on every iteration.
    if len(X_train.shape) != 2 or len(X_test.shape) != 2:
        X_train = X_train.reshape(len(X_train), -1)
        X_test = X_test.reshape(len(X_test), -1)
    neighbor_counts, accuracies = list(), list()
    for k in tqdm(range(5, len(X_train) // 10, 5)):
        y_pred = KNN_test(X_train, y_train, X_test, k)
        accuracy_text, _ = compare_results(y_true, y_pred)
        neighbor_counts.append(k)
        # compare_results formats accuracy like '83%'; drop the sign to get a number.
        accuracies.append(float(accuracy_text[:-1]))
    plt.xlabel('Number of Neighbors')
    plt.ylabel("Percentage Accuracy")
    plt.plot(neighbor_counts, accuracies)


def linreg_test(X_train, y_train, X_test):
    """
    Fit a ``LinearRegression`` model on the training set and predict X_test.

    If either feature array is not 2-D, both are flattened to one row per
    sample before fitting.

    Author's notes: ~80%; runtime: ~10m

    Args:
        X_train: training features (NumPy array).
        y_train: training targets.
        X_test: features to predict for (NumPy array).

    Returns:
        The model's predictions for ``X_test``.
    """
    if X_train.ndim != 2 or X_test.ndim != 2:
        X_train, X_test = (
            arr.reshape(arr.shape[0], -1) for arr in (X_train, X_test)
        )
    model = LinearRegression()
    model.fit(X_train, y_train)
    return model.predict(X_test)


def logreg_test(X_train, y_train, X_test, penalty):
    """
    Predict latitude and longitude with two independent linear classifiers.

    Features and labels are cast to ``int``; one ``SGDClassifier`` is fitted on
    the first label column (latitude) and another on the second (longitude).

    NOTE(review): despite the name, this uses ``SGDClassifier`` without setting
    ``loss``, so it is logistic regression only if the library default happens
    to be a logistic loss -- confirm the intent.

    Author's notes:
        penalty l2:    82%
        penalty None:  78%
        Runtime:  45m

    Args:
        X_train: training features (NumPy array).
        y_train: training labels, two columns ``[lat, lon]`` (NumPy array).
        X_test: features to predict for (NumPy array).
        penalty: forwarded to ``SGDClassifier(penalty=...)``.

    Returns:
        list: one ``[lat, lon]`` pair of predicted classes per test sample.
    """
    if X_train.ndim != 2 or X_test.ndim != 2:
        X_train = X_train.reshape(X_train.shape[0], -1)
        X_test = X_test.reshape(X_test.shape[0], -1)
    X_train, y_train, X_test = (
        arr.astype(int) for arr in (X_train, y_train, X_test)
    )
    lat_clf = SGDClassifier(penalty=penalty)
    lon_clf = SGDClassifier(penalty=penalty)
    lat_clf.fit(X_train, [pair[0] for pair in y_train])
    lon_clf.fit(X_train, [pair[1] for pair in y_train])
    lat_predicted = lat_clf.predict(X_test)
    lon_predicted = lon_clf.predict(X_test)
    return [[lat_predicted[i], lon_predicted[i]] for i, _ in enumerate(lat_predicted)]
    

def sanity_test():
    """
    Check that 'coordinates.txt' and the 'screenshots' folder agree.

    Prints every line of 'coordinates.txt' whose file name (the text after the
    last 'screenshots/') is not present in the 'screenshots' directory, then
    prints whether the number of lines equals the number of directory entries
    minus one, followed by both numbers.
    """
    # List the directory once; the original re-listed it for every line.
    screenshot_names = os.listdir('screenshots')
    known_names = set(screenshot_names)
    counter = 0
    with open('coordinates.txt', 'r') as f:
        for line in f:
            # Strip only the newline so a final line without one is not truncated.
            file_name = line.rstrip('\n').split('screenshots/')[-1]
            if file_name not in known_names:
                print(line)
            counter += 1
    # NOTE(review): the "- 1" presumably discounts one non-screenshot entry in
    # the folder (e.g. a hidden file) -- TODO confirm.
    len_screenshots = len(screenshot_names) - 1
    print(counter == len_screenshots)
    print(counter, '==', len_screenshots)


In [3]:
# Load all images with their coordinates, then hold out a test split.
# A fixed random_state makes the split -- and every score computed from it --
# reproducible across kernel restarts.
SPLIT_SEED = 42
train_images, train_labels = get_data_for_ML()
X_train, X_test, y_train, y_test = train_test_split(train_images, train_labels, random_state=SPLIT_SEED)

990it [00:18, 54.90it/s]


In [40]:
def function_a(y_train, y_test, y_pred):
    """
    Show, one test sample at a time, where the prediction landed versus the truth.

    For each test sample a scatter plot is drawn with longitude on the x-axis
    and latitude on the y-axis: all training coordinates, four reference
    cities in black, the actual location in light green and the predicted
    location in orange.  The figure is shown for one second and then the cell
    output is cleared.

    Args:
        y_train: sequence of training ``[lat, lon]`` pairs.
        y_test: sequence of actual ``[lat, lon]`` pairs.
        y_pred: sequence of predicted ``[lat, lon]`` pairs, at least as long
            as ``y_test``.
    """
    # (longitude, latitude, legend label)
    reference_cities = (
        (38, 56, 'Moscow'),
        (30, 60, 'St. Petersberg'),
        (44, 48, 'Volgograd'),
        (131, 43, 'Vladivostok'),
    )
    for idx, actual in enumerate(y_test):
        plt.scatter([point[1] for point in y_train], [point[0] for point in y_train])
        for city_lon, city_lat, city_name in reference_cities:
            plt.scatter([city_lon], [city_lat], label=city_name, color='black')
        plt.scatter([actual[1]], [actual[0]], color='lightgreen') # ACTUAL VALUE
        plt.scatter([y_pred[idx][1]], [y_pred[idx][0]], color='orange') # PREDICTED VALUE
        plt.legend()
        plt.show()
        time.sleep(1)
        IPython.display.clear_output()

In [None]:
# Step through the test set, plotting each actual location against its prediction.
# NOTE(review): y_pred is only assigned by the model cells further down, so one
# of them has to be run before this cell.
function_a(y_train, y_test, y_pred)

In [None]:
# Plot KNN accuracy against the number of neighbours (refits KNN once per neighbour count).
display_KNN_graph(X_train, y_train, X_test)

In [None]:
# KNN with 40 neighbours: predict, print (accuracy %, mean points), plot the error distribution.
y_pred = KNN_test(X_train, y_train, X_test, 40)
print(compare_results(y_test, y_pred))
show_error_distribution(y_test, y_pred)

In [None]:
# Linear regression: predict, print (accuracy %, mean points), plot the error distribution.
y_pred = linreg_test(X_train, y_train, X_test)
print(compare_results(y_test, y_pred))
show_error_distribution(y_test, y_pred)

In [None]:
# SGD-based linear classifiers with an l2 penalty: predict, print (accuracy %, mean points),
# plot the error distribution.
y_pred = logreg_test(X_train, y_train, X_test, 'l2')
print(compare_results(y_test, y_pred))
show_error_distribution(y_test, y_pred)

In [4]:
import torch

class Net(torch.nn.Module):
    """
    Fully connected classifier: input -> 512 -> 256 -> ``output_layer_count``.

    Every input batch is flattened to ``(-1, input_size)`` before it is fed to
    the layers, so inputs may arrive in any shape whose per-sample element
    count equals ``input_size``.

    Args:
        output_layer_count: number of output units (one per class).
        input_size: number of values per flattened sample.  Defaults to
            ``596 * 1036 * 4`` (presumably image height * width * channels --
            TODO confirm against the screenshot format).
    """

    def __init__(self, output_layer_count, input_size=596 * 1036 * 4):
        super(Net, self).__init__()
        # Stored so forward() flattens to the same width the first layer expects.
        self.input_size = input_size
        self.model = torch.nn.Sequential(
            torch.nn.Linear(input_size, 512),
            torch.nn.ReLU(),
            torch.nn.Linear(512, 256),
            torch.nn.ReLU(),
            torch.nn.Linear(256, output_layer_count))

    def forward(self, x):
        """Flatten ``x`` to ``(-1, input_size)`` and return the raw class scores."""
        # reshape behaves like view for contiguous input and also accepts non-contiguous tensors.
        x = x.reshape(-1, self.input_size)
        return self.model(x)


def train_neutral_network(net, train_dl: torch.utils.data.DataLoader, test_dl: torch.utils.data.DataLoader, lr: float, n_epochs: int):
    """
    Train ``net`` as a classifier with SGD and cross-entropy loss.

    Each epoch runs one optimisation pass over ``train_dl`` followed by a
    gradient-free pass over ``test_dl``, and prints both mean per-batch losses.
    The network is put in training mode for the first pass and evaluation
    mode for the second, so it is in evaluation mode when returned.

    Args:
        net: the module to train (updated in place).
        train_dl: loader yielding ``(images, labels)`` batches for training.
        test_dl: loader yielding ``(images, labels)`` batches for the test loss.
        lr: SGD learning rate.
        n_epochs: number of epochs to run.

    Returns:
        tuple: ``(net, train_loss_history, test_loss_history)``; each history
        holds one mean per-batch loss per epoch.
    """
    Loss = torch.nn.CrossEntropyLoss()
    # NOTE(review): momentum=1 means the accumulated velocity never decays;
    # values below 1 (e.g. 0.9) are the usual choice -- confirm this is intended.
    optimizer = torch.optim.SGD(net.parameters(), lr=lr, momentum=1)
    train_loss_history, test_loss_history = list(), list()
    for epoch in range(n_epochs):
        train_loss = 0.0
        test_loss = 0.0
        # Explicitly (re-)enter training mode: an earlier evaluate() call or the
        # previous epoch's test pass leaves the module in eval mode.
        net.train()
        for images, labels in train_dl:
            optimizer.zero_grad()
            predicted_output = net(images)
            fit = Loss(predicted_output, labels)
            fit.backward()
            optimizer.step()
            train_loss += fit.item()
        net.eval()
        with torch.no_grad():
            for images, labels in test_dl:
                predicted_output = net(images)
                fit = Loss(predicted_output, labels)
                test_loss += fit.item()
        train_loss = train_loss / len(train_dl)
        test_loss = test_loss / len(test_dl)
        train_loss_history.append(train_loss)
        test_loss_history.append(test_loss)
        print('Epoch {}, \tTrain loss {}, \tTest loss {}'.format(epoch, round(train_loss, 2), round(test_loss, 2)))
    return net, train_loss_history, test_loss_history
    

def evaluate(net: torch.nn.Module, dataloader: torch.utils.data.DataLoader):
    """
    Return the classification accuracy of ``net`` on ``dataloader`` as a percentage.

    The network is switched to evaluation mode and run without gradient
    tracking; the predicted class of each sample is the index of its largest
    output.

    Args:
        net: module mapping an image batch to per-class scores.
        dataloader: loader yielding ``(images, labels)`` batches.

    Returns:
        float: ``100 * correct / total`` over all samples.

    Raises:
        ZeroDivisionError: if the loader yields no samples.
    """
    total, correct = 0, 0
    net.eval()
    # No gradients are needed for scoring; skipping them avoids building an
    # autograd graph for every batch.
    with torch.no_grad():
        for images, labels in dataloader:
            predicted_output = net(images)
            _, predicted_labels = torch.max(predicted_output, 1)
            total += labels.size(0)
            correct += (predicted_labels == labels).sum().item()
    return 100 * correct/total


def evaluate2(net: torch.nn.Module, dataloader: torch.utils.data.DataLoader):
    """
    Return the mean absolute difference between predicted and actual class indices.

    The network is switched to evaluation mode and run without gradient
    tracking.  The mean is taken over all samples, so a shorter final batch
    does not carry more weight than the others.

    Args:
        net: module mapping an image batch to per-class scores.
        dataloader: loader yielding ``(images, labels)`` batches.

    Returns:
        A 0-dim tensor holding the mean absolute class-index error.

    Raises:
        ZeroDivisionError: if the loader yields no samples.
    """
    net.eval()
    total_abs_error, sample_count = 0, 0
    with torch.no_grad():
        for images, labels in dataloader:
            predicted_output = net(images)
            _, predicted_labels = torch.max(predicted_output, 1)
            # Vectorised |predicted - actual| summed over the batch.
            total_abs_error = total_abs_error + (predicted_labels - labels).abs().sum()
            sample_count += len(predicted_labels)
    return total_abs_error / sample_count


def show_loss_graph(n_epochs, train_loss_history, test_loss_history):
    """
    Plot the per-epoch training and test loss curves on one set of axes.

    Args:
        n_epochs: number of epochs; the x-axis runs over ``np.arange(n_epochs)``.
        train_loss_history: one training loss per epoch.
        test_loss_history: one test loss per epoch.
    """
    epochs = np.arange(n_epochs)
    curves = (
        (train_loss_history, 'Train error'),
        (test_loss_history, 'Test error'),
    )
    for history, curve_label in curves:
        plt.plot(epochs, history, '-', linewidth=3, label=curve_label)
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.grid(True)
    plt.legend()
    

def make_minibatch_y(net_lat, net_lon, dataloader_lat, dataloader_lon):
    """
    Run both networks over their loaders and pair the results into coordinate arrays.

    Row ``i`` of each returned array combines the ``i``-th sample produced by
    ``dataloader_lat`` with the ``i``-th sample produced by ``dataloader_lon``.
    The rows therefore describe the same image only if both loaders yield
    samples in the same order (i.e. neither shuffles independently).

    Args:
        net_lat: module predicting latitude classes.
        net_lon: module predicting longitude classes.
        dataloader_lat: loader yielding ``(images, latitude_labels)`` batches.
        dataloader_lon: loader yielding ``(images, longitude_labels)`` batches.

    Returns:
        tuple: ``(y_test, y_pred)``, two NumPy arrays with one ``[lat, lon]``
        row per sample holding the actual and the predicted class indices.

    Raises:
        ValueError: if the two loaders yield a different number of samples.
    """
    def _collect(net, dataloader):
        # Gather predicted and actual class indices as plain Python ints.
        predicted, actual = list(), list()
        with torch.no_grad():
            for images, labels in dataloader:
                _, predicted_labels = torch.max(net(images), 1)
                predicted += predicted_labels.tolist()
                actual += labels.tolist()
        return predicted, actual

    net_lat.eval()
    net_lon.eval()
    lat_pred, lat_actual = _collect(net_lat, dataloader_lat)
    lon_pred, lon_actual = _collect(net_lon, dataloader_lon)
    y_test = np.column_stack((lat_actual, lon_actual))
    y_pred = np.column_stack((lat_pred, lon_pred))
    return y_test, y_pred

In [5]:
# Latitude and longitude are trained as two separate classification problems that
# share the same images, so the feature tensors are built once and reused rather
# than materialising a second float copy of every image.
X_train_tensor = torch.FloatTensor(X_train)
X_test_tensor = torch.FloatTensor(X_test)

train_ds_lat = torch.utils.data.TensorDataset(X_train_tensor, torch.LongTensor([v[0] for v in y_train]))
train_dl_lat = torch.utils.data.DataLoader(train_ds_lat, batch_size=64, shuffle=True)
train_ds_lon = torch.utils.data.TensorDataset(X_train_tensor, torch.LongTensor([v[1] for v in y_train]))
train_dl_lon = torch.utils.data.DataLoader(train_ds_lon, batch_size=64, shuffle=True)
# The test loaders are NOT shuffled: make_minibatch_y pairs the i-th latitude
# sample with the i-th longitude sample, which only lines up when both loaders
# iterate in the same order.  Loss and accuracy do not depend on the order.
test_ds_lat = torch.utils.data.TensorDataset(X_test_tensor, torch.LongTensor([v[0] for v in y_test]))
test_dl_lat = torch.utils.data.DataLoader(test_ds_lat, batch_size=64, shuffle=False)
test_ds_lon = torch.utils.data.TensorDataset(X_test_tensor, torch.LongTensor([v[1] for v in y_test]))
test_dl_lon = torch.utils.data.DataLoader(test_ds_lon, batch_size=64, shuffle=False)

In [None]:
# Hyperparameters shared by both networks.
lr = 0.00000001
n_epochs = 10

# One network per coordinate.  NOTE(review): 64 and 180 are presumably the number
# of integer latitude / longitude classes -- confirm they cover every label value.
net_lat, train_loss_history_lat, test_loss_history_lat = train_neutral_network(Net(64), train_dl_lat, test_dl_lat, lr, n_epochs)
net_lon, train_loss_history_lon, test_loss_history_lon = train_neutral_network(Net(180), train_dl_lon, test_dl_lon, lr, n_epochs)

Epoch 0, 	Train loss 20.53, 	Test loss 13.61
Epoch 1, 	Train loss 10.3, 	Test loss 11.0
Epoch 2, 	Train loss 9.27, 	Test loss 10.48
Epoch 3, 	Train loss 8.62, 	Test loss 9.93
Epoch 4, 	Train loss 8.48, 	Test loss 10.81
Epoch 5, 	Train loss 8.48, 	Test loss 7.5
Epoch 6, 	Train loss 6.37, 	Test loss 8.62
Epoch 7, 	Train loss 6.29, 	Test loss 6.99
Epoch 8, 	Train loss 6.38, 	Test loss 8.32
Epoch 9, 	Train loss 6.55, 	Test loss 9.34
Epoch 0, 	Train loss 28.35, 	Test loss 18.3
Epoch 1, 	Train loss 16.58, 	Test loss 18.48
Epoch 2, 	Train loss 16.3, 	Test loss 17.21
Epoch 3, 	Train loss 15.76, 	Test loss 17.1
Epoch 4, 	Train loss 15.56, 	Test loss 17.65
Epoch 5, 	Train loss 14.78, 	Test loss 17.96


In [None]:
# Latitude network: exact-class accuracy and mean absolute class-index error.
print('Train acc = %0.2f, test acc = %0.2f' % (evaluate(net_lat, train_dl_lat), evaluate(net_lat, test_dl_lat)))
print('Train dist error = %0.2f, test dist error = %0.2f' % (evaluate2(net_lat, train_dl_lat), evaluate2(net_lat, test_dl_lat)))

# Longitude network: same two metrics.
print('Train acc = %0.2f, test acc = %0.2f' % (evaluate(net_lon, train_dl_lon), evaluate(net_lon, test_dl_lon)))
print('Train dist error = %0.2f, test dist error = %0.2f' % (evaluate2(net_lon, train_dl_lon), evaluate2(net_lon, test_dl_lon)))

# Combine both networks' test predictions and score them like the other models.
y_t, y_p = make_minibatch_y(net_lat, net_lon, test_dl_lat, test_dl_lon)
print(compare_results(y_t, y_p))
show_error_distribution(y_t, y_p)

In [None]:
# Loss curves of the latitude network.
show_loss_graph(n_epochs, train_loss_history_lat, test_loss_history_lat)

In [None]:
# Loss curves of the longitude network.
show_loss_graph(n_epochs, train_loss_history_lon, test_loss_history_lon)

In [None]:
# Scratch note: two groups of six percentage figures kept as a bare string.
# NOTE(review): presumably accuracies from earlier runs -- the model and settings
# behind each number are not recorded here; label them or move them to markdown.
"""


77%
79%
78%
83%
74%
79%



82%
77%
81%
70%
76%
75%


"""