In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().appName('Rideshare').getOrCreate()
sc = spark.sparkContext

In [2]:
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'), ('spark.driver.memory','4g')])

In [3]:
spark.sparkContext.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.yarn.jars',
  'local:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/jars/*,local:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/*'),
 ('spark.yarn.appMasterEnv.MKL_NUM_THREADS', '1'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
  'http://md01.rcc.local:8088/proxy/application_1577383759214_11109,http://md02.rcc.local:8088/proxy/application_1577383759214_11109'),
 ('spark.sql.queryExecutionListeners',
  'com.cloudera.spark.lineage.NavigatorQueryListener'),
 ('spark.lineage.log.dir', '/var/log/spark/lineage'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'md01.rcc.local,md02.rcc.local'),
 ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
 ('spark.executorEnv.PYTHONPATH',
  '/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.10.7-src.zip:/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zi

In [4]:
rideshares = spark.read.csv("/user/trasley/data/rideshare.csv", inferSchema=True, header=True)

In [5]:
rideshares.count()

158617578

In [6]:
rideshares.printSchema()

root
 |-- Trip ID: string (nullable = true)
 |-- Trip Start Timestamp: string (nullable = true)
 |-- Trip End Timestamp: string (nullable = true)
 |-- Trip Seconds: integer (nullable = true)
 |-- Trip Miles: double (nullable = true)
 |-- Pickup Census Tract: long (nullable = true)
 |-- Dropoff Census Tract: long (nullable = true)
 |-- Pickup Community Area: integer (nullable = true)
 |-- Dropoff Community Area: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Tip: integer (nullable = true)
 |-- Additional Charges: double (nullable = true)
 |-- Trip Total: double (nullable = true)
 |-- Shared Trip Authorized: boolean (nullable = true)
 |-- Trips Pooled: integer (nullable = true)
 |-- Pickup Centroid Latitude: double (nullable = true)
 |-- Pickup Centroid Longitude: double (nullable = true)
 |-- Pickup Centroid Location: string (nullable = true)
 |-- Dropoff Centroid Latitude: double (nullable = true)
 |-- Dropoff Centroid Longitude: double (nullable = true)
 |-- Dropof

In [7]:
rideshares = rideshares.withColumnRenamed("Trip ID", "trip_id")\
    .withColumnRenamed("Trip Start Timestamp","start_time")\
    .withColumnRenamed("Trip End Timestamp","end_time")\
    .withColumnRenamed("Trip Seconds","seconds")\
    .withColumnRenamed("Trip Miles","miles")\
    .withColumnRenamed("Pickup Census Tract","pickup_tract")\
    .withColumnRenamed("Dropoff Census Tract","dropoff_tract")\
    .withColumnRenamed("Pickup Community Area","pickup_comm_area")\
    .withColumnRenamed("Dropoff Community Area","dropoff_comm_area")\
    .withColumnRenamed("Fare","fare")\
    .withColumnRenamed("Tip","tip")\
    .withColumnRenamed("Additional Charges","add_charges")\
    .withColumnRenamed("Trip Total","total")\
    .withColumnRenamed("Shared Trip Authorized","shared_auth")\
    .withColumnRenamed("Trips Pooled","trips_pooled")\
    .withColumnRenamed("Pickup Centroid Latitude","pickup_lat_centroid")\
    .withColumnRenamed("Pickup Centroid Longitude","pickup_long_centroid")\
    .withColumnRenamed("Pickup Centroid Location","pickup_loc_centroid")\
    .withColumnRenamed("Dropoff Centroid Latitude","dropoff_lat_centroid")\
    .withColumnRenamed("Dropoff Centroid Longitude","dropoff_long_centroid")\
    .withColumnRenamed("Dropoff Centroid Location","dropoff_loc_centroid")
rideshares.show(10)

+--------------------+--------------------+--------------------+-------+-----+------------+-------------+----------------+-----------------+----+---+-----------+-----+-----------+------------+-------------------+--------------------+--------------------+--------------------+---------------------+--------------------+
|             trip_id|          start_time|            end_time|seconds|miles|pickup_tract|dropoff_tract|pickup_comm_area|dropoff_comm_area|fare|tip|add_charges|total|shared_auth|trips_pooled|pickup_lat_centroid|pickup_long_centroid| pickup_loc_centroid|dropoff_lat_centroid|dropoff_long_centroid|dropoff_loc_centroid|
+--------------------+--------------------+--------------------+-------+-----+------------+-------------+----------------+-----------------+----+---+-----------+-----+-----------+------------+-------------------+--------------------+--------------------+--------------------+---------------------+--------------------+
|6e22fde8d26ed1363...|05/27/2019 11:30:...|

### Data Cleanup

In [8]:
rideshares.select("start_time").show(10,False)

+----------------------+
|start_time            |
+----------------------+
|05/27/2019 11:30:00 PM|
|06/11/2019 10:45:00 PM|
|04/28/2019 11:15:00 AM|
|04/18/2019 02:30:00 PM|
|06/14/2019 06:30:00 PM|
|04/27/2019 05:45:00 PM|
|04/11/2019 07:15:00 PM|
|06/06/2019 04:00:00 PM|
|04/18/2019 09:00:00 AM|
|06/13/2019 05:00:00 PM|
+----------------------+
only showing top 10 rows



In [9]:
from pyspark.sql.types import StringType,IntegerType,DateType
from pyspark.sql.functions import to_timestamp

rideshares = rideshares.withColumn('start_time',to_timestamp('start_time','MM/dd/yyyy HH:mm:ss')).withColumn('end_time',to_timestamp('end_time','MM/dd/yyyy HH:mm:ss'))
rideshares.select('start_time').show(10,False)

+-------------------+
|start_time         |
+-------------------+
|2019-05-27 11:30:00|
|2019-06-11 10:45:00|
|2019-04-28 11:15:00|
|2019-04-18 02:30:00|
|2019-06-14 06:30:00|
|2019-04-27 05:45:00|
|2019-04-11 07:15:00|
|2019-06-06 04:00:00|
|2019-04-18 09:00:00|
|2019-06-13 05:00:00|
+-------------------+
only showing top 10 rows



In [10]:
#Limit date range for consistency across datasets
date_from='2018-11-01'
date_to='2020-06-30'
rideshares=rideshares.filter((rideshares.start_time>=date_from) & (rideshares.start_time<=date_to))
rideshares.count()

158503181

In [11]:
from pyspark.sql.functions import when, count, col

#Show fields with null values
rideshares.select([count(when(col(i).isNull(), i)).alias(i) for i in rideshares.columns]).show()

+-------+----------+--------+-------+-----+------------+-------------+----------------+-----------------+----+---+-----------+-----+-----------+------------+-------------------+--------------------+-------------------+--------------------+---------------------+--------------------+
|trip_id|start_time|end_time|seconds|miles|pickup_tract|dropoff_tract|pickup_comm_area|dropoff_comm_area|fare|tip|add_charges|total|shared_auth|trips_pooled|pickup_lat_centroid|pickup_long_centroid|pickup_loc_centroid|dropoff_lat_centroid|dropoff_long_centroid|dropoff_loc_centroid|
+-------+----------+--------+-------+-----+------------+-------------+----------------+-----------------+----+---+-----------+-----+-----------+------------+-------------------+--------------------+-------------------+--------------------+---------------------+--------------------+
|      0|         0|    8686| 214005| 5036|    48698479|     49360897|        10159824|         11389965| 142| 34|        131|  142|          0|       

In [12]:
#Drop rows where dropoff/pickup centroids are null - need them to accurately capture ride
rideshares=rideshares.dropna(subset=['pickup_lat_centroid', 'pickup_long_centroid', 'dropoff_lat_centroid', 'dropoff_long_centroid'])

In [13]:
rideshares.rdd.countApprox(timeout=5)

137237155

In [14]:
##rows that have multiple null values on start/end/length of time - can't calculate other values
rideshares.where((col('start_time').isNull() & (col('end_time').isNull() | col('seconds').isNull()))).count()

0

In [15]:
rideshares.where(col('seconds').isNull()).count()

182000

In [16]:
from pyspark.sql.functions import hour,minute,second

rideshares.where(col('seconds').isNull()).withColumn("hours",hour("end_time")-hour("start_time")).select("seconds","miles","hours").show()

+-------+-----+-----+
|seconds|miles|hours|
+-------+-----+-----+
|   null|  3.4|    5|
|   null|  1.1|    4|
|   null|  1.7|    9|
|   null|  2.2|    8|
|   null|  2.1|    7|
|   null|  5.4|    1|
|   null|  3.8|    5|
|   null|  1.8|    1|
|   null|  2.1|   10|
|   null|  3.4|    0|
|   null| 13.0|    8|
|   null|  1.5|    5|
|   null|  2.9|    2|
|   null|  9.5|    6|
|   null|  4.5|    0|
|   null|  1.7|    4|
|   null| 10.5|    8|
|   null|  1.8|    4|
|   null|  3.3|    6|
|   null|  4.4|    9|
+-------+-----+-----+
only showing top 20 rows



Seems like lots of cases where the ride was many hours yet only went a couple of miles - remove these records

In [17]:
#rideshares.filter(col('seconds').isNull() & (hour(col("end_time"))-hour(col("start_time"))>=1)).count()
rideshares=rideshares.filter(~(col('seconds').isNull() & (hour(col("end_time"))-hour(col("start_time"))>=1)))
rideshares.count()

137072012

In [18]:
rideshares.select([count(when(col(i).isNull(), i)).alias(i) for i in rideshares.columns]).show()

+-------+----------+--------+-------+-----+------------+-------------+----------------+-----------------+----+---+-----------+-----+-----------+------------+-------------------+--------------------+-------------------+--------------------+---------------------+--------------------+
|trip_id|start_time|end_time|seconds|miles|pickup_tract|dropoff_tract|pickup_comm_area|dropoff_comm_area|fare|tip|add_charges|total|shared_auth|trips_pooled|pickup_lat_centroid|pickup_long_centroid|pickup_loc_centroid|dropoff_lat_centroid|dropoff_long_centroid|dropoff_loc_centroid|
+-------+----------+--------+-------+-----+------------+-------------+----------------+-----------------+----+---+-----------+-----+-----------+------------+-------------------+--------------------+-------------------+--------------------+---------------------+--------------------+
|      0|         0|    6922|  16856| 4494|    34361760|     34361760|           99940|           104893| 109| 25|         99|  109|          0|       

In [19]:
#https://medium.com/@nikolasbielski/using-a-custom-udf-in-pyspark-to-compute-haversine-distances-d877b77b4b18
#Compare miles feature to actual Euclidean distance
from pyspark.sql import functions as F

from math import radians, cos, sin, asin, sqrt

def get_distance(latit_a, longit_a, latit_b, longit_b):
    # Transform to radians
    longit_a, latit_a, longit_b, latit_b = map(radians, [longit_a,  latit_a, longit_b, latit_b])
    dist_longit = longit_b - longit_a
    dist_latit = latit_b - latit_a
    # Calculate area
    area = sin(dist_latit/2)**2 + cos(latit_a) * cos(latit_b) * sin(dist_longit/2)**2
    # Calculate the central angle
    central_angle = 2 * asin(sqrt(area))
    radius = 6371
    # Calculate Distance
    distance = central_angle * radius
    return abs(round(distance, 2))

udf_get_distance = F.udf(get_distance)

In [20]:
rideshares=rideshares.withColumn("distance", udf_get_distance(rideshares.pickup_lat_centroid, rideshares.pickup_long_centroid, \
                        rideshares.dropoff_lat_centroid, rideshares.dropoff_long_centroid))

In [21]:
rideshares.select([count(when(col(i).isNull(), i)).alias(i) for i in rideshares.columns]).show()

+-------+----------+--------+-------+-----+------------+-------------+----------------+-----------------+----+---+-----------+-----+-----------+------------+-------------------+--------------------+-------------------+--------------------+---------------------+--------------------+--------+
|trip_id|start_time|end_time|seconds|miles|pickup_tract|dropoff_tract|pickup_comm_area|dropoff_comm_area|fare|tip|add_charges|total|shared_auth|trips_pooled|pickup_lat_centroid|pickup_long_centroid|pickup_loc_centroid|dropoff_lat_centroid|dropoff_long_centroid|dropoff_loc_centroid|distance|
+-------+----------+--------+-------+-----+------------+-------------+----------------+-----------------+----+---+-----------+-----+-----------+------------+-------------------+--------------------+-------------------+--------------------+---------------------+--------------------+--------+
|      0|         0|    6922|  16856| 4494|    34361760|     34361760|           99940|           104893| 109| 25|         9

In [22]:
rideshares=rideshares.dropna(subset=['seconds','end_time','miles','fare'])
rideshares=rideshares.drop(*['pickup_tract','dropoff_tract'])


In [23]:
rideshares.select([count(when(col(i).isNull(), i)).alias(i) for i in rideshares.columns]).show()

+-------+----------+--------+-------+-----+----------------+-----------------+----+---+-----------+-----+-----------+------------+-------------------+--------------------+-------------------+--------------------+---------------------+--------------------+--------+
|trip_id|start_time|end_time|seconds|miles|pickup_comm_area|dropoff_comm_area|fare|tip|add_charges|total|shared_auth|trips_pooled|pickup_lat_centroid|pickup_long_centroid|pickup_loc_centroid|dropoff_lat_centroid|dropoff_long_centroid|dropoff_loc_centroid|distance|
+-------+----------+--------+-------+-----+----------------+-----------------+----+---+-----------+-----+-----------+------------+-------------------+--------------------+-------------------+--------------------+---------------------+--------------------+--------+
|      0|         0|       0|      0|    0|           99914|           104872|   0|  0|          0|    0|          0|           0|                  0|                   0|                  0|              

In [24]:
rideshares.where(col('end_time').isNull() & col('seconds').isNull()).count()

0

In [25]:
communities = spark.read.csv("/user/trasley/data/chicago_communities.csv", inferSchema=True, header=False)

In [26]:
spark.sql('use trasley').show()

++
||
++
++



### Data Exploration

In [28]:
#tracts=rideshares.crosstab('pickup_tract','dropoff_tract').toPandas()
#tracts.head(10)

In [None]:
types = df.select("TYPE").distinct().rdd.flatMap(lambda x: x).collect()
codes = df.select("CODE").distinct().rdd.flatMap(lambda x: x).collect()
types_expr = [F.when(F.col("TYPE") == ty, 1).otherwise(0).alias("e_TYPE_" + ty) for ty in types]
codes_expr = [F.when(F.col("CODE") == code, 1).otherwise(0).alias("e_CODE_" + code) for code in codes]
df = df.select("ID", "TYPE", "CODE", *types_expr+codes_expr)
df.show()

In [None]:
from pyspark.ml.feature import VectorAssembler

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

#convert relevant categorical into one hot encoded
indexer1 = StringIndexer(inputCol="pickup_comm_area", outputCol="countryIdx").setHandleInvalid("skip")
indexer2 = StringIndexer(inputCol="dropoff_comm_area", outputCol="provinceIdx").setHandleInvalid("skip")
indexer3 = StringIndexer(inputCol="variety", outputCol="varietyIdx").setHandleInvalid("skip")
indexer4 = StringIndexer(inputCol="winery", outputCol="wineryIdx").setHandleInvalid("skip")

#gather all indexers as inputs to the One Hot Encoder
inputs = [indexer1.getOutputCol(), indexer2.getOutputCol(), \
          indexer3.getOutputCol(), indexer4.getOutputCol()]

#create the one hot encoder
encoder = OneHotEncoderEstimator(inputCols=inputs,  \
                                 outputCols=["countryVec", "provinceVec", \
                                             "varietyVec", "wineryVec"])

#run it through a pipeline
pipeline = Pipeline(stages=[indexer1, indexer2, indexer3, indexer4, encoder])
encodedData = pipeline.fit(df).transform(df)

#we have removed NAs so dont need to impute missing values.
#pipeline = pipeline.na.fill(0) 

encodedData.show(5)

In [None]:
from pyspark.sql.functions import year, month, dayofmonth

#Show monthly rides over time
monthlyrides=rideshares.withColumn("year",year("start_time").cast(StringType())).withColumn("month",month("start_time").cast(StringType())).groupby("year","month").count().sort(['year','month'],ascending=False).toPandas()
monthlyrides.plot(y="count",figsize=(15,4), style="-")

In [None]:
rideshares.select("pickup_comm_area").distinct().show(10)

In [None]:
communities=communities.withColumnRenamed("_c0","code").withColumnRenamed("_c1","community_name")

In [None]:
communities.show(10)

In [None]:
rideshares = rideshares.join(communities, rideshares.pickup_comm_area == communities.code, how='left_outer')
rideshares.show()

In [None]:
### Drop blank community value codes
rideshares=rideshares.dropna(subset=['code'])

In [None]:
poolAuth=rideshares.select("shared_auth").groupby("shared_auth").count().toPandas()

In [None]:
poolAuth.head()

In [None]:
poolAuth["shared_auth"]

In [None]:
import numpy as np
import matplotlib.pyplot as plt

x = np.arange(2)
width = 0.35 


fig, ax = plt.subplots()
ax.bar(x + width/2, poolAuth["count"], width)

ax.set_title('Individual vs Pooled Rides')
ax.set_xticks(x)
ax.set_xticklabels(poolAuth["shared_auth"])

plt.show()

In [None]:
pooled_riders=rideshares.where(rideshares.shared_auth==True).select("trips_pooled").groupby("trips_pooled").count().sort('count',ascending=False).toPandas()

In [None]:
rideshares.where(rideshares.trips_pooled==4097).show()

In [None]:
community_activity=rideshares.where(rideshares.dropoff_comm_area==22).withColumn("year",year("start_time")).withColumn("month",month("start_time")).groupby("year","month").count().sort(['year','month'],ascending=False).toPandas()
community_activity.plot(y="count",figsize=(15,4), style="-")

distribution of:
ride length
cost
