In [143]:
import os, shutil

#Create a folder for the experiment files
training_folder = 'sendy-script'
os.makedirs(training_folder, exist_ok=True)

#Copy the data file into the experiment folder
csv_names = ["Train(1).csv", "Riders.csv"]

for csv in csv_names:
    shutil.copy(csv, os.path.join(training_folder,csv))

In [2]:
import findspark
findspark.init('/home/nelsonchris/spark-2.4.5-bin-hadoop2.7')

from pyspark.sql import SparkSession
import pyspark
from pyspark.sql.functions import hour, dayofweek
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType, DoubleType
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from math import radians, cos, sin, atan, sqrt, atan2, degrees
print("Libraries imported")

spark = SparkSession.builder.appName('Sendy_logistics').getOrCreate()
print("Spark session created")

ModuleNotFoundError: No module named 'findspark'

In [147]:
%%writefile -a $training_folder/sendy.py
print("====loading data=====")
data = spark.read.csv('Train(1).csv', inferSchema=True,header=True)
rider = spark.read.csv('Riders.csv', inferSchema=True,header=True)
print("=====Data loaded=====")

Appending to sendy-script/sendy.py


In [134]:
data.count() + test.count()

28269

In [135]:
test.count()

7068

In [92]:
data.select(['Order No','User Id','Vehicle Type','Platform Type','Personal or Business']).show(3)

+--------------+------------+------------+-------------+--------------------+
|      Order No|     User Id|Vehicle Type|Platform Type|Personal or Business|
+--------------+------------+------------+-------------+--------------------+
| Order_No_4211| User_Id_633|        Bike|            3|            Business|
|Order_No_25375|User_Id_2285|        Bike|            3|            Personal|
| Order_No_1899| User_Id_265|        Bike|            3|            Business|
+--------------+------------+------------+-------------+--------------------+
only showing top 3 rows



In [93]:
data.printSchema()

root
 |-- Order No: string (nullable = true)
 |-- User Id: string (nullable = true)
 |-- Vehicle Type: string (nullable = true)
 |-- Platform Type: integer (nullable = true)
 |-- Personal or Business: string (nullable = true)
 |-- Placement - Day of Month: integer (nullable = true)
 |-- Placement - Weekday (Mo = 1): integer (nullable = true)
 |-- Placement - Time: timestamp (nullable = true)
 |-- Confirmation - Day of Month: integer (nullable = true)
 |-- Confirmation - Weekday (Mo = 1): integer (nullable = true)
 |-- Confirmation - Time: timestamp (nullable = true)
 |-- Arrival at Pickup - Day of Month: integer (nullable = true)
 |-- Arrival at Pickup - Weekday (Mo = 1): integer (nullable = true)
 |-- Arrival at Pickup - Time: timestamp (nullable = true)
 |-- Pickup - Day of Month: integer (nullable = true)
 |-- Pickup - Weekday (Mo = 1): integer (nullable = true)
 |-- Pickup - Time: timestamp (nullable = true)
 |-- Arrival at Destination - Day of Month: integer (nullable = true)
 |--

# Data Preprocessing

In [148]:
%%writefile -a $training_folder/sendy.py
#Merge rider data to both train and test
data_merge = data.join(rider, on=['Rider Id'],how ='inner')
print("Megered data and rider together")

cols_to_drop = ['Vehicle Type','Order No','Arrival at Destination - Day of Month',
       'Arrival at Destination - Weekday (Mo = 1)',
        'Arrival at Destination - Time','Precipitation in millimeters',
                'Temperature','Rider Id', "User Id", ]

time_cols = ['Placement - Time',
             'Confirmation - Time',
             'Arrival at Pickup - Time',
             'Pickup - Time']

data_merge = data_merge.drop(*cols_to_drop)

Appending to sendy-script/sendy.py


In [98]:
def spark_shape(self):
    return (self.count(), len(self.columns))
pyspark.sql.dataframe.DataFrame.shape = spark_shape

In [99]:
data_merge.shape()

(21201, 24)

In [101]:
%%writefile -a $training_folder/sendy.py

#Extraxt hour and day of the week data from date columns

for col in time_cols:
    train = train.withColumn(col+"_Hour", hour(col))
print("Tine columns extracted")

#Calculate the harvsine distance
def harvsine_array(lat1, lng1 , lat2, lng2):
    lat1, lng1, lat2, lng2 = map(radians, (lat1, lng1 , lat2, lng2))
    AVG_EARTH_RADIUS = 6371 #in kilometers
    lat = lat2 - lat1
    lng = lng2 - lng1
    d = sin(lat * 0.5) ** 2 + cos(lat1) * cos(lat2) * sin(lng * 0.5) ** 2
    h = 2 * AVG_EARTH_RADIUS * atan(sqrt(d))
    return h


# Manhattan distance is the sum of the horizontal and vertical distance between points on a grid
def manhattan_dist(lat1 ,lng1, lat2, lng2):
    a = harvsine_array(lat1, lng1 , lat1 , lng2)
    b = harvsine_array(lat1,lng1, lat2, lng1)
    
    return a + b

#Direction from the given coordinates
def bearing_array(lat1, lng1, lat2, lng2):
    AVG_EARTH_RADIUS = 6371  # in km
    lng_delta_rad = radians(lng2 - lng1)
    lat1, lng1, lat2, lng2 = map(radians, (lat1, lng1, lat2, lng2))
    y = sin(lng_delta_rad) * cos(lat2)
    x = cos(lat1) * sin(lat2) - sin(lat1) * cos(lat2) * cos(lng_delta_rad)
    return degrees(atan2(y, x))

#Register the created functions
HarvsineUDF = udf(lambda a,b,c,d: harvsine_array(a,b,c,d),StringType())
ManhattanUDF = udf(lambda a,b,c,d: manhattan_dist(a,b,c,d),StringType())
BearingUDF = udf(lambda a,b,c,d: bearing_array(a,b,c,d),StringType())

print("Applying feature enginerring to dataframe")
# Call the harvsine function
data_merge = data_merge.withColumn("Harvsine Distance", HarvsineUDF("Pickup Lat", "Pickup Long", "Destination Lat", "Destination Long").cast(DoubleType()))

# Call the manhattan dist function
data_merge = data_merge.withColumn("Manhattan Distance", ManhattanUDF("Pickup Lat", "Pickup Long", "Destination Lat", "Destination Long").cast(DoubleType()))

# Call the bearing fnuction
data_merge = data_merge.withColumn("Direction Distance", BearingUDF("Pickup Lat", "Pickup Long", "Destination Lat", "Destination Long").cast(DoubleType()))

In [102]:
harvsine_array(-1.317755, 36.830370, -1.300406, 36.829741)#,manhattan_dist(-1.351453, 36.830370, -1.300406, 36.829741),bearing_array(-1.351453, 36.830370, -1.300406, 36.829741)

1.9303875732130382

In [104]:
print("Applying feature enginerring to dataframe")
# Call the harvsine function
data_merge = data_merge.withColumn("Harvsine Distance", HarvsineUDF("Pickup Lat", "Pickup Long", "Destination Lat", "Destination Long").cast(DoubleType()))

# Call the manhattan dist function
data_merge = data_merge.withColumn("Manhattan Distance", ManhattanUDF("Pickup Lat", "Pickup Long", "Destination Lat", "Destination Long").cast(DoubleType()))

# Call the bearing fnuction
data_merge = data_merge.withColumn("Direction Distance", BearingUDF("Pickup Lat", "Pickup Long", "Destination Lat", "Destination Long").cast(DoubleType()))

In [105]:
data_merge.select("Harvsine Distance", "Manhattan Distance", "Direction Distance").show(20)

+------------------+------------------+-------------------+
| Harvsine Distance|Manhattan Distance| Direction Distance|
+------------------+------------------+-------------------+
|1.9303332205710244| 1.999021623913786|-2.0769034077507285|
|11.339844530356967|15.720954182392934|-56.392163212783096|
|1.8800787264224066|2.5111852071397336| -64.18386563483746|
| 4.943458069180657| 6.835993082860763| -57.09155293419641|
|3.7248284348567235| 5.130330919876971| 148.11439790865393|
|6.6385397000476445| 9.361638171756846|  40.68337317738699|
| 2.907391577671219|3.2041393518007317|  83.80566320374294|
| 2.030743156688254|2.2793926801519393| -97.53156300361854|
| 6.960283546786541| 9.841714698013039|  136.0513388990286|
| 9.305257529847268|10.323211038893358|  83.32990760598942|
|   4.3124483007969| 4.390565812048895|-1.0475300519138007|
|10.756208553054682|13.378440135944835| 106.58157223754336|
| 5.325641743113615| 7.036870106880482|  114.1175917599587|
|15.183141058098334| 17.94165225881389|-

In [106]:
type(data_merge)

pyspark.sql.dataframe.DataFrame

In [107]:
data_merge.shape()

(21201, 27)

In [108]:
cols = data_merge.columns

In [109]:
data_merge = data_merge.drop(*time_cols)

In [110]:
#String indexer to convert cat column to double
cols_index = ["Personal or Business"]

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(data_merge) for column in cols_index]
pipeline_idx = Pipeline(stages = indexers)
final_df = pipeline_idx.fit(data_merge).transform(data_merge)
final_df = final_df.drop(*cols_index)

print("========String indexer applied========")

In [111]:
train_data,test_data = final_df.randomSplit([0.7,0.3])
print("Data frame split into train and test")

In [128]:
cols_to_vector = [cols for cols in test_data.columns if cols != 'Time from Pickup to Arrival']
label = test_data.select("Time from Pickup to Arrival")

In [113]:
assembler = VectorAssembler(
            inputCols = cols_to_vector,
            outputCol = "features"
    )

gbt = GBTRegressor(featuresCol="features", 
                   labelCol="Time from Pickup to Arrival")
# final_df = assembler.transform(final_df)

In [114]:
paramGrid = ParamGridBuilder()\
            .addGrid(gbt.maxDepth, [2, 5])\
            .addGrid(gbt.maxIter, [10, 50])\
            .build()

#Next define evaluation metric.
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol=gbt.getLabelCol(),
                                predictionCol=gbt.getPredictionCol())

cv = CrossValidator(estimator=gbt, evaluator=evaluator,
                   estimatorParamMaps=paramGrid)

In [116]:
pipeline = Pipeline(stages=[assembler, cv])

In [117]:
print("===========Fitting data pipeline===========")
pipeline_model = pipeline.fit(train_data)
print("========Model trained=========")

In [119]:
predictions = pipeline_model.transform(test_data)

In [132]:
rmse = evaluator.evaluate(predictions)
print(rmse)

769.2976609811352


In [141]:
import os
os.makedirs("Model", exist_ok=True)
pipeline_model.save("Model/v1")
print("pipeline saved")