In [319]:
import folium
import pandas as pd
import numpy as np
from sympy import *
import matplotlib.pyplot as plt

DAEGU = [35.85, 128.56]
KADIZ = [(39.00, 123.30),
         (37.00, 124.00),
         (30.00, 124.00),
         (30.00, 124.00),
         (30.00, 125.25),
         (30.00, 125.25),
         (32.30, 126.50),
         (32.30, 127.30),
         (34.17, 128.52),
         (35.13, 129.48),
         (36.00, 130.30),
         (37.17, 133.00),
         (39.00, 133.00),
         (39.00, 123.30)]
IEODO = [32.11753829417425, 125.16666665729801]
DOKDO = [37.24315823449135, 131.86690646281266]

def init_map():
    m = folium.Map(location = DAEGU, zoom_start = 5.5)
    folium.PolyLine(locations = KADIZ,tooltip = 'KADIZ', color='red').add_to(m)
    folium.Marker(IEODO, popup="<i>IEODO</i>").add_to(m)
    folium.Marker(DOKDO, popup="<i>DOKDO</i>").add_to(m)
    return m

m = init_map()

m

In [320]:
def mk_row(point):
    x, y = point
    return [y * y, x, y, x * y, 1]

#return value (a, b, c, d, e, f) matches (ax^2 + by^2 + cx + by + exy + f = 0)
def find_ellipse(points):
    if len(points) != 5:
        raise ValueError
    else:
        mat = []
        constant = []
        for point in points:
            mat.append(mk_row(point))
            constant.append(-point[0]**2)
        sol = np.linalg.solve(np.array(mat), constant)
    return np.concatenate((np.array([1]), sol), axis = None)

def make_route(sol, data, input_axis, select):
    x = symbols('x', real = True)
    y = symbols('y', real = True)
    a, b, c, d, e, f = sol
    route = []
    if input_axis == 'x':
        for i in data:
            x = i
            ans = solve(a*x**2 + b*y**2 + c*x + d*y + e*x*y + f)
            if len(ans) == 0:
                print('No real solution at {}'.format(i))
                break
            if select == 'max':
                y_sol = max(ans)
                route.append([y_sol, i])
            elif select == 'min':
                y_sol = min(ans)
                route.append([y_sol, i])
    elif input_axis == 'y':
        for i in data:
            y = i
            ans = solve(a*x**2 + b*y**2 + c*x + d*y + e*x*y + f)
            if len(ans) == 0:
                print('No real solution at {}'.format(i))
                break
            if select == 'max':
                x_sol = max(ans)
                route.append([i, x_sol])
            elif select == 'min':
                x_sol = min(ans)
                route.append([i, x_sol])
    return route

In [321]:
# Make base route
points = [(120.15, 29.331), (125.8, 31.369), (127.42, 32.459), (129.44, 34.436), (131.35, 38.448)]

sol = find_ellipse(points)

lat_range = np.linspace(123, 131, 100)
base_route = np.array(make_route(sol, lat_range, 'x', 'min'))

# print(len(route_1), route_1)

In [322]:
folium.PolyLine(locations = base_route, tooltip = 'flight tracks', color = 'blue').add_to(m)
m

In [323]:
# make noise
mu, sigma = 0.1, 0.01
route_dataset = []

for i in range(100):
    s = np.random.default_rng(i).normal(mu, sigma, size=(100, 2))
    route_dataset.append(base_route + s)

route_dataset = np.array(route_dataset)
print(route_dataset.shape)
# print(route_dataset[0])

(100, 100, 2)


In [324]:
# show one route in dataset 
folium.PolyLine(locations = route_dataset[0], tooltip = 'track with noise', color = 'orange').add_to(m)
m

In [325]:
# Split datasets
ratio = 0.7
boundary = int(len(route_dataset) * ratio)

train_dataset = route_dataset[:boundary, :, :]
test_dataset = route_dataset[boundary:, :, :]

print(train_dataset.shape, test_dataset.shape)

(70, 100, 2) (30, 100, 2)


In [326]:
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler, StandardScaler

standard = StandardScaler()
minmax = MinMaxScaler()

class RouteDataset(Dataset): 
    def __init__(self, train=True):    
        # Scaling
        self.dataset = train_dataset if train == True else test_dataset
        self.boundary = int(len(self.dataset[0]) / 2)
        
        self.x_data = self.dataset[:, :self.boundary, :]
        self.y_data = self.dataset[:, self.boundary:, :]
        
        self.x_data = standard.fit_transform(self.x_data.reshape(-1, self.x_data.shape[-1])).reshape(self.x_data.shape)
        self.y_data = minmax.fit_transform(self.y_data.reshape(-1, self.y_data.shape[-1])).reshape(self.y_data.shape)
            
    def __len__(self): 
        return len(self.x_data)
    
    def __getitem__(self, idx): 
        x = torch.FloatTensor(self.x_data[idx])
        y = torch.FloatTensor(self.y_data[idx])
        return x, y

In [327]:
batch_size = 16

train_loader = DataLoader(dataset=RouteDataset(train=True), 
                          batch_size=batch_size, 
                          shuffle=True)
test_loader = DataLoader(dataset=RouteDataset(train=False), 
                         batch_size=batch_size, 
                         shuffle=True)

for i, (x, y) in enumerate(train_loader):
    print(x.shape, y.shape)

torch.Size([16, 50, 2]) torch.Size([16, 50, 2])
torch.Size([16, 50, 2]) torch.Size([16, 50, 2])
torch.Size([16, 50, 2]) torch.Size([16, 50, 2])
torch.Size([16, 50, 2]) torch.Size([16, 50, 2])
torch.Size([6, 50, 2]) torch.Size([6, 50, 2])


In [328]:
# Model definition
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

device = 'cpu'

class LSTM(nn.Module):
    def __init__(self,output_size,input_size,hidden_size,num_layers,seq_length):
        super(LSTM,self).__init__()
        self.output_size = output_size
        self.num_layers = num_layers
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.seq_length = seq_length

        self.lstm = nn.LSTM(input_size=input_size , hidden_size=hidden_size , num_layers=num_layers, batch_first=True)
        self.fc_1 = nn.Linear(hidden_size,128)
        self.fc = nn.Linear(128,output_size)

        self.relu = nn.ReLU()

    def forward(self,x):
        h_0 = Variable(torch.zeros(self.num_layers, x.size(0),self.hidden_size)).to(device)
        c_0 = Variable(torch.zeros(self.num_layers, x.size(0),self.hidden_size)).to(device)

        output, (hn,cn) = self.lstm(x,(h_0,c_0))

        hn = hn.view(-1, self.hidden_size)
        out = self.relu(hn)
        out = self.fc_1(out)
        out = self.relu(out)
        out = self.fc(out)
        return out

In [329]:
num_epochs = 70
learning_rate = 0.0001

input_size = 2
hidden_size = 32
num_layers = 1
output_size = 100
seq_length = 50

print_every = 10

model = LSTM(output_size, input_size, hidden_size, num_layers, seq_length)
loss_function = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate)

In [330]:
# Model trainig
for epoch in range(1, num_epochs+1):
    for i, (x_train, y_train) in enumerate(train_loader):
        x_train = Variable(x_train.view(-1, seq_length, input_size))
        y_train = Variable(y_train.view(-1, seq_length*input_size))

        optimizer.zero_grad()
        outputs = model(x_train)
    #         import pdb; pdb.set_trace()
        loss = loss_function(outputs, y_train)

        loss.backward()
        optimizer.step()

    if epoch % print_every == 0:
        print("[Epoch %3d] loss : %1.5f" % (epoch, loss.item()))

[Epoch  10] loss : 0.26138
[Epoch  20] loss : 0.20800
[Epoch  30] loss : 0.11660
[Epoch  40] loss : 0.02479
[Epoch  50] loss : 0.00479
[Epoch  60] loss : 0.00096
[Epoch  70] loss : 0.00020


In [331]:
# Model testing
total, total_loss = 0, 0.0
model.eval()

for i, (x_test, y_test) in enumerate(test_loader):
    x_test = Variable(x_test.view(-1, seq_length, input_size))
    y_test = Variable(y_test.view(-1, seq_length*input_size))

    outputs = model(x_test)
    loss = loss_function(outputs, y_test)

    total += y_test.size(0)
    total_loss += loss.item()
        
avg_loss = total_loss / total
print('Test Loss : %1.5f' % avg_loss)

Test Loss : 0.00001


In [337]:
# Visualization
for (x, y) in test_loader:
    x = Variable(x[0].view(-1, seq_length, input_size))
    y = Variable(y[0].view(-1, seq_length*input_size))
    y_predicted = model(x)
    
    x = x.view(seq_length, input_size).data.detach().cpu().numpy()
    y = y.view(seq_length, input_size).data.detach().cpu().numpy()
    y_predicted = y_predicted.view(seq_length, input_size).data.detach().cpu().numpy()    
        
    x = standard.inverse_transform(x)
    y = minmax.inverse_transform(y)
    y_predicted = minmax.inverse_transform(y_predicted)
    
    m = init_map()
    
    folium.PolyLine(locations = x,color = 'blue',tooltip = 'Real Trajectory').add_to(m)
    folium.PolyLine(locations = y,color = 'green',tooltip = 'Predicted Trajectory').add_to(m)
    folium.PolyLine(locations = y_predicted,color = 'orange',tooltip = 'Predicted Trajectory').add_to(m)
    
    break
m