# NYC Taxi Trips: Borough, Route and Driver-Session Analysis with PySpark

## Loading Spark Session

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, desc, col, when, max, size
from functools import reduce

# Create (or reuse) the Spark session that every later cell depends on.
# NOTE(review): driver memory / cores settings only take effect when no session
# is already running in this kernel — confirm after a partial re-run.
spark = (
    SparkSession.builder
    .appName("Taxi")
    .config("spark.driver.memory", "6g")
    .config("spark.driver.cores", "8")
    .getOrCreate()
)

ModuleNotFoundError: No module named 'pyspark'

In [None]:
from pyspark import SparkContext
from pyspark import SparkConf
# The SparkContext behind the session; the RDD-based cells below all go through `sc`.
sc=spark.sparkContext

In [None]:
# from pyspark.rdd import portable_hash
# from pyspark.statcounter import StatCounter

import os
import json
from datetime import datetime
from operator import itemgetter
#from itertools import chain, imap
from shapely.geometry import shape, Point
from matplotlib import pyplot as plt

In [None]:
# Printing helpers used throughout the notebook to label each piece of output.
from pprint import pprint

def title(s):
    """Pretty-print a '---- s -----' banner line."""
    banner = "---- %s -----" % s
    pprint(banner)

def see(s, v):
    """Pretty-print a '---- s -----' banner, then pretty-print the value v."""
    title(s)
    pprint(v)

In [None]:
# Taxi trip sample CSV, read lazily as an RDD of raw text lines.
# NOTE(review): the relative path assumes the notebook runs from its own directory — confirm.
file = "../../data/ch08-geospatial/sample.csv"
fileRDD = sc.textFile(file)

In [None]:
# Peek at the first five raw lines; the header row (first column "medallion") is filtered out during parsing.
see("fileRDD", fileRDD.take(5))

## Parsing Geo Data

In [None]:
def parse(fields):
    """Convert one split CSV row into a (license, trip) pair.

    fields: list of string columns (the caller keeps only rows with 14 of them).
      [1]       license, returned as the pair's key
      [5], [6]  pickup / dropoff timestamps in '%Y-%m-%d %H:%M:%S' format
      [10..13]  pickup x, pickup y, dropoff x, dropoff y, fed to shapely Point
                (presumably longitude/latitude — TODO confirm column order)

    Returns (license, trip), where trip is a dict with keys pickupTime,
    dropoffTime (datetime) and pickupLoc, dropoffLoc (Point).

    If any coordinate fails to parse as a float, BOTH locations become
    Point(0.0, 0.0). hasZero() later uses that sentinel to drop the trip.
    A malformed timestamp is not caught and raises ValueError.
    """
    stamp_format = '%Y-%m-%d %H:%M:%S'
    license_id = fields[1]
    pickup_time, dropoff_time = (datetime.strptime(fields[i], stamp_format) for i in (5, 6))
    try:
        pickup_loc = Point(float(fields[10]), float(fields[11]))
        dropoff_loc = Point(float(fields[12]), float(fields[13]))
    except ValueError:
        # one bad coordinate invalidates the whole trip: mark both ends with the sentinel
        pickup_loc, dropoff_loc = Point(0.0, 0.0), Point(0.0, 0.0)
    trip = dict(
        pickupTime=pickup_time,
        dropoffTime=dropoff_time,
        pickupLoc=pickup_loc,
        dropoffLoc=dropoff_loc,
    )
    return (license_id, trip)


# Split each line on commas, keep only 14-column data rows (dropping the header,
# whose first column is "medallion"), then parse into (license, trip) pairs.
taxiParsed = (
    fileRDD
    .map(lambda line: line.split(','))
    .filter(lambda fields: len(fields) == 14 and fields[0] != "medallion")
    .map(parse)
)

taxiParsed.cache()

see("taxiParsed", taxiParsed.take(5))

## Load the borough polygons from the GeoJSON file

In [None]:
# Load the borough polygons and attach a shapely geometry to every feature.
# GeoJSON is UTF-8 by specification (RFC 7946), so state the encoding explicitly
# instead of relying on the platform's locale default (e.g. cp1252 on Windows).
with open('../../data/ch08-geospatial/nyc-boroughs.geojson', 'r', encoding='utf-8') as geojson:
    geo = json.load(geojson)
features = geo['features']
for f in features:
    # parsed shape used by the point-in-polygon lookups below
    f["shape"] = shape(f['geometry'])

see("features", features[:3])

## Sort the borough polygons (by borough code, then largest area first) and broadcast them to the executors

In [None]:
# Order polygons by borough code, and within a borough by descending area.
# borough() scans them in this order, presumably so large polygons are tested first.
areaSortedFeatures = sorted(
    features,
    key=lambda feat: (int(feat['properties']["boroughCode"]), -feat["shape"].area),
)
bFeatures = sc.broadcast(areaSortedFeatures)
see("areaSortedFeatures", areaSortedFeatures[:3])

## Map each pickup longitude/latitude to a borough name

In [None]:
def borough(trip):
    """Return the borough name of the first broadcast polygon containing trip["pickupLoc"].

    Polygons are tried in bFeatures order. Returns None when no polygon contains the point.
    """
    matches = (
        str(feature['properties']["borough"])
        for feature in bFeatures.value
        if feature['shape'].contains(trip["pickupLoc"])
    )
    return next(matches, None)

# Pickups per borough on the raw parsed trips (None = outside every polygon).
boroughCount = (
    taxiParsed
    .values()
    .map(borough)
    .countByValue()
    .items()
)

see("boroughCount", list(boroughCount))


## Time Processing:

### 1. Trip duration: number of trips per whole hour of duration

In [None]:
def hours(trip):
    """Return the trip duration in whole hours, rounded down (floor).

    trip: dict with datetime values under 'pickupTime' and 'dropoffTime'.

    Flooring, rather than truncating toward zero, matters for negative
    durations. A dropoff 30 minutes *before* pickup yields -1 instead of 0,
    so goodHour() actually rejects it. Non-negative durations are unaffected.
    """
    elapsed = trip['dropoffTime'] - trip['pickupTime']
    return int(elapsed.total_seconds() // 3600)

# Histogram of trip durations: (whole hours, number of trips), shortest first.
hoursCount = taxiParsed.values().map(hours).countByValue().items()
sortedHoursCount = sorted(hoursCount, key=itemgetter(0))

# one (hours, tripCount) pair per line
for val in sortedHoursCount:
    print(val)

### 2. Cleaning data
- Remove trips with negative durations or durations longer than 3 hours
- Remove trips with invalid locations (coordinates that failed to parse and were set to (0, 0))

In [None]:
def goodHour(hrs):
    """Return True when a duration in whole hours lies in the accepted range [0, 3]."""
    return 0 <= hrs <= 3

# Keep only trips whose duration passes goodHour(); the raw parsed RDD is no longer needed in cache.
taxiClean = (
    taxiParsed
    .filter(lambda rec: goodHour(hours(rec[1])))
    .cache()
)
taxiParsed.unpersist()


def hasZero(trip):
    """Return True if the pickup or dropoff location equals Point(0, 0).

    parse() uses Point(0, 0) as the sentinel for coordinates that failed to parse.
    """
    origin = Point(0.0, 0.0)
    return (origin == trip["pickupLoc"]) or (origin == trip["dropoffLoc"])

# Fully cleaned trips: valid duration and no (0, 0) sentinel location.
taxiDone = (
    taxiClean
    .filter(lambda rec: not hasZero(rec[1]))
    .cache()
)

In [None]:
# Re-count pickups per borough on the cleaned trips (taxiDone). This rebinds
# boroughCount, which the heat-map cells below turn into a DataFrame.
boroughCount = taxiDone.values().map(borough).countByValue().items()
see("boroughCount", list(boroughCount))

In [None]:
from pyspark.rdd import portable_hash


In [None]:
# Naive Unix epoch (1970-01-01 00:00:00). Equivalent to datetime.utcfromtimestamp(0),
# which is deprecated since Python 3.12.
epoch = datetime(1970, 1, 1)

def getMillis(time):
    """Milliseconds (float) between the naive datetime `time` and the Unix epoch.

    Used as a sortable secondary key for trips.
    """
    return (time - epoch).total_seconds() * 1000.0

def partitioner(n):
    """Build a Spark partition function that routes records into n buckets.

    Only the first element of the composite key is hashed, so all of one
    driver's records land in the same partition whatever their secondary sort key.
    """
    def by_primary_key(key):
        return portable_hash(key[0]) % n
    return by_primary_key

# Separate each driver's trips into sessions.

def groupSorted(it, splitFunc):
    """Collapse one sorted partition of trip records into driver sessions.

    it:        iterable of ((license, sortKey), trip) records, sorted so that each
               license's trips are contiguous and in sortKey order
               (groupByKeyAndSortValues arranges this).
    splitFunc: splitFunc(previous_trip, trip) -> True when `trip` should start a
               new session even though the license is unchanged.

    Yields (license, [trip, ...]) tuples, one per session, in input order.
    An empty partition yields nothing. It never yields a placeholder
    (None, []) session, which would skew per-session statistics.
    """
    cur_lic = None
    cur_trips = []
    for record in it:
        lic = record[0][0]
        trip = record[1]
        # extend the open session only when it is the same driver and no split is due
        if cur_trips and lic == cur_lic and not splitFunc(cur_trips[-1], trip):
            cur_trips.append(trip)
            continue
        # driver changed or split requested: close the open session (if any) and start a new one
        if cur_trips:
            yield (cur_lic, cur_trips)
        cur_lic, cur_trips = lic, [trip]
    # flush the final session of the partition
    if cur_trips:
        yield (cur_lic, cur_trips)

def groupByKeyAndSortValues(rdd, secondaryKeyFunc, splitFunc):
    """Group a (key, value) RDD into per-key sessions whose values follow secondary-key order.

    rdd:              RDD of (key, value) pairs.
    secondaryKeyFunc: value -> sortable key; values are ordered by it within each key.
    splitFunc:        (previous_value, value) -> True to start a new session.

    Returns an RDD of (key, [value, ...]) sessions built by groupSorted.
    """
    presess = rdd.map(lambda x: ((x[0], secondaryKeyFunc(x[1])), x[1]))
    numPartitions = presess.getNumPartitions()
    # Pass numPartitions explicitly. Otherwise Spark chooses its default reduce
    # partition count (spark.default.parallelism when set), which need not match
    # the modulus used by partitioner(numPartitions) and can leave partitions empty.
    return (
        presess
        .repartitionAndSortWithinPartitions(
            numPartitions=numPartitions,
            partitionFunc=partitioner(numPartitions),
        )
        .mapPartitions(lambda partition: groupSorted(partition, splitFunc))
    )
        
def split(t1, t2):
    """Return True when trip t2 should start a new session after trip t1.

    A new session starts when t2's pickup is at least 4 hours after t1's pickup.
    """
    gap = t2['pickupTime'] - t1['pickupTime']
    return gap.total_seconds() >= 4 * 60 * 60

def secondaryKeyFunc(trip):
    """Sort key for a trip inside its driver's group: the pickup time as epoch milliseconds."""
    pickup = trip["pickupTime"]
    return getMillis(pickup)

# Driver sessions: (license, [trips in pickup order]); a gap of 4h or more starts a new session.
sessions = groupByKeyAndSortValues(taxiDone, secondaryKeyFunc, split).cache()

see("sessions", sessions.take(10))

## Use Case 1
### Show the density of pickup locations on a heat map

In [None]:
import os

import gmaps

# Read the Google Maps API key from the environment rather than hardcoding it in
# the notebook. The previous literal also carried a stray leading space.
# NOTE(review): the key previously committed here should be treated as leaked and rotated.
KEY = os.environ.get("GOOGLE_MAPS_API_KEY", "").strip()
if not KEY:
    raise RuntimeError("Set the GOOGLE_MAPS_API_KEY environment variable before running this cell")
gmaps.configure(api_key=KEY)

In [None]:
import geopy

# Google's Geocoding API requires an API key on every request. Reuse the Maps key
# (stripped of surrounding whitespace).
# NOTE(review): assumes the key has the Geocoding API enabled — confirm in the Cloud console.
g = geopy.GoogleV3(api_key=KEY.strip())

In [None]:
import pandas as pd

In [None]:
# One row per borough with its pickup count. Naming the columns up front keeps the
# 'borough'/'count' schema even when boroughCount is empty; renaming columns 0/1 on
# a column-less frame would silently do nothing.
heatDF = pd.DataFrame(list(boroughCount), columns=['borough', 'count'])

In [None]:
# Pickups outside every borough polygon have borough None (see borough()).
# Drop them *before* geocoding so no None query is ever sent to the geocoder.
heatDF = heatDF[heatDF['borough'].notna() & (heatDF['borough'] != "None")].copy()
heatDF['coordinates'] = heatDF['borough'].apply(g.geocode)
# geocode() returns None when nothing matches; drop those rows instead of crashing on .latitude
heatDF = heatDF[heatDF['coordinates'].notna()].copy()
heatDF['latitude'] = heatDF['coordinates'].apply(lambda loc: loc.latitude)
heatDF['longitude'] = heatDF['coordinates'].apply(lambda loc: loc.longitude)
heatDF

In [None]:
# Heat map of pickups: one point per geocoded borough, weighted by its pickup count.
# NOTE(review): max_intensity=100000 and point_radius=30 look hand-tuned for this data — confirm.
fig = gmaps.figure()
heatmap_layer = gmaps.heatmap_layer(
    heatDF[["latitude","longitude"]], weights=heatDF["count"],
    max_intensity=100000, point_radius=30
)
fig.add_layer(heatmap_layer)
fig

## Use Case 2:
### Route frequency (pickup borough → dropoff borough)

In [None]:
# Look up which broadcast GeoJSON borough polygon contains a trip's pickup / dropoff point.

def _borough_of(trip, loc_key):
    """Borough name of the first polygon (in bFeatures order) containing trip[loc_key], else None."""
    for feature in bFeatures.value:
        if feature['shape'].contains(trip[loc_key]):
            return str(feature['properties']["borough"])
    return None

def pickup_city(trip):
    """Borough of the trip's pickup location, or None if it lies outside every polygon."""
    return _borough_of(trip, "pickupLoc")

def dropoff_city(trip):
    """Borough of the trip's dropoff location, or None if it lies outside every polygon."""
    return _borough_of(trip, "dropoffLoc")

# Trips per (pickup borough, dropoff borough) route, ignoring trips that start or
# end outside every polygon. Uses taxiDone, like the other use cases. Trips with
# a (0, 0) sentinel location presumably never fall inside a borough polygon and
# would be filtered out anyway, so this only skips needless polygon lookups.
routesRDD = (
    taxiDone.values()
    .map(lambda trip: ((pickup_city(trip), dropoff_city(trip)), 1))
    .filter(lambda kv: kv[0][0] is not None and kv[0][1] is not None)
    .reduceByKey(lambda a, b: a + b)
    .cache()
)


In [None]:
# Print every ((pickup, dropoff), trip count) pair; there is at most one entry per borough pair.
see("routesRDD",routesRDD.collect())


In [None]:
# Routes as a pandas table, most frequent first.
routesPD = (
    pd.DataFrame(routesRDD.collect())
    .rename(columns={0: 'Route', 1: 'Count'})
    .sort_values('Count', ascending=False)
)
routesPD

In [None]:
plt.figure(figsize=(12, 8))

y = routesPD['Count']
x = range(len(routesPD))
plt.yticks(x, routesPD['Route'])

width = 0.3
plt.barh(x, y, width, color="blue")
plt.gca().invert_yaxis()   # routesPD is sorted descending: most frequent route on top
plt.xscale('log')

# annotate each bar with its raw trip count
for i, v in enumerate(y):
    plt.text(v, i, str(v), color='black')

plt.title("Most frequent Routes")
plt.xlabel("Number of trips (log scale)")
plt.ylabel("(pickup borough, dropoff borough)")
plt.grid(True)   # was bare `grid(True)`, a NameError: `grid` is never imported
plt.show()



## Use Case 3:
### Top 10 drivers by number of trips

In [None]:
# Number of cleaned trips per driver license, most active first.
taxi_TopDrivers = (
    taxiDone
    .map(lambda rec: (rec[0], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: pair[1], ascending=False)
    .cache()
)

In [None]:
# The ten most active drivers as a pandas DataFrame with columns 'driver' and 'count'.
taxi_TopDriversDF = pd.DataFrame(taxi_TopDrivers.take(10)).rename(columns = {0:'driver', 1:'count'})

In [None]:
# Display the top-drivers table.
taxi_TopDriversDF

In [None]:
plt.figure()
y = taxi_TopDriversDF['count']

# Rank positions 1..N derived from the data. A hardcoded range(1, 11) fails with a
# shape mismatch when fewer than 10 drivers exist.
x = range(1, len(y) + 1)
plt.xticks(x, x)
width = 0.5
plt.bar(x, y, width, color="blue")

plt.title("Top drivers by number of trips")
plt.xlabel("Driver rank")
plt.ylabel("Number of trips")
plt.show()

## Use Case 4:
### Average number of trips per session

In [None]:
# One row per session: the driver license and the number of trips in that session.
session_avg = sessions.map(lambda x: (x[0], len(x[1]))).toDF(["License", "Count"])
session_avg.show(5)   # previously `trips_avg.show(5)`: NameError, no such variable exists

In [None]:
# Mean trips per session across all sessions, as a one-row DataFrame.
average_number = (
    session_avg
    .agg({'Count': 'mean'})
    .withColumnRenamed("avg(Count)", "Mean trips/session")
)

average_number.show()