In [None]:
import pandas as pd
import dask.dataframe as dd

import bamboolib as bam
import numpy as np
import math

import matplotlib
import matplotlib.pyplot as plt

import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
from torch.optim import lr_scheduler

print("Done!")

## Segédfüggvények

In [None]:
def pdata_to_tdata_split(parquet_data,train_percent,val_percent):
  """Turn one taxi-trip frame into train/validation feature and label tensors.

  The first ``train_percent`` % of the rows (by position) form the training
  split and the next ``val_percent`` % the validation split. Each row is
  encoded as 7 float32 values: trip_distance, passenger_count,
  payment_type + 1, fare_amount, extra, tip_amount, improvement_surcharge.
  All rows get the same one-hot label, derived from the year of the first
  pickup timestamp: 2019 -> [1, 0], 2020 -> [0, 1].

  Args:
    parquet_data: pandas DataFrame with the seven feature columns and
      ``tpep_pickup_datetime``. Rows are addressed by position, so slices and
      filtered frames (index not starting at 0) are accepted.
    train_percent: share of rows (0-100) used for training.
    val_percent: share of rows (0-100) used for validation.

  Returns:
    Tuple ``(TrainX, TrainY, ValX, ValY)`` with shapes ``(n_train, 7)``,
    ``(n_train, 2)``, ``(n_val, 7)`` and ``(n_val, 2)``.

  Raises:
    ValueError: if the percentages sum to more than 100, the frame is empty,
      or the first pickup year is neither 2019 nor 2020.
  """
  if train_percent + val_percent > 100:
    raise ValueError("Sum of Train and Val must be at most 100")

  d_parquet_len = len(parquet_data)
  if d_parquet_len == 0:
    raise ValueError("parquet_data is empty; cannot determine its year label")

  feature_columns = ["trip_distance","passenger_count","payment_type",
                     "fare_amount","extra","tip_amount","improvement_surcharge"]
  year_labels = {2019: [1,0], 2020: [0,1]}

  # Year of the first row, read positionally so any index type works.
  str_date = str(parquet_data.tpep_pickup_datetime.iloc[0])
  int_date = int(str_date[0:4])
  if int_date not in year_labels:
    raise ValueError(f"Unsupported pickup year {int_date}; expected 2019 or 2020")
  valy = torch.tensor(year_labels[int_date],dtype=torch.float)

  train_size = int(train_percent/100*d_parquet_len)
  val_size = int(val_percent/100*d_parquet_len)

  # One vectorised conversion instead of a Python loop over every row.
  # Missing values (e.g. nullable integer columns) become NaN.
  used_rows = parquet_data[feature_columns].iloc[:train_size+val_size]
  values = torch.tensor(used_rows.to_numpy(dtype="float32",na_value=float("nan")))
  values[:,2] += 1  # payment_type is stored shifted by one

  # clone() so the two splits do not share storage with each other.
  TrainX = values[:train_size].clone()
  ValX = values[train_size:train_size+val_size].clone()

  TrainY = valy.repeat(train_size,1)
  ValY = valy.repeat(val_size,1)

  return TrainX,TrainY,  ValX,ValY


def make_dataloaders(l_parquet_datas,batch_size,train_percent=80,val_percent=20):
  """Build shuffled train and validation DataLoaders from several frames.

  Each frame is split by ``pdata_to_tdata_split`` and the per-frame tensors
  are copied, in order, into one training and one validation tensor pair.

  Args:
    l_parquet_datas: iterable of trip frames accepted by
      ``pdata_to_tdata_split``.
    batch_size: batch size of the training loader.
    train_percent: share of each frame used for training.
    val_percent: share of each frame used for validation.

  Returns:
    ``(train_dataloader, val_dataloader)``; the validation loader yields one
    sample per batch.
  """
  # Per-frame split sizes, computed with the same formula the splitter uses.
  split_sizes = [(int(train_percent/100*len(frame)), int(val_percent/100*len(frame)))
                 for frame in l_parquet_datas]

  whole_train_size = sum(n_train for n_train, _ in split_sizes)
  whole_val_size = sum(n_val for _, n_val in split_sizes)

  # Destination buffers; every row is overwritten below.
  TrainX = torch.rand(whole_train_size,7)
  TrainY = torch.rand(whole_train_size,2)

  ValX = torch.rand(whole_val_size,7)
  ValY = torch.rand(whole_val_size,2)

  train_cursor = 0
  val_cursor = 0

  for frame, (n_train, n_val) in zip(l_parquet_datas, split_sizes):
    part_train_x, part_train_y, part_val_x, part_val_y = pdata_to_tdata_split(frame,train_percent,val_percent)

    train_stop = train_cursor + n_train
    TrainX[train_cursor:train_stop] = part_train_x
    TrainY[train_cursor:train_stop] = part_train_y

    val_stop = val_cursor + n_val
    ValX[val_cursor:val_stop] = part_val_x
    ValY[val_cursor:val_stop] = part_val_y

    train_cursor = train_stop
    val_cursor = val_stop

  train_dataloader = DataLoader(TensorDataset(TrainX,TrainY),batch_size=batch_size,shuffle=True)
  val_dataloader = DataLoader(TensorDataset(ValX,ValY),batch_size=1,shuffle=True)

  print("Done!")

  return train_dataloader,val_dataloader


def make_val_dataloader(l_parquet_datas):
  """Build a shuffled, one-sample-per-batch DataLoader over whole frames.

  Every row of every frame is encoded as 7 float32 features
  (trip_distance, passenger_count, payment_type + 1, fare_amount, extra,
  tip_amount, improvement_surcharge) and labelled with the one-hot year of
  the frame's first pickup timestamp: 2019 -> [1, 0], 2020 -> [0, 1].

  Args:
    l_parquet_datas: iterable of pandas DataFrames holding the feature
      columns and ``tpep_pickup_datetime``. Rows are read by position, so any
      index (sliced, filtered, non-range) is accepted. Empty frames are
      skipped.

  Returns:
    DataLoader over a ``TensorDataset(features, labels)`` with
    ``batch_size=1`` and ``shuffle=True``.

  Raises:
    ValueError: if a non-empty frame starts in a year other than 2019/2020.
  """
  feature_columns = ["trip_distance","passenger_count","payment_type",
                     "fare_amount","extra","tip_amount","improvement_surcharge"]
  year_labels = {2019: [1,0], 2020: [0,1]}

  feature_parts = []
  label_parts = []

  for parquet_data in l_parquet_datas:
    size = len(parquet_data)
    if size == 0:
      continue  # nothing to encode and no first row to read the year from

    str_date = str(parquet_data.tpep_pickup_datetime.iloc[0])
    int_date = int(str_date[0:4])
    if int_date not in year_labels:
      raise ValueError(f"Unsupported pickup year {int_date}; expected 2019 or 2020")

    # Vectorised conversion of the whole frame; missing values become NaN.
    block = torch.tensor(parquet_data[feature_columns].to_numpy(dtype="float32",na_value=float("nan")))
    block[:,2] += 1  # payment_type is stored shifted by one

    feature_parts.append(block)
    label_parts.append(torch.tensor(year_labels[int_date],dtype=torch.float).repeat(size,1))

  if feature_parts:
    TestX = torch.cat(feature_parts)
    TestY = torch.cat(label_parts)
  else:
    # Keep the documented column counts even when there is no data.
    TestX = torch.empty(0,7)
    TestY = torch.empty(0,2)

  test_dataset = TensorDataset(TestX,TestY)
  test_dataloader = DataLoader(test_dataset,batch_size=1,shuffle=True)

  print("Done!")

  return test_dataloader

print("Done!")

In [None]:
def train_loop(model,loss_fn,dataloader,optimizer,scheduler,best_loss=1000,save_path="myNet.pth"):
  """Train ``model`` for one epoch and checkpoint it if the loss is good enough.

  Args:
    model: network to optimise.
    loss_fn: callable ``loss_fn(prediction, target)`` returning a scalar tensor.
    dataloader: yields ``(X, y)`` batches; ``y`` is expected to be one-hot
      with shape ``(batch, classes)``, as produced by the loaders in this
      notebook.
    optimizer: optimiser bound to ``model``'s parameters.
    scheduler: learning-rate scheduler, stepped once at the end of the epoch.
    best_loss: loss the epoch must beat for a checkpoint to be written. The
      default (1000) matches the previous hard-coded threshold; pass the best
      loss seen so far to keep only improving checkpoints.
    save_path: where the ``state_dict`` is written.

  Returns:
    ``(mean_batch_loss, accuracy)`` for the epoch.
  """
  size = len(dataloader.dataset)

  correct = 0
  running_loss = 0

  model.train()
  for X,y in dataloader:
    optimizer.zero_grad()

    y_pred = model(X)
    # Compare classes row by row; a bare argmax() flattens the whole batch
    # and yields a single index, which is only meaningful for batch_size == 1.
    correct += (y_pred.argmax(dim=1) == y.argmax(dim=1)).type(torch.float).sum().item()
    loss = loss_fn(y_pred,y)

    loss.backward()
    optimizer.step()

    running_loss += loss.item()

  scheduler.step()

  # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
  running_loss /= max(len(dataloader),1)
  correct /= max(size,1)

  print(f'Loss: {running_loss}')
  if running_loss < best_loss:
    torch.save(model.state_dict(),save_path)

  return running_loss,correct
    

def test_loop(model,dataloader,percent):
  size = percent/100*len(dataloader.dataset)
  correct = 0

  i = 0
  with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            correct += (pred.argmax() == y.argmax()).type(torch.float).sum().item()

            i += 1
            if i >= size:
              break

  correct /= size

  print(f"Test Accuracy: {(100*correct):>0.2f}%\n")

    
print("Done!")

In [None]:
def myOptimizers(model,lr,idx):
    """Return an optimiser for ``model`` chosen by ``idx % 3``.

    0 -> Adam (weight_decay=1e-4), 1 -> RMSprop, 2 -> Rprop, all created with
    learning rate ``lr``. Returns None if the remainder matches none of them.
    """
    # Factories are wrapped in lambdas so only the selected optimiser is built.
    builders = (
        lambda: optim.Adam(model.parameters(),lr=lr,weight_decay=1e-4),
        lambda: optim.RMSprop(model.parameters(), lr=lr),
        lambda: torch.optim.Rprop(model.parameters(), lr=lr),
    )
    remainder = idx % 3
    for slot, build in enumerate(builders):
        if remainder == slot:
            return build()
    return None


def myLossFunctions(rnd):
    """Return a loss module by code: 0 -> CrossEntropyLoss, 1 -> MSELoss.

    Any other code yields None.
    """
    for code, loss_cls in ((0, nn.CrossEntropyLoss), (1, nn.MSELoss)):
        if rnd == code:
            return loss_cls()
    return None

print("Done!")

## Adatok importálása

In [None]:
# Folder holding the monthly yellow-taxi parquet files.
DATA_DIR = r'C:\Users\User\Desktop\parquet_files'

def _monthly_file(year_month):
    """Full path of the yellow-taxi parquet file for ``year_month`` ('YYYY-MM')."""
    return DATA_DIR + '\\yellow_tripdata_' + year_month + '.parquet'

p2019_01_file = _monthly_file('2019-01')
p2019_02_file = _monthly_file('2019-02')

p2020_03_file = _monthly_file('2020-03')
p2020_04_file = _monthly_file('2020-04')
p2020_05_file = _monthly_file('2020-05')

# well before covid
data_2019_01 = pd.read_parquet(p2019_01_file, engine='auto')
data_2019_02 = pd.read_parquet(p2019_02_file, engine='auto')

# covid panic
data_2020_03 = pd.read_parquet(p2020_03_file, engine='auto')
data_2020_04 = pd.read_parquet(p2020_04_file, engine='auto')
data_2020_05 = pd.read_parquet(p2020_05_file, engine='auto')

print("Done!")

In [None]:
# Open the March 2020 file with Dask as well (data_2020_03 is the pandas copy of the same file).
dd_ = dd.read_parquet(p2020_03_file, engine='auto')

In [None]:
# Show the Dask frame created from the March 2020 file.
dd_

In [None]:
# `pd_` is never defined in this notebook, so the cell raised NameError on a
# fresh kernel. data_2020_03 is the pandas frame read from the same file as
# dd_. NOTE(review): confirm this is the frame that was meant.
data_2020_03.head()

In [None]:
# Trip duration in seconds as a single column expression. Dask frames are
# not meant to be filled element by element, and a Python loop over every row
# would be impractically slow.
dd_['Travel_time_in_secs'] = (dd_['tpep_dropoff_datetime'] - dd_['tpep_pickup_datetime']).dt.total_seconds()

In [None]:
# Type of a single dropoff - pickup difference. The original referenced `pd_`,
# which is never defined in this notebook; data_2020_03 is the pandas frame
# of the same file. NOTE(review): confirm the intended frame.
type(data_2020_03['tpep_dropoff_datetime'][0]-data_2020_03['tpep_pickup_datetime'][0])

In [None]:
# Trip duration in seconds for every row. The original loop wrote through
# chained indexing into a column that did not exist yet (KeyError) and only
# touched the first 10 rows; a vectorised assignment creates the column.
data_2020_03['Travel_time_in_secs'] = (data_2020_03['tpep_dropoff_datetime'] - data_2020_03['tpep_pickup_datetime']).dt.total_seconds()

In [None]:
# Inspect the March 2020 frame.
data_2020_03

In [None]:
import datetime

i = 0

# Duration of trip `i` in seconds (dropoff minus pickup).
pickup_time = data_2020_03['tpep_pickup_datetime'][i]
dropoff_time = data_2020_03['tpep_dropoff_datetime'][i]
difference = (dropoff_time - pickup_time).total_seconds()
print(difference)

# vendor ID: service providers
# travel time in seconds


In [None]:
# Show the March 2020 frame again.
data_2020_03

In [None]:
i = 2

# Duration of trip `i`, shown as (whole minutes, remaining seconds).
difference = data_2020_03['tpep_dropoff_datetime'][i]-data_2020_03['tpep_pickup_datetime'][i]

seconds_in_day = 24 * 60 * 60
whole_seconds = difference.days * seconds_in_day + difference.seconds

divmod(whole_seconds, 60)

In [None]:
# 2020 months used for training, and their combined row count.
data_2020 = [data_2020_03, data_2020_04]
s = sum(len(frame) for frame in data_2020)

# Take the same number of rows from January 2019 so both years contribute equally.
data_2019 = data_2019_01[0:s]
whole_data = [data_2019, *data_2020]

print(f'Size of 2019 dataset: {len(whole_data[0])}, size of 2020 dataset: {s}')
print("Done!")

## Adatok betöltése

In [None]:
batch_size = 32
train_percent = 65
test_percent = 25

print("Train and test dataloading started...")
train_dataloader, test_dataloader = make_dataloaders(whole_data,batch_size,train_percent,test_percent)

train_size = len(train_dataloader.dataset)
test_size = len(test_dataloader.dataset)

# Requested train/test ratio divided by the ratio actually obtained (1.0 means an exact match).
requested_ratio = train_percent/test_percent
obtained_ratio = train_size/test_size

print(f'Size of train dataset: {train_size} ,size of test dataset: {test_size} proportion goodness is {requested_ratio/obtained_ratio}')

In [None]:
"""
val_data_2020 = [data_2020_03[int((test_percent + 1)/100*len(data_2020_03)):int((test_percent + 3)/100*len(data_2020_03))],
                 data_2020_04[int((test_percent + 1)/100*len(data_2020_04)):int((test_percent + 3)/100*len(data_2020_04))],
                 data_2020_05[int((test_percent + 1)/100*len(data_2020_05)):int((test_percent + 3)/100*len(data_2020_05))]]
"""

# Validation uses months that were not part of training: May 2020 ...
val_data_2020 = [data_2020_05]
Sum = sum(len(frame) for frame in val_data_2020)

# Disabled alternative: carve the 2019 validation rows out of January 2019.
#val_data_2019 = data_2019_01[int((test_percent + 1)/100*len(data_2019_01)):s+int((test_percent + 1.5)/100*len(data_2019_01))]

# ... and an equally sized head of February 2019.
val_data_2019 = data_2019_02[0:Sum]
val_data = [val_data_2019, *val_data_2020]

val_size_2019 = len(val_data_2019)
val_size_2020 = Sum

print(f'Size of 2019 dataset: {val_size_2019} ,size of 2020 dataset: {val_size_2020}')

In [None]:
# Build the validation DataLoader from the frames in val_data and report its size.
print("Val dataloading started...")
val_dataloader = make_val_dataloader(val_data)
print(f'Size of val dataset: {len(val_dataloader.dataset)}')

## Modell osztályok

In [None]:
# Learning rate passed to myOptimizers in the training cells.
learning_rate = 0.01

# Layer sizes are read from the training tensors: the number of feature
# columns and the number of label columns.
input_size = train_dataloader.dataset.tensors[0].size(1)
output_size = train_dataloader.dataset.tensors[1].size(1)

class myLNN1(nn.Module):
  """Fully connected classifier: Nin -> 128 -> 64 -> Nout with ReLU between layers."""

  def __init__(self,Nin,Nout):
    super().__init__()

    self.L1 = nn.Linear(Nin,128)
    self.L2 = nn.Linear(128,64)
    self.L3 = nn.Linear(64,Nout)

  def forward(self,x):
    """Return the raw (un-normalised) output scores for the batch ``x``."""
    hidden = self.L1(x).relu()
    hidden = self.L2(hidden).relu()
    return self.L3(hidden)

model1 = myLNN1(input_size,output_size)

class myLNN2(nn.Module):
  """Fully connected classifier with one hidden layer: Nin -> 50 -> Nout."""

  def __init__(self,Nin,Nout):
    super().__init__()

    self.L1 = nn.Linear(Nin,50)
    self.L2 = nn.Linear(50,Nout)

  def forward(self,x):
    """Return the raw (un-normalised) output scores for the batch ``x``."""
    hidden = self.L1(x).relu()
    return self.L2(hidden)

model2 = myLNN2(input_size,output_size)

class myLNN3(nn.Module):
  """Small fully connected classifier: Nin -> 20 -> 20 -> Nout with ReLU between layers."""

  def __init__(self,Nin,Nout):
    super().__init__()

    self.L1 = nn.Linear(Nin,20)
    self.L2 = nn.Linear(20,20)
    self.L3 = nn.Linear(20,Nout)

  def forward(self,x):
    """Return the raw (un-normalised) output scores for the batch ``x``."""
    hidden = self.L1(x).relu()
    hidden = self.L2(hidden).relu()
    return self.L3(hidden)

model3 = myLNN3(input_size,output_size)

class myLNN4(nn.Module):
  """Wide fully connected classifier: Nin -> 128 -> 240 -> Nout with ReLU between layers."""

  def __init__(self,Nin,Nout):
    super().__init__()

    self.L1 = nn.Linear(Nin,128)
    self.L2 = nn.Linear(128,240)
    self.L3 = nn.Linear(240,Nout)

  def forward(self,x):
    """Return the raw (un-normalised) output scores for the batch ``x``."""
    hidden = self.L1(x).relu()
    hidden = self.L2(hidden).relu()
    return self.L3(hidden)

model4 = myLNN4(input_size,output_size)

class myLNN5(nn.Module):
  """Fully connected classifier with one wide hidden layer: Nin -> 200 -> Nout."""

  def __init__(self,Nin,Nout):
    super().__init__()

    self.L1 = nn.Linear(Nin,200)
    self.L2 = nn.Linear(200,Nout)

  def forward(self,x):
    """Return the raw (un-normalised) output scores for the batch ``x``."""
    hidden = self.L1(x).relu()
    return self.L2(hidden)

model5 = myLNN5(input_size,output_size)

class expert_system(nn.Module):
  """Ensemble that sums the raw outputs of several member networks.

  Args:
    models: iterable of ``nn.Module`` members; each must map a batch ``x`` to
      a tensor of shape ``(batch, output_size)``.
    output_size: number of output columns produced by every member.
  """

  def __init__(self,models,output_size):
    super(expert_system,self).__init__()

    # ModuleList registers the members, so parameters(), state_dict() and
    # .to(device) include them; a plain Python list would hide them.
    self.model_parts = nn.ModuleList(models)
    # Stored on the instance so forward() does not depend on a global.
    self.output_size = output_size

  def forward(self,x):
    """Return the element-wise sum of all member outputs for ``x``.

    With no members the result is all zeros of shape
    ``(batch, output_size)``.
    """
    # new_zeros keeps the accumulator on x's device and in x's dtype.
    outs = x.new_zeros(x.size(0),self.output_size)
    for part in self.model_parts:
      outs = outs + part(x)

    return outs

In [None]:
# Candidate networks; the expert-system section trains each of them in turn.
models = [model1,model2,model3,model4,model5]
print("Done!")

## Tanítás és validálás (Single model)

In [None]:
epochNum = 3
loss_fn = nn.CrossEntropyLoss()

model = model1

# Optimiser index 1 selects RMSprop; the learning rate is annealed towards 1e-4 over the run.
optimizer = myOptimizers(model,learning_rate,1)
scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochNum, eta_min=1e-4)
for epoch_no in range(1, epochNum + 1):
    print(f'Epoch {epoch_no}')
    train_loop(model,loss_fn,train_dataloader,optimizer,scheduler)
    test_loop(model,test_dataloader, 100)

print("Validation...")
test_loop(model,val_dataloader,100)
print("Done!")

## Tanítás és validálás (Expert system)

In [None]:
epochNum = 5

# Train every candidate independently, each with its own optimiser and schedule.
for model_no, model in enumerate(models, start=1):
    loss_fn = myLossFunctions(0)
    optimizer = myOptimizers(model,learning_rate,0)
    scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochNum, eta_min=1e-4)
    print(f'Model {model_no}')
    for epoch_no in range(1, epochNum + 1):
        print(f'Epoch {epoch_no}')
        train_loop(model,loss_fn,train_dataloader,optimizer,scheduler)
        test_loop(model,test_dataloader, 100)

print("Done!")

In [None]:
# Drop model4 from the ensemble. Removing by identity instead of by position
# keeps the cell idempotent: re-running it cannot remove a different model.
models = [m for m in models if m is not model4]

In [None]:
# Drop model2 from the ensemble. Removing by identity instead of by position
# keeps the cell idempotent: re-running it cannot remove a different model.
models = [m for m in models if m is not model2]

In [None]:
print("Validation...")
exp_sys = expert_system(models,output_size)
# Evaluate the ensemble itself; the original built exp_sys but then scored model1.
test_loop(exp_sys,val_dataloader,100)

In [None]:
#torch.save(model.state_dict(), 'myExpertNet.pth')

## Vizualizáció

In [None]:
#READING THE DATA

import pandas as pd
import dask.dataframe as dd

# Columns to keep from the parquet file
usecols = ['dropoff_x','dropoff_y','pickup_x','pickup_y','dropoff_hour','pickup_hour','passenger_count', 'tip_amount']

# Read the selected columns of "nyc_taxi_wide.parq" into a Dask DataFrame and
# persist the result so the cells below reuse it.
# Per the original author's note, this file holds location data from before Covid.
df = dd.read_parquet('nyc_taxi_wide.parq')[usecols].persist()
df.head()

In [None]:
#EARLY TEST OF THE VISUALIZATION OF THE DATA WITH PLOT

# holoviews must be imported here: this cell calls hv.* before the later cell
# that imports it, which fails with NameError on a fresh kernel.
import holoviews as hv
from holoviews.element.tiles import StamenTerrain
hv.extension('bokeh')

# Define the x and y ranges for the map and sample a small fraction of rows.
# The displayed plot always used a 1% sample; the previous 0.01% `samples`
# frame was computed but never shown. NOTE(review): confirm 1% is the intended density.
x_range, y_range =(-8242000,-8210000), (4965000,4990000)
samples = df.sample(frac=1e-2)

# Overlay the sampled drop-off points on the terrain tiles
tiles = StamenTerrain().redim.range(x=x_range, y=y_range)
points = hv.Points(samples, ['dropoff_x', 'dropoff_y'])
tiles * points


In [None]:
#CREATING A HEATMAP BASED ON DROPOFF LOCATIONS

import datashader as ds
import holoviews.operation.datashader as hd
from datashader.colors import Hot
import holoviews as hv
from holoviews import opts
from holoviews.element.tiles import StamenTerrain
from holoviews.element.tiles import EsriImagery
hv.extension('bokeh')

# Shade all drop-off points into a raster image: the aggregator is a count
# over the 'tip_amount' column, coloured with the Hot colormap.
shaded = hd.datashade(hv.Points(df, ['dropoff_x', 'dropoff_y']), cmap=Hot, aggregator=ds.count('tip_amount'))

# Apply dynspread to the shaded image and set the plot options
# (black background, no axes, 900x500).
hd.dynspread(shaded, threshold=0.5, max_px=4).opts(bgcolor='black', xaxis=None, yaxis=None, width=900, height=500)

In [None]:
#REMOVE OUTLIERS FROM THE TIP AMOUNT COLUMN - PREPARING FOR DRAWING A MAP BASED ON TIP AMOUNT AVERAGES

from numpy import mean
from numpy import std

# Materialise the tip column as a pandas Series (Dask evaluates on compute()).
data = df['tip_amount'].compute()

# Summary statistics (mean and standard deviation) of "tip_amount"
data_mean, data_std = mean(data), std(data)
print(data_mean)
print(data_std)

# Cutoffs: values further than 1.5 standard deviations from the mean are outliers
cut_off = data_std * 1.5
lower, upper = data_mean - cut_off, data_mean + cut_off
print(lower)
print(upper)

# Identify the outliers with a vectorised mask instead of a Python loop over
# every value, and print only a preview rather than the full list.
outliers = data[(data < lower) | (data > upper)]
print('Identified outliers: %d' % len(outliers))
print(outliers.head(20))

# Keep only rows whose tip is below the upper cutoff (the lower cutoff is not applied).
# NOTE: this overwrites df, so re-running the cell recomputes the cutoffs on
# already-filtered data and trims further; re-run the loading cell first.
df = df[(df['tip_amount'] < upper)]

In [None]:
#VISUALIZATION OF THE AVERAGE TIP AMOUNT

import colorcet as cc
from holoviews.element.tiles import EsriImagery
from holoviews.operation.datashader import rasterize

# Points element over the drop-off coordinates
points = hv.Points(df, ['dropoff_x', 'dropoff_y'])
# Options for the rasterized layer ("bmy" colormap, hover tool, color bar at the bottom)
ropts = dict(tools=['hover'], colorbar=True, colorbar_position='bottom', cmap=cc.bmy, cnorm='eq_hist')
# Rasterize the points, aggregating the mean tip amount per pixel
taxi_trips = rasterize(points, aggregator=ds.mean('tip_amount')).opts(**ropts)
# `map_tiles` was never defined in this notebook (NameError). EsriImagery is
# imported but unused in the heatmap cell, so it is presumably the intended
# background. NOTE(review): confirm the tile source and styling.
map_tiles = EsriImagery().opts(alpha=0.5, bgcolor='black')
# Overlay the rasterized tips on the map tiles
map_tiles * taxi_trips