# Exploratory Data Analysis and Data Prep in Spark

In this workbook, you will use one year of the NYC Taxi Dataset, which is publicly available on Azure. The year is 2015, and the 12 files represent about 21GB of data.

**You may use the DataFrame API, the SparkSQL API, or both as you wish as long as it produces the expected results.**

It is recommended to make small versions of the dataset for testing purposes so that you can iterate faster. Run a command similar to `df_small = df.limit(10000)`.

Make sure you use your [PySparkSQL Cheat Sheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) to help you with the commands to complete the assignment.

Instructions:

1. Read in the data 


2. Execute initial data exploration
    - Determine the shape of the data, column names, data types
    - Check for the number of missing values within critical fields of the dataset: `tpep_pickup_datetime`, `tpep_dropoff_datetime`, `passenger_count`, `trip_distance`, and `fare_amount`. There is a `count` command to use with columns in a dataframe and several ways to check for missing values. Try Googling some options!
    - Check the counts for values of passenger_count and remove any outlier values (how many people fit into a taxi?). Only include positive integers up to and including 6. Justify your analytical decision. You can use the `.isin()` method to create boolean values from a column ([see here](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.Column.isin.html)).


3. Convert field types

    Convert the following fields to the specified data types. You must do this because the fields were read in as strings (unless you specified the schema at read time).
    * `tpep_pickup_datetime` and `tpep_dropoff_datetime` must be TIMESTAMP
    * `passenger_count` and `rate_code` (named `RatecodeID` in this dataset) must be INT
    * all other numeric fields must be FLOAT
    * the remaining fields stay as STRING


4. Create new variables for:
    * indicating if `tpep_pickup_datetime` happens on a weekend or not
    * indicating if `tpep_pickup_datetime` is within the weekday rush hour (6:00-9:59am, 2:00-5:59pm) or not
    * indicating if the `tip_amount` is greater than 10% of the fare_amount or not


5. Report the average, median, Q25, and Q75 fare amount in a table conditioned on the variables you created. Each table should look like the example below, though your numbers might not match exactly.


| weekdayrush_dummy | mean_fare | Q25 | median | Q75  | num_rides |
|-------------------|-----------|-----|--------|------|-----------|
| True              | 12.462    | 6.5 | 9.0    | 14.0 | 44848941  |
| False             | 12.312    | 6.5 | 9.5    | 14.0 | 128336134 |
       
You are welcome to add as many cells as you need. Clearly indicate in your notebook each step so graders can confirm you have accomplished each task. **You must include comments in your code.**

In [1]:
spark

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 6, Finished, Available)

In [2]:
sc

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 7, Finished, Available)

### Step 1: Read in the data

In [3]:
taxi_filepath = "wasbs://datasets@azuremlexamples.blob.core.windows.net/nyctaxi/*.csv"

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 8, Finished, Available)

Load the 12 CSV files (with header) and cache the Spark DataFrame.

In [4]:
taxi_df = spark.read.option("header", "true").csv(taxi_filepath)

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 9, Finished, Available)

In [5]:
taxi_df.cache()

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 10, Finished, Available)

DataFrame[VendorID: string, tpep_pickup_datetime: string, tpep_dropoff_datetime: string, passenger_count: string, trip_distance: string, pickup_longitude: string, pickup_latitude: string, RatecodeID: string, store_and_fwd_flag: string, dropoff_longitude: string, dropoff_latitude: string, payment_type: string, fare_amount: string, extra: string, mta_tax: string, tip_amount: string, tolls_amount: string, improvement_surcharge: string, total_amount: string]

### Step 2: Exploratory data analysis

In [6]:
print("Number of rows:", taxi_df.count())

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 11, Finished, Available)

Number of rows: 146112989


In [7]:
print("Number of columns:", len(taxi_df.columns))

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 12, Finished, Available)

Number of columns: 19


In [8]:
print("Column names:", taxi_df.columns)

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 13, Finished, Available)

Column names: ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'RatecodeID', 'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']


In [9]:
taxi_df.printSchema()

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 14, Finished, Available)

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)



In [10]:
# Alias Spark's sum so it does not shadow Python's built-in sum
from pyspark.sql.functions import col, when, sum as spark_sum

checked_cols = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'fare_amount']

# For each critical field, count rows where the value is null or an empty string
# (an LLM suggested this approach)
conditions = [spark_sum(when(col(field).isNull() | (col(field) == ""), 1).otherwise(0)).alias(field) for field in checked_cols]

# Aggregate over the whole DataFrame in a single pass
missing_counts = taxi_df.agg(*conditions).collect()

# Inspect the result: a single Row with one count per field
missing_counts_row = missing_counts[0]
missing_counts_row
# There are no missing values in these five critical fields

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 15, Finished, Available)

Row(tpep_pickup_datetime=0, tpep_dropoff_datetime=0, passenger_count=0, trip_distance=0, fare_amount=0)
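
To sanity-check the counting logic above on a few rows, the same rule (a value is missing when it is null or an empty string) can be sketched in plain Python. The sample rows and the helper name `count_missing` are made up for illustration and are not part of the assignment or the dataset.

```python
# Hypothetical sample rows; None stands in for a Spark null.
rows = [
    {"passenger_count": "1", "fare_amount": "12"},
    {"passenger_count": "", "fare_amount": "14.5"},
    {"passenger_count": None, "fare_amount": ""},
]

def count_missing(rows, fields):
    """Count, per field, the rows whose value is None or an empty string."""
    return {
        field: sum(1 for row in rows if row.get(field) is None or row.get(field) == "")
        for field in fields
    }

missing = count_missing(rows, ["passenger_count", "fare_amount"])
print(missing)
```

A result of all zeros on the real data, as in the `Row` above, means no field needs imputation or row removal for missing values.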

In [11]:
valid_passenger_range = [1, 2, 3, 4, 5, 6]

# Keep only rides with 1 to 6 passengers; other values are treated as outliers
taxi_df = taxi_df.filter(col("passenger_count").isin(valid_passenger_range))

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 16, Finished, Available)
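
The task also asks for the counts of each `passenger_count` value. As a minimal plain-Python sketch of that tally and of the 1-to-6 filter (the sample values and variable names below are invented for illustration; on the full data the tally would be computed in Spark rather than in Python):

```python
from collections import Counter

# Hypothetical raw values, read as strings like the CSV columns above.
passenger_counts = ["1", "2", "0", "1", "9", "6", "1"]

# Tally how often each value occurs.
tally = Counter(passenger_counts)

# Keep only whole numbers from 1 to 6, mirroring the isin() filter.
valid = {str(n) for n in range(1, 7)}
kept = [value for value in passenger_counts if value in valid]

print(sorted(tally.items()))
print(kept)
```

Looking at the tally before filtering shows how many rides each excluded value accounts for, which supports the justification the task asks for.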

In [12]:
taxi_df.take(2)

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 17, Finished, Available)

[Row(VendorID='2', tpep_pickup_datetime='2015-01-15 19:05:39', tpep_dropoff_datetime='2015-01-15 19:23:42', passenger_count='1', trip_distance='1.59', pickup_longitude='-73.993896484375', pickup_latitude='40.750110626220703', RatecodeID='1', store_and_fwd_flag='N', dropoff_longitude='-73.974784851074219', dropoff_latitude='40.750617980957031', payment_type='1', fare_amount='12', extra='1', mta_tax='0.5', tip_amount='3.25', tolls_amount='0', improvement_surcharge='0.3', total_amount='17.05'),
 Row(VendorID='1', tpep_pickup_datetime='2015-01-10 20:33:38', tpep_dropoff_datetime='2015-01-10 20:53:28', passenger_count='1', trip_distance='3.30', pickup_longitude='-74.00164794921875', pickup_latitude='40.7242431640625', RatecodeID='1', store_and_fwd_flag='N', dropoff_longitude='-73.994415283203125', dropoff_latitude='40.759109497070313', payment_type='1', fare_amount='14.5', extra='0.5', mta_tax='0.5', tip_amount='2', tolls_amount='0', improvement_surcharge='0.3', total_amount='17.8')]

### Step 3: Change data types
This step can also be merged into Step 2 if you are using Spark SQL.

In [13]:
#convert date related cols to 'timestamp'
taxi_df = taxi_df.withColumn("tpep_pickup_datetime", col("tpep_pickup_datetime").cast("timestamp"))
taxi_df = taxi_df.withColumn("tpep_dropoff_datetime", col("tpep_dropoff_datetime").cast("timestamp"))

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 18, Finished, Available)

In [14]:
#convert passenger_count and RatecodeID to 'int'
taxi_df = taxi_df.withColumn("passenger_count", col("passenger_count").cast("int"))
taxi_df = taxi_df.withColumn("RatecodeID", col("RatecodeID").cast("int"))

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 19, Finished, Available)

In [15]:
#convert other numeric values to 'float'
float_fields = ["trip_distance", "pickup_longitude", "pickup_latitude", "dropoff_longitude", 
                "dropoff_latitude", "fare_amount", "extra", "mta_tax", "tip_amount", 
                "tolls_amount", "improvement_surcharge", "total_amount"]

for field in float_fields:
    taxi_df = taxi_df.withColumn(field, col(field).cast("float"))

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 20, Finished, Available)
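
One thing worth checking after casting is whether any values failed to parse: with ANSI mode disabled, Spark's `cast` yields null for a string it cannot convert rather than raising an error. The plain-Python sketch below mimics that behaviour on made-up values; `cast_or_none` is a hypothetical helper, not a Spark function.

```python
def cast_or_none(value, target_type):
    """Convert value with target_type, returning None when conversion fails."""
    try:
        return target_type(value)
    except (TypeError, ValueError):
        return None

# Hypothetical raw strings; None stands in for a value that was already null.
raw_fares = ["12", "14.5", "", "n/a", None]
fares = [cast_or_none(value, float) for value in raw_fares]

# Count values that were present before the cast but are None afterwards.
failed = sum(
    1 for before, after in zip(raw_fares, fares)
    if before is not None and after is None
)
print(fares, failed)
```

Comparing null counts before and after the conversion is one way to confirm that the cast did not silently discard data.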

In [16]:
taxi_df.printSchema()

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 21, Finished, Available)

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- extra: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- improvement_surcharge: float (nullable = true)
 |-- total_amount: float (nullable = true)



In [17]:
taxi_df.show(5)

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 22, Finished, Available)

+--------+--------------------+---------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|RatecodeID|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2015-01-15 19:05:39|  2015-01-15 19:23:42|              1|         1.59|        -73.9939|       40.75011|         1|                 N|       -73.974

### Step 4: Create new variables

Reminder that using a small dataframe for testing will help you develop your analytics faster.

In [18]:
from pyspark.sql.functions import col, dayofweek, hour
# Flag weekend pickups: dayofweek() returns 1 for Sunday and 7 for Saturday
taxi_df = taxi_df.withColumn("is_weekend", 
                             (dayofweek(col("tpep_pickup_datetime")) == 1) | 
                             (dayofweek(col("tpep_pickup_datetime")) == 7))

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 23, Finished, Available)

In [19]:
# Flag weekday rush-hour pickups: Monday-Friday (dayofweek 2-6), 6:00-9:59am or 2:00-5:59pm
taxi_df = taxi_df.withColumn("is_rush_hour", 
                             (dayofweek(col("tpep_pickup_datetime")).between(2, 6)) & 
                             (((hour(col("tpep_pickup_datetime")) >= 6) & 
                               (hour(col("tpep_pickup_datetime")) < 10)) | 
                              ((hour(col("tpep_pickup_datetime")) >= 14) & 
                               (hour(col("tpep_pickup_datetime")) < 18))))

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 24, Finished, Available)

In [20]:
# Flag rides where the tip is greater than 10% of the fare
taxi_df = taxi_df.withColumn("is_tip_gt_10pct", 
                             col("tip_amount") > col("fare_amount") * 0.1)

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 25, Finished, Available)
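
When checking the three flags on a small sample, a plain-Python reference can help. The sketch below assumes the definitions used in the cells above. Note that Python's `datetime.isoweekday()` numbers Monday as 1 and Sunday as 7, whereas Spark's `dayofweek` numbers Sunday as 1 and Saturday as 7. The function names are hypothetical.

```python
from datetime import datetime

def is_weekend(ts):
    """True for Saturday (isoweekday 6) or Sunday (isoweekday 7)."""
    return ts.isoweekday() >= 6

def is_rush_hour(ts):
    """True on weekdays between 6:00-9:59am or 2:00-5:59pm."""
    in_window = 6 <= ts.hour < 10 or 14 <= ts.hour < 18
    return not is_weekend(ts) and in_window

def is_tip_gt_10pct(tip_amount, fare_amount):
    """True when the tip is strictly greater than 10% of the fare."""
    return tip_amount > 0.1 * fare_amount

# Pickup times taken from the two sample rows shown earlier in the notebook.
thursday_evening = datetime(2015, 1, 15, 19, 5, 39)
saturday_evening = datetime(2015, 1, 10, 20, 33, 38)

print(is_weekend(thursday_evening), is_rush_hour(thursday_evening))
print(is_weekend(saturday_evening), is_rush_hour(saturday_evening))
print(is_tip_gt_10pct(3.25, 12.0), is_tip_gt_10pct(2.0, 14.5))
```

Running a handful of known timestamps through both this reference and the Spark columns is a quick way to catch an off-by-one in the day numbering or the hour boundaries.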

In [21]:
taxi_df.show(5)

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 26, Finished, Available)

+--------+--------------------+---------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------+------------+---------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|RatecodeID|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|is_weekend|is_rush_hour|is_tip_gt_10pct|
+--------+--------------------+---------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------+------------+---------------+
|       2| 2015-01-15 19:05:39|  2015-01

### Step 5: Analytics

In [22]:
from pyspark.sql import functions as f
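# Compute summary statistics of fare_amount grouped by the weekend flag
# percentile_approx is left at its default accuracy (10000); a higher accuracy would use more memory and could lengthen the run time
# Note: the flag column is renamed to 'weekdayrush_dummy' below, but it holds the weekend flag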
df_weekend = taxi_df.groupBy("is_weekend").agg(
    f.mean('fare_amount').alias('mean_fare'),
    f.expr('percentile_approx(fare_amount, 0.25)').alias('Q25'),
    f.expr('percentile_approx(fare_amount, 0.5)').alias('median'),
    f.expr('percentile_approx(fare_amount, 0.75)').alias('Q75'),
    f.count('*').alias('num_rides')
).withColumnRenamed("is_weekend", 'weekdayrush_dummy')

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 27, Finished, Available)

In [23]:
#compute necessary info for the rush hour condition
df_rushhour = taxi_df.groupBy("is_rush_hour").agg(
    f.mean('fare_amount').alias('mean_fare'),
    f.expr('percentile_approx(fare_amount, 0.25)').alias('Q25'),
    f.expr('percentile_approx(fare_amount, 0.5)').alias('median'),
    f.expr('percentile_approx(fare_amount, 0.75)').alias('Q75'),
    f.count('*').alias('num_rides')
).withColumnRenamed("is_rush_hour", 'rushhour_dummy')

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 28, Finished, Available)

In [24]:
#compute necessary info for the tip condition
df_tip = taxi_df.groupBy("is_tip_gt_10pct").agg(
    f.mean('fare_amount').alias('mean_fare'),
    f.expr('percentile_approx(fare_amount, 0.25)').alias('Q25'),
    f.expr('percentile_approx(fare_amount, 0.5)').alias('median'),
    f.expr('percentile_approx(fare_amount, 0.75)').alias('Q75'),
    f.count('*').alias('num_rides')
).withColumnRenamed("is_tip_gt_10pct", 'tip_dummy')

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 29, Finished, Available)
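
To cross-check the grouped statistics on a small sample, an exact nearest-rank percentile can be computed in plain Python. This is a sketch under stated assumptions: `nearest_rank_percentile` is a hypothetical helper, the fares are made-up values, and the nearest-rank rule is not Spark's `percentile_approx` algorithm, so results on large data may differ slightly.

```python
import math
from statistics import mean

def nearest_rank_percentile(values, p):
    """Return the smallest value with at least a fraction p of the data at or below it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered)))  # 1-based rank
    return ordered[rank - 1]

# Hypothetical fares grouped by a boolean flag.
fares = {True: [6.5, 9.0, 14.0, 52.0], False: [5.0, 6.5, 9.5, 12.0, 15.0]}

summary = {
    flag: {
        "mean_fare": mean(group),
        "Q25": nearest_rank_percentile(group, 0.25),
        "median": nearest_rank_percentile(group, 0.5),
        "Q75": nearest_rank_percentile(group, 0.75),
        "num_rides": len(group),
    }
    for flag, group in fares.items()
}
print(summary)
```

Because a few large fares pull the mean well above the median in the first group, reporting quartiles alongside the mean gives a fairer picture of a skewed fare distribution.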

In [None]:
# Convert the small aggregated results to pandas DataFrames so they can be inspected
pd_weekend = df_weekend.toPandas()
pd_rushhour = df_rushhour.toPandas()
pd_tip = df_tip.toPandas()

In [28]:
print("Weekend Data:\n", pd_weekend, "\n")
print("Rush Hour Data:\n", pd_rushhour, "\n")
print("Tip Data:\n", pd_tip, "\n")

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 33, Finished, Available)

Weekend Data:
    weekdayrush_dummy  mean_fare  Q25  median   Q75  num_rides
0               True  12.683451  6.5     9.5  14.5   42391513
1              False  13.042870  6.5     9.5  15.0  103680034 

Rush Hour Data:
    rushhour_dummy  mean_fare  Q25  median   Q75  num_rides
0            True  13.120276  6.5     9.5  14.5   37963857
1           False  12.874751  6.5     9.5  15.0  108107690 

Tip Data:
    tip_dummy  mean_fare  Q25  median   Q75  num_rides
0       True  13.085738  7.0     9.5  15.0   82876509
1      False  12.745550  6.5     9.5  14.5   63195038 



In [26]:
# Keep only the statistic columns and rename 'median' to 'Q50' so the result matches the example in the README on GitHub
necessary_columns = ['mean_fare', 'Q25', 'median', 'Q75', 'num_rides']
df_weekend = pd_weekend[necessary_columns].rename(columns={'median': 'Q50'})
df_rushhour = pd_rushhour[necessary_columns].rename(columns={'median': 'Q50'})
df_tip = pd_tip[necessary_columns].rename(columns={'median': 'Q50'})

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 31, Finished, Available)

## **Save your analytics results to a json object - then add, commit, and push your notebook and json to GitHub!**

Remember that the default path on the Serverless Spark compute **is not the same as the compute instance.** When you start a Jupyter notebook from within the Serverless Spark, the working directory of the Python notebook is `/synfs/notebook/1/aml_notebook_mount`. **This is not within the repo directory.** To save a file from the Python notebook within your repository, you will have to add a prefix to the path in the form of: `Users/<YOUR-NET-ID>/<REPO-NAME>/filename.fileextension`. 
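
As a minimal sketch of the prefix rule above, the output path can be assembled before writing. The NetID `abc123`, the repository name `my-repo`, and the helper `repo_path` are placeholders invented for this example, and the write goes to a temporary directory so the snippet runs anywhere.

```python
import json
import os
import tempfile

def repo_path(net_id, repo_name, filename):
    """Build a path of the form Users/<NET-ID>/<REPO-NAME>/<filename>."""
    return os.path.join("Users", net_id, repo_name, filename)

# Placeholder values; substitute your own NetID and repository name.
target = repo_path("abc123", "my-repo", "taxi-soln.json")

# Demonstrate the write under a temporary root instead of the real mount.
root = tempfile.mkdtemp()
full_path = os.path.join(root, target)
os.makedirs(os.path.dirname(full_path), exist_ok=True)
with open(full_path, "w") as fp:
    json.dump({"weekend": [], "tip": [], "rushhour": []}, fp)

print(target)
```

Opening the file in a `with` block ensures it is flushed and closed before the notebook and JSON are committed.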

In [27]:
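import json
# Collect the three result tables, each as a list of row dictionaries
results = {'weekend': df_weekend.to_dict(orient='records'),
           'tip': df_tip.to_dict(orient='records'),
           'rushhour': df_rushhour.to_dict(orient='records')}
# Use a with-block so the file is closed once the JSON has been written
with open('taxi-soln.json', 'w') as fp:
    json.dump(results, fp)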

StatementMeta(70d4139e-3496-47df-9fd8-bcdeb57818f4, 14, 32, Finished, Available)