In [22]:
from datetime import date, datetime, time, timedelta
from geopy.distance import geodesic
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import expr, hour, count
from pyspark.sql.window import Window
import json
import matplotlib.pyplot as plt 
import numpy as np
import pandas as pd 
import random


GEO_POINTS_DATA_PATH = 'data/London_postcodes.csv'
NUM_RECORDS = 1_000_000

# Generate Rides

In [2]:

def calc_cost(x):
    start_time = x['start_time'].time()
    distance = x['distance']
    price = 1
    price_peak = 2
    
    if time(8,0,0) < start_time < time(11,0,0) or time(15,0,0) < start_time < time(22,0,0):
        cost = distance * price_peak
    else:
        cost = distance * price
        
    return cost


def calc_road_time(x):
    start_time = x['start_time'].time()
    distance = x['distance']
    speed = 60
    speed_peak = 40

    if time(8,0,0) < start_time < time(11,0,0) or time(15,0,0) < start_time < time(22,0,0):
        road_time = timedelta(distance/speed/24)
    else:
        road_time = timedelta(distance/speed_peak/24)
        
    return road_time

In [3]:
geo_points = pd.read_csv(GEO_POINTS_DATA_PATH, low_memory=False)
rides = pd.DataFrame(columns=[
    'rider_id',
    'client_id',
    'start', 
    'start_latitude', 
    'start_longtitude',
    'end',
    'end_latitude', 
    'end_longtitude',
    'start_time',
    'end_time',
    'cost',
    'rider_score',
    'rider_fitback',
    'text',
    'client_score',
    'client_fitback',
])

In [4]:
riders_pull = np.arange(1000, 4000)
rides['rider_id'] = np.random.choice(riders_pull, NUM_RECORDS)
rides['rider_score' ] = np.random.choice((None,None,None,None,None,1,2,3,4,5), NUM_RECORDS)

In [5]:
clients_pull = np.arange(1000, 7000)
rides['client_id'] = np.random.choice(clients_pull, NUM_RECORDS)
rides['client_score' ] = np.random.choice((None,None,None,None,None,1,2,3,4,5), NUM_RECORDS)

In [6]:
start_pull = geo_points[['Postcode', 'Latitude', 'Longitude']].sample(n=NUM_RECORDS, replace=True).reset_index(drop=True)
rides[['start', 'start_latitude', 'start_longtitude']] = start_pull

In [7]:
end_pull = geo_points[['Postcode', 'Latitude', 'Longitude']].sample(n=NUM_RECORDS, replace=True).reset_index(drop=True)
rides[['end', 'end_latitude', 'end_longtitude']] = end_pull

In [8]:
distance_agg = lambda x: geodesic(
    (x['start_latitude'], x['start_longtitude']), 
    (x['end_latitude'], x['end_longtitude'])).km
rides['distance'] = rides.apply(distance_agg, axis='columns', result_type='reduce')

In [9]:
time_pull = [
    datetime.combine(date.today(), time(h,m,s))
    for h in range(24) 
    for m in range(60) 
    for s in range(60)
]
rides['start_time'] = np.random.choice(time_pull, NUM_RECORDS)

In [10]:
rides['cost'] = rides.apply(calc_cost, axis='columns', result_type='reduce')

In [11]:

rides['road_time'] = rides.apply(calc_road_time, axis='columns', result_type='reduce')

In [12]:
rides['end_time'] = rides['start_time'] + rides['road_time']

In [13]:
display(rides)

Unnamed: 0,rider_id,client_id,start,start_latitude,start_longtitude,end,end_latitude,end_longtitude,start_time,end_time,cost,rider_score,rider_fitback,text,client_score,client_fitback,distance,road_time
0,3284,2453,E4 7WW,51.633250,0.008156,EC2Y 9DP,51.518267,-0.089587,2020-12-24 21:40:25,2020-12-24 21:54:53.600071,28.953336,,,,,,14.476668,0 days 00:14:28.600071
1,1983,2525,W1C 2PW,51.513765,-0.153232,UB4 9LD,51.532188,-0.405000,2020-12-24 14:51:35,2020-12-24 15:17:58.451304,17.593903,,,,,,17.593903,0 days 00:26:23.451304
2,1688,6331,SW19 8FH,51.417014,-0.213582,NW1W 7SP,51.530785,-0.135430,2020-12-24 08:44:51,2020-12-24 08:58:37.403628,27.546788,,,,,,13.773394,0 days 00:13:46.403628
3,3895,2470,SW4 0XP,51.463241,-0.139336,SW18 3WJ,51.448231,-0.191526,2020-12-24 13:58:50,2020-12-24 14:04:49.418417,3.993538,,,,,,3.993538,0 days 00:05:59.418417
4,2875,2277,W12 6AW,51.511059,-0.220758,CR2 6WF,51.375716,-0.091863,2020-12-24 13:04:09,2020-12-24 13:30:26.051764,17.522797,,,,4,,17.522797,0 days 00:26:17.051764
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
999995,2684,4855,N4 2UT,51.568646,-0.097533,SE18 5TP,51.493967,0.041228,2020-12-24 12:30:43,2020-12-24 12:49:47.636302,12.718181,4,,,5,,12.718181,0 days 00:19:04.636302
999996,2460,5165,N6 4JS,51.574668,-0.163407,W1R 0DD,51.514151,-0.145232,2020-12-24 07:44:15,2020-12-24 07:54:31.507239,6.850080,,,,1,,6.850080,0 days 00:10:16.507239
999997,2252,4201,SW8 4UN,51.474559,-0.141410,BR3 1GL,51.399286,-0.049249,2020-12-24 14:31:05,2020-12-24 14:46:54.077988,10.545311,2,,,,,10.545311,0 days 00:15:49.077988
999998,1554,3596,HA9 0AN,51.561422,-0.273515,HA1 3PB,51.561626,-0.343853,2020-12-24 17:51:15,2020-12-24 17:56:07.666827,9.755561,,,,5,,4.877780,0 days 00:04:52.666827


# Map Reduce

In [59]:
def top_k_drivers(df: DataFrame, limit: int):
    return df.where(df.rider_score.isNotNull()) \
    .orderBy(df.rider_score, ascending=False) \
    .select('rider_id', 'rider_score') \
    .limit(limit) \
    .rdd.map(lambda row: {'rider_id': row[0], 'rider_score': row[1]})

def top_k_clients(df: DataFrame, limit: int):
    return df.where(df.client_score.isNotNull()) \
    .orderBy(df.client_score, ascending=False) \
    .select('client_id', 'client_score') \
    .limit(limit) \
    .rdd.map(lambda row: {'client_id': row[0], 'client_score': row[1]})


def worst_drivers(df: DataFrame, limit: int):
    return df.where(df.rider_score < 3.5) \
    .orderBy(df.rider_score, ascending=True) \
    .select('rider_id', 'rider_score') \
    .limit(limit) \
    .rdd.map(lambda row: {'rider_id': row[0], 'rider_score': row[1]})

In [17]:
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(rides)

In [60]:
top_k_drivers(df, 100).collect()

[{'rider_id': 1401, 'rider_score': 5},
 {'rider_id': 3351, 'rider_score': 5},
 {'rider_id': 2877, 'rider_score': 5},
 {'rider_id': 1719, 'rider_score': 5},
 {'rider_id': 3856, 'rider_score': 5},
 {'rider_id': 1865, 'rider_score': 5},
 {'rider_id': 2259, 'rider_score': 5},
 {'rider_id': 3657, 'rider_score': 5},
 {'rider_id': 3604, 'rider_score': 5},
 {'rider_id': 2898, 'rider_score': 5},
 {'rider_id': 2655, 'rider_score': 5},
 {'rider_id': 3062, 'rider_score': 5},
 {'rider_id': 2761, 'rider_score': 5},
 {'rider_id': 2225, 'rider_score': 5},
 {'rider_id': 3338, 'rider_score': 5},
 {'rider_id': 1002, 'rider_score': 5},
 {'rider_id': 1116, 'rider_score': 5},
 {'rider_id': 3068, 'rider_score': 5},
 {'rider_id': 3504, 'rider_score': 5},
 {'rider_id': 2437, 'rider_score': 5},
 {'rider_id': 1697, 'rider_score': 5},
 {'rider_id': 2362, 'rider_score': 5},
 {'rider_id': 1579, 'rider_score': 5},
 {'rider_id': 2400, 'rider_score': 5},
 {'rider_id': 1772, 'rider_score': 5},
 {'rider_id': 1944, 'ride

In [61]:
top_k_clients(df, 50).collect()

[{'client_id': 5248, 'client_score': 5},
 {'client_id': 2107, 'client_score': 5},
 {'client_id': 2071, 'client_score': 5},
 {'client_id': 2172, 'client_score': 5},
 {'client_id': 4610, 'client_score': 5},
 {'client_id': 1303, 'client_score': 5},
 {'client_id': 6071, 'client_score': 5},
 {'client_id': 3689, 'client_score': 5},
 {'client_id': 5722, 'client_score': 5},
 {'client_id': 1059, 'client_score': 5},
 {'client_id': 6509, 'client_score': 5},
 {'client_id': 4598, 'client_score': 5},
 {'client_id': 6212, 'client_score': 5},
 {'client_id': 4408, 'client_score': 5},
 {'client_id': 1059, 'client_score': 5},
 {'client_id': 1607, 'client_score': 5},
 {'client_id': 2949, 'client_score': 5},
 {'client_id': 3888, 'client_score': 5},
 {'client_id': 4951, 'client_score': 5},
 {'client_id': 2919, 'client_score': 5},
 {'client_id': 5117, 'client_score': 5},
 {'client_id': 2028, 'client_score': 5},
 {'client_id': 6108, 'client_score': 5},
 {'client_id': 3933, 'client_score': 5},
 {'client_id': 1

In [None]:
worst_drivers(df, 200).collect()

[{'rider_id': 3836, 'rider_score': 1},
 {'rider_id': 1290, 'rider_score': 1},
 {'rider_id': 3547, 'rider_score': 1},
 {'rider_id': 1187, 'rider_score': 1},
 {'rider_id': 2791, 'rider_score': 1},
 {'rider_id': 3924, 'rider_score': 1},
 {'rider_id': 2011, 'rider_score': 1},
 {'rider_id': 2045, 'rider_score': 1},
 {'rider_id': 3206, 'rider_score': 1},
 {'rider_id': 3035, 'rider_score': 1},
 {'rider_id': 3970, 'rider_score': 1},
 {'rider_id': 2505, 'rider_score': 1},
 {'rider_id': 2574, 'rider_score': 1},
 {'rider_id': 1805, 'rider_score': 1},
 {'rider_id': 3070, 'rider_score': 1},
 {'rider_id': 2577, 'rider_score': 1},
 {'rider_id': 3428, 'rider_score': 1},
 {'rider_id': 3810, 'rider_score': 1},
 {'rider_id': 1773, 'rider_score': 1},
 {'rider_id': 2972, 'rider_score': 1},
 {'rider_id': 2379, 'rider_score': 1},
 {'rider_id': 3305, 'rider_score': 1},
 {'rider_id': 1149, 'rider_score': 1},
 {'rider_id': 3217, 'rider_score': 1},
 {'rider_id': 2892, 'rider_score': 1},
 {'rider_id': 1848, 'ride