# Data Preprocessing

In [23]:
# import context manager: SparkSession
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder \
        .master("local[*]") \
        .appName("mllib_classifier") \
        .config("spark.executor.memory", '20g') \
        .config('spark.executor.cores', '2') \
        .config('spark.executor.instances', '3') \
        .config("spark.driver.memory",'1g') \
        .getOrCreate()

sc = spark.sparkContext

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

## Benchmark DF Approach

In [24]:
data_file = spark.read.csv("/../../project/ds5559/Alice_Ed_Michael_Sam_project/BigTrips.csv", header=True)

In [25]:
data_file.printSchema()

root
 |-- Trip ID: string (nullable = true)
 |-- Trip Start Timestamp: string (nullable = true)
 |-- Trip End Timestamp: string (nullable = true)
 |-- Trip Seconds: string (nullable = true)
 |-- Trip Miles: string (nullable = true)
 |-- Pickup Census Tract: string (nullable = true)
 |-- Dropoff Census Tract: string (nullable = true)
 |-- Pickup Community Area: string (nullable = true)
 |-- Dropoff Community Area: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Tip: string (nullable = true)
 |-- Additional Charges: string (nullable = true)
 |-- Trip Total: string (nullable = true)
 |-- Shared Trip Authorized: string (nullable = true)
 |-- Trips Pooled: string (nullable = true)
 |-- Pickup Centroid Latitude: string (nullable = true)
 |-- Pickup Centroid Longitude: string (nullable = true)
 |-- Pickup Centroid Location: string (nullable = true)
 |-- Dropoff Centroid Latitude: string (nullable = true)
 |-- Dropoff Centroid Longitude: string (nullable = true)
 |-- Dropoff 

Raw data file is ~12 gigabytes. Take a 25% sample, stratified by sample, to get a 3 gigabyte file. Commented out line is for testing smaller subsamples.

In [26]:
#final_DF = data_file.sample(0.25).sample(0.0001)
final_DF = data_file
del(data_file)
final_DF.cache()

DataFrame[Trip ID: string, Trip Start Timestamp: string, Trip End Timestamp: string, Trip Seconds: string, Trip Miles: string, Pickup Census Tract: string, Dropoff Census Tract: string, Pickup Community Area: string, Dropoff Community Area: string, Fare: string, Tip: string, Additional Charges: string, Trip Total: string, Shared Trip Authorized: string, Trips Pooled: string, Pickup Centroid Latitude: string, Pickup Centroid Longitude: string, Pickup Centroid Location: string, Dropoff Centroid Latitude: string, Dropoff Centroid Longitude: string, Dropoff Centroid Location: string]

In [27]:
#https://stackoverflow.com/questions/53304688/spark-date-format-mmm-dd-yyyy-hhmmss-am-to-timestamp-in-df
#https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
#start_times = final_DF.select("Trip ID","Trip Start Timestamp")
#st = start_times.withColumn("Trip_Start_Timestamp",F.to_timestamp(F.col("Trip Start Timestamp"), "MM/dd/yyyy hh:mm:ss a"))

#end_times = final_DF.select("Trip ID","Trip End Timestamp")
#et = end_times.withColumn("Trip_End_Timestamp",F.to_timestamp(F.col("Trip End Timestamp"), "MM/dd/yyyy hh:mm:ss a"))
final_DF = final_DF.withColumn("Trip_Start_Timestamp",F.to_timestamp(F.col("Trip Start Timestamp"), "MM/dd/yyyy hh:mm:ss a"))
final_DF = final_DF.withColumn("Trip_End_Timestamp",F.to_timestamp(F.col("Trip End Timestamp"), "MM/dd/yyyy hh:mm:ss a"))
#https://stackoverflow.com/questions/29600673/how-to-delete-columns-in-pyspark-dataframe
final_DF = final_DF.drop(*['Trip Start Timestamp','Trip End Timestamp'])

In [28]:
final_DF.createOrReplaceTempView("dataset")

In [29]:
final_DF = spark.sql("""SELECT *, (CASE WHEN Trip_Start_Timestamp >= CAST('2020-03-11' AS DATE)
                                                        THEN 1
                                                        ELSE 0
                                                END) as PostShutdownFlag
                                     FROM dataset
                                    WHERE Trip_Start_Timestamp BETWEEN CAST('2020-02-05' AS DATE) AND CAST('2020-04-15' AS DATE)""")

In [30]:
#final_DF = final_DF.join(st.select("Trip ID","Trip_Start_Timestamp"),on="Trip ID").join(et.select("Trip ID","Trip_End_Timestamp"),on="Trip ID")
#https://stackoverflow.com/questions/49397966/in-pyspark-how-do-you-add-concat-a-string-to-a-column
final_DF = final_DF.withColumn("Day_Month_str", F.concat(F.dayofmonth(F.col("Trip_Start_Timestamp")),F.lit("-"),F.month(F.col("Trip_Start_Timestamp"))).cast("string"))

In [31]:
final_DF = final_DF.withColumn('Trip Seconds',F.col('Trip Seconds').cast("integer"))
final_DF = final_DF.withColumn('Trip Miles',F.col('Trip Miles').cast("double"))
final_DF = final_DF.withColumn('Pickup Community Area',F.col('Pickup Community Area').cast("integer"))
final_DF = final_DF.withColumn('Dropoff Community Area',F.col('Dropoff Community Area').cast("integer"))
final_DF = final_DF.withColumn('Fare',F.col('Fare').cast("double"))
final_DF = final_DF.withColumn('Tip',F.col('Tip').cast("double"))
final_DF = final_DF.withColumn('Additional Charges',F.col('Additional Charges').cast("double"))
final_DF = final_DF.withColumn('Trip Total',F.col('Trip Total').cast("double"))
final_DF = final_DF.withColumn('Shared Trip Authorized',F.col('Shared Trip Authorized').cast("boolean"))
final_DF = final_DF.withColumn('Trips Pooled',F.col('Trips Pooled').cast("integer"))

In [32]:
final_DF = final_DF.withColumn("label", F.when(F.col("Tip") > 0,1).otherwise(0))

In [33]:
#https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#data-types
final_DF = final_DF.withColumn("Trip_Year", F.year(F.col("Trip_Start_Timestamp"))) \
                    .withColumn("Trip_Month", F.month(F.col("Trip_Start_Timestamp"))) \
                    .withColumn("Trip_WeekNumber", F.weekofyear(F.col("Trip_Start_Timestamp"))) \
                    .withColumn("Trip_DayofWeek", F.dayofweek(F.col("Trip_Start_Timestamp"))) \
                    .withColumn("Trip_Start_Hour", F.hour(F.col("Trip_Start_Timestamp"))) \
                    .withColumn("Trip_Start_Minute", F.minute(F.col("Trip_Start_Timestamp"))) \
                    .withColumn("Trip_End_Hour", F.hour(F.col("Trip_End_Timestamp"))) \
                    .withColumn("Date", F.to_date(F.col("Trip_Start_Timestamp")))

In [34]:
final_DF = final_DF.withColumnRenamed("Trip ID","Trip_ID") \
                    .withColumnRenamed("Trip Seconds","Trip_Seconds") \
                    .withColumnRenamed("Trip Miles","Trip_Miles") \
                    .withColumnRenamed("Pickup Census Tract","Pickup_Census_Tract") \
                    .withColumnRenamed("Dropoff Census Tract","Dropoff_Census_Tract") \
                    .withColumnRenamed("Pickup Community Area","Pickup_Community_Area") \
                    .withColumnRenamed("Dropoff Community Area","Dropoff_Community_Area") \
                    .withColumnRenamed("Additional Charges","Additional_Charges_str") \
                    .withColumnRenamed("Trip Total","Trip_Total") \
                    .withColumnRenamed("Shared Trip Authorized","Shared_Trip_Authorized") \
                    .withColumnRenamed("Trips Pooled","Trips_Pooled") \
                    .withColumnRenamed("Pickup Centroid Latitude","Pickup_Centroid_Latitude") \
                    .withColumnRenamed("Pickup Centroid Longitude","Pickup_Centroid_Longitude") \
                    .withColumnRenamed("Pickup Centroid Location","Pickup_Centroid_Location") \
                    .withColumnRenamed("Dropoff Centroid Latitude","Dropoff_Centroid_Latitude") \
                    .withColumnRenamed("Dropoff Centroid Longitude","Dropoff_Centroid_Longitude") \
                    .withColumnRenamed("Dropoff Centroid Location","Dropoff_Centroid_Location")

In [37]:
%%time
all_records_count = final_DF.count()

CPU times: user 4.13 ms, sys: 4.68 ms, total: 8.81 ms
Wall time: 1min 10s


In [38]:
final_DF.printSchema()

root
 |-- Trip_ID: string (nullable = true)
 |-- Trip_Seconds: integer (nullable = true)
 |-- Trip_Miles: double (nullable = true)
 |-- Pickup_Census_Tract: string (nullable = true)
 |-- Dropoff_Census_Tract: string (nullable = true)
 |-- Pickup_Community_Area: integer (nullable = true)
 |-- Dropoff_Community_Area: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Tip: double (nullable = true)
 |-- Additional_Charges_str: double (nullable = true)
 |-- Trip_Total: double (nullable = true)
 |-- Shared_Trip_Authorized: boolean (nullable = true)
 |-- Trips_Pooled: integer (nullable = true)
 |-- Pickup_Centroid_Latitude: string (nullable = true)
 |-- Pickup_Centroid_Longitude: string (nullable = true)
 |-- Pickup_Centroid_Location: string (nullable = true)
 |-- Dropoff_Centroid_Latitude: string (nullable = true)
 |-- Dropoff_Centroid_Longitude: string (nullable = true)
 |-- Dropoff_Centroid_Location: string (nullable = true)
 |-- Trip_Start_Timestamp: timestamp (nullable = 

In [39]:
tip_record_count = final_DF.filter(F.col('label') == 1).count()
final_DF = final_DF.filter(F.col('label') == 1) \
                .union(final_DF.filter(F.col('label') == 0)
                       .sample(False, (tip_record_count/(all_records_count-tip_record_count)), 1221))

In [40]:
final_DF.groupby('label').count().show()

+-----+-------+
|label|  count|
+-----+-------+
|    1|2375183|
|    0|2375686|
+-----+-------+



In [41]:
%%time
final_DF.write.parquet("/../../project/ds5559/Alice_Ed_Michael_Sam_project/final_dataset.parquet")

CPU times: user 13.1 ms, sys: 10.6 ms, total: 23.7 ms
Wall time: 3min 28s


In [42]:
%%time
test = spark.read.parquet("/../../project/ds5559/Alice_Ed_Michael_Sam_project/final_dataset.parquet")

CPU times: user 1.5 ms, sys: 364 µs, total: 1.87 ms
Wall time: 199 ms


In [43]:
test.count()

4750869

## Dataset Subselection

We create a few helpful aggregated and sampled datasets to feed into our EDA notebook. First a 1% sample for scatterplots and boxplots. This will be approximately 47,500 records.

In [44]:
dataset_sample = final_DF.sample(False,.01)

In [45]:
dataset_sample.count()

46950

In [57]:
daily_summary = final_DF.groupBy('Trip_Start_Timestamp').agg(F.sum("label"),F.sum("Fare"), F.count("trip_id"))
daily_summary.show(5)

+--------------------+----------+---------+--------------+
|Trip_Start_Timestamp|sum(label)|sum(Fare)|count(trip_id)|
+--------------------+----------+---------+--------------+
| 2020-02-07 23:00:00|       832|  17307.5|          1697|
| 2020-02-13 10:00:00|       762|  16567.5|          1363|
| 2020-02-13 19:15:00|      1297|  27317.5|          2334|
| 2020-02-16 10:15:00|       675|  16420.0|          1261|
| 2020-02-17 07:15:00|       425|  13635.0|           888|
+--------------------+----------+---------+--------------+
only showing top 5 rows

