## In SageMaker, you might need to configure to load the mongo-spark-connector package and kernel.

In [1]:
# !pip install s2sphere

In [2]:
# !pip install mapsplotlib

In [1]:
!which python

/mnt/anaconda2/bin/python


In [2]:
import os
pyspark_submit_args = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler

In [4]:
spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://54.212.226.251:27017/msds697proj.nyc_taxi")\
    .getOrCreate()

In [49]:
schema = StructType([StructField("End_Lat",DoubleType(),True),
 StructField("End_Lon",DoubleType(),True),
 StructField("Fare_Amt",DoubleType(),True),
 StructField("Passenger_Count",IntegerType(),True),
 StructField("Payment_Type",StringType(),True),
 StructField("Rate_Code",StringType(),True),
 StructField("Start_Lat",DoubleType(),True),
 StructField("Start_Lon",DoubleType(),True),
 StructField("Tip_Amt",DoubleType(),True),
 StructField("Tolls_Amt",DoubleType(),True),
 StructField("Total_Amt",DoubleType(),True),
 StructField("Trip_Distance",DoubleType(),True),
 StructField("Trip_Dropoff_DateTime",StringType(),True),
 StructField("Trip_Pickup_DateTime",StringType(),True),
 StructField("_id",StructType([StructField("oid",StringType(),True)]),True),
 StructField("mta_tax",StringType(),True),
 StructField("store_and_forward",StringType(),True),
 StructField("surcharge",DoubleType(),True),
 StructField("vendor_name",StringType(),True)])

In [50]:
nyc_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load(schema=schema)

In [51]:
nyc_df.printSchema()

root
 |-- End_Lat: double (nullable = true)
 |-- End_Lon: double (nullable = true)
 |-- Fare_Amt: double (nullable = true)
 |-- Passenger_Count: integer (nullable = true)
 |-- Payment_Type: string (nullable = true)
 |-- Rate_Code: string (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lon: double (nullable = true)
 |-- Tip_Amt: double (nullable = true)
 |-- Tolls_Amt: double (nullable = true)
 |-- Total_Amt: double (nullable = true)
 |-- Trip_Distance: double (nullable = true)
 |-- Trip_Dropoff_DateTime: string (nullable = true)
 |-- Trip_Pickup_DateTime: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- store_and_forward: string (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- vendor_name: string (nullable = true)



In [52]:
def lowercase_cols(df):
    for col in df.columns:
        new_col = col.lower()
        if new_col != col:
            df = df.withColumnRenamed(col, new_col)
    return df

def toDatetime(df, col_name):
    df = df.withColumn(col_name + "_2", to_timestamp(nyc_df[col_name], 'yyyy-MM-dd HH:mm:ss'))
    df = df.drop(col_name).withColumnRenamed(col_name + "_2", col_name)
    return df

nyc_df = lowercase_cols(nyc_df)
nyc_df = toDatetime(nyc_df, "trip_pickup_datetime")
nyc_df = toDatetime(nyc_df, "trip_dropoff_datetime")

Extract day of week, add "is_weekend" flag when trip starts OR ends during the weekend

In [53]:
nyc_df = nyc_df.withColumn('dow_dropoff', date_format('trip_dropoff_datetime', 'u').cast(IntegerType()))\
.withColumn('strdow_dropoff', date_format('trip_dropoff_datetime', 'E'))\
.withColumn('dow_pickup', date_format('trip_pickup_datetime', 'u').cast(IntegerType()))\
.withColumn('strdow_pickup', date_format('trip_pickup_datetime', 'E'))\
.withColumn('hour_pickup', date_format('trip_pickup_datetime', 'H').cast(IntegerType()))\
.withColumn('hour_dropoff', date_format('trip_dropoff_datetime', 'H').cast(IntegerType()))

nyc_df = nyc_df.withColumn("is_weekend", when((nyc_df.dow_pickup >= 6) | (nyc_df.dow_dropoff >= 6), 1).otherwise(0))

### Get cells

In [54]:
cell_level = 13

def coord_to_id_fun(x, cell_level=cell_level):
    lat, lon = x
    from s2sphere import CellId, LatLng
    cell_id = CellId.from_lat_lng(LatLng.from_degrees(lat, lon))\
    .parent(cell_level).to_token()
    return cell_id

def get_corners(s2CellId_str, cell_level=cell_level+1, cluster=1):
    from s2sphere import CellId, LatLng, Cell
    c1 = Cell(CellId(int(s2CellId_str,16)<<(60 - 2*cell_level)))
    v0 = LatLng.from_point(c1.get_vertex(0)) # lat/lon of upper/left corner
    v1 = LatLng.from_point(c1.get_vertex(1)) # lat/lon of lower/left corner
    v2 = LatLng.from_point(c1.get_vertex(2)) # lat/lon of lower/right corner
    v3 = LatLng.from_point(c1.get_vertex(3)) # lat/lon of upper/right corner
    return ((v0.lat().degrees, v0.lng().degrees, cluster),
            (v1.lat().degrees, v1.lng().degrees, cluster),
            (v2.lat().degrees, v2.lng().degrees, cluster),
            (v3.lat().degrees, v3.lng().degrees, cluster))

def distance_fun(x):
    import math
    lat1, lon1, lat2, lon2 = x
    radius = 6371 # km

    dlat = math.radians(lat2-lat1)
    dlon = math.radians(lon2-lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
        * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    d = radius * c
    return d

coord_to_id = udf(lambda x: coord_to_id_fun(x), StringType())
distance = udf(lambda x: distance_fun(x), FloatType())

In [55]:
nyc_df = nyc_df.withColumn("start_cell_id", coord_to_id(array("start_lat", "start_lon")))
nyc_df = nyc_df.withColumn("end_cell_id", coord_to_id(array("end_lat", "end_lon")))
nyc_df = nyc_df.withColumn("distance_km", distance(array("start_lat", "start_lon","end_lat", "end_lon")))
nyc_df.cache()

DataFrame[end_lat: double, end_lon: double, fare_amt: double, passenger_count: int, payment_type: string, rate_code: string, start_lat: double, start_lon: double, tip_amt: double, tolls_amt: double, total_amt: double, trip_distance: double, _id: struct<oid:string>, mta_tax: string, store_and_forward: string, surcharge: double, vendor_name: string, trip_pickup_datetime: timestamp, trip_dropoff_datetime: timestamp, dow_dropoff: int, strdow_dropoff: string, dow_pickup: int, strdow_pickup: string, hour_pickup: int, hour_dropoff: int, is_weekend: int, start_cell_id: string, end_cell_id: string, distance_km: float]

In [None]:
%%time
nyc_df.count()

### Common outbound cells

In [None]:
popular_cell_id = nyc_df.groupBy("end_cell_id").count().orderBy("count", ascending=False).first().asDict()["end_cell_id"]
unpopular_cell_id = nyc_df.groupBy("end_cell_id").count().orderBy("count", ascending=True).first().asDict()["end_cell_id"]

### Gather features by cell

In [None]:
def pivot_column(df, group_col, pv_col, prefix):
    orig_cols = set(df.columns)
    #nyc_df = nyc_df.groupBy(["start_cell_id", "dow_pickup"]).count()#.filter(nyc_df.start_cell_id == "89c243dc").show(5)
    new_df = df.groupBy(group_col).pivot(pv_col).count()
    new_cols = new_df.columns
    new_cols = [c if c in orig_cols else prefix + c for c in new_cols]
    new_df = new_df.toDF(*new_cols)
    new_df = new_df.na.fill(0, list(set(new_cols) - orig_cols))
    return new_df

In [None]:
nyc_dow_pickup_df = pivot_column(nyc_df, group_col = "start_cell_id", pv_col = "dow_pickup", prefix = "dow_pickup_")

In [None]:
nyc_dow_pickup_df.show(5)

In [None]:
nyc_dow_dropoff_df = pivot_column(nyc_df, group_col = "end_cell_id", pv_col = "dow_dropoff", prefix = "dow_dropoff_")

In [None]:
def merge_dfs(dfs):
    first_df = None
    for df in dfs:
        if first_df is None:
            first_df = df
        else:
            first_df = first_df.join(df, "cell_id", "left_outer")
    return first_df

In [None]:
nyc_hr_pickup_wkday_df = pivot_column(nyc_df.filter(nyc_df.dow_pickup<6), 
                                                           group_col = "start_cell_id", 
                                                           pv_col = "hour_pickup", 
                                                           prefix = "hr_pickup_wkday")

nyc_hr_dropoff_wkday_df = pivot_column(nyc_df.filter(nyc_df.dow_dropoff<6), 
                                                           group_col = "end_cell_id", 
                                                           pv_col = "hour_dropoff", 
                                                           prefix = "hr_dropoff_wkday")

nyc_hr_pickup_wkend_df = pivot_column(nyc_df.filter(nyc_df.dow_pickup>=6), 
                                                           group_col = "start_cell_id", 
                                                           pv_col = "hour_pickup", 
                                                           prefix = "hr_pickup_wkend")

nyc_hr_dropoff_wkend_df = pivot_column(nyc_df.filter(nyc_df.dow_dropoff>=6), 
                                                           group_col = "end_cell_id", 
                                                           pv_col = "hour_dropoff", 
                                                           prefix = "hr_dropoff_wkend")

In [None]:
nyc_pickup_km_df = nyc_df.groupBy("start_cell_id").agg(avg("distance_km"),stddev("distance_km"))\
                    .withColumnRenamed("avg(distance_km)", "avg_pickup_km")\
                    .withColumnRenamed("stddev_samp(distance_km)", "std_pickup_km")\
                    .withColumnRenamed("start_cell_id", "cell_id")
    
nyc_dropoff_km_df = nyc_df.groupBy("end_cell_id").agg(avg("distance_km"),stddev("distance_km"))\
                    .withColumnRenamed("avg(distance_km)", "avg_dropoff_km")\
                    .withColumnRenamed("stddev_samp(distance_km)", "std_dropoff_km")\
                    .withColumnRenamed("end_cell_id", "cell_id")

In [None]:
nyc_cell_df = merge_dfs([nyc_dow_pickup_df, nyc_dow_dropoff_df, 
          nyc_hr_pickup_wkday_df, nyc_hr_dropoff_wkday_df,
          nyc_hr_pickup_wkend_df, nyc_hr_dropoff_wkend_df,
          nyc_pickup_km_df, nyc_dropoff_km_df
         ])

nyc_cell_df = nyc_cell_df.na.fill(0)

# nyc_cell_df.orderBy("hr_dropoff_wkend12", ascending=True).show(5)

In [None]:
nyc_cell_df.printSchema()

In [None]:
feature_columns = [c for c in nyc_cell_df.columns if c != "cell_id"]

for c in feature_columns:
    nyc_cell_df = nyc_cell_df.withColumn(c+"_f", nyc_cell_df[c].cast(FloatType()))
    nyc_cell_df = nyc_cell_df.drop(c)
    nyc_cell_df = nyc_cell_df.withColumnRenamed(c+"_f",c)

### Vectorize features

In [None]:
#feature_columns = ["distance_km", "dow_pickup", "dow_dropoff", "hour_pickup", "hour_dropoff", "is_weekend"]
va = VectorAssembler(outputCol="features", inputCols=feature_columns)
vectorized_df = va.transform(nyc_cell_df)
vectorized_df = vectorized_df.select("features")
vectorized_df.cache()

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features",\
         outputCol="scaledFeatures")
scalerModel =  scaler.fit(vectorized_df)
vectorized_df = scalerModel.transform(vectorized_df)

vectorized_df.show(1)

In [None]:
vectorized_df.head()

## Plotting

In [None]:
%matplotlib inline

import pandas as pd
import numpy as np
from mapsplotlib import mapsplot as mplt
import constants as c
mplt.register_api_key(c.google_maps_key)

def show_cell(cell_id, cell_level=cell_level+1):
    map_data = get_corners(cell_id, cell_level=14)
    map_df = pd.DataFrame(np.array(map_data), columns=["latitude", "longitude", "cluster"])
    map_df.cluster = map_df.cluster.astype(np.int)
    mplt.polygons(map_df['latitude'], map_df['longitude'], map_df['cluster'])

In [None]:
show_cell('89c258fc')

# SF Data


In [None]:
spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://54.212.226.251:27017/msds697proj.sf_taxi")\
    .getOrCreate()

In [None]:
sf_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

In [None]:
sf_df.schema

In [None]:
sf_df.printSchema()