In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession, functions as F

In [3]:
# Create (or reuse) the local Spark session used throughout this notebook.
# eagerEval makes a DataFrame that is the last expression of a cell render as a table.
# NOTE(review): spark.sql.parquet.cacheMetadata is set as in the original; confirm it is
# still honoured by the installed Spark version.
session_builder = SparkSession.builder.appName("MAST30034 Tutorial 1")
session_builder = session_builder.config("spark.sql.repl.eagerEval.enabled", True)
session_builder = session_builder.config("spark.sql.parquet.cacheMetadata", "true")
spark = session_builder.getOrCreate()

22/08/22 15:29:58 WARN Utils: Your hostname, DESKTOP-7TBG09Q resolves to a loopback address: 127.0.1.1; using 172.26.168.108 instead (on interface eth0)
22/08/22 15:29:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/22 15:29:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [15]:
# Load the raw trip records from the local Parquet directory.
# NOTE(review): this path points at the `fhv` data, whose preview shows columns such as
# dispatching_base_num / PUlocationID, but the schema printed below and the typecasting
# cells use yellow-taxi columns (VendorID, passenger_count, RatecodeID, tpep_*).
# Confirm which dataset is intended before relying on Restart & Run All.
raw_data_path = '../../ADS_Project_1/raw_data/fhv'
sdf = spark.read.parquet(raw_data_path)

In [17]:
# Preview 30 rows; with eager evaluation enabled this renders as a table.
sdf.limit(num=30)

dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
B00009,2022-01-01 11:31:00,2022-01-01 12:05:00,,,,B00009
B00009,2022-01-01 11:37:00,2022-01-01 12:05:00,,,,B00009
B00037,2022-01-01 11:56:37,2022-01-01 12:06:11,,85.0,,B00037
B00037,2022-01-01 11:19:54,2022-01-01 11:30:47,,85.0,,B00037
B00037,2022-01-01 11:41:49,2022-01-01 11:52:16,,188.0,,B00037
B00037,2022-01-01 11:21:32,2022-01-01 11:35:06,,61.0,,B00037
B00037,2022-01-01 11:51:19,2022-01-01 12:08:06,,76.0,,B00037
B00111,2022-01-01 11:30:00,2022-01-01 12:41:00,,,,B03406
B00112,2022-01-01 11:31:30,2022-01-01 12:10:06,,67.0,,B00112
B00112,2022-01-01 11:12:26,2022-01-01 11:37:22,,155.0,,B00112


22/08/22 16:00:06 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 415116 ms exceeds timeout 120000 ms
22/08/22 16:00:06 WARN SparkContext: Killing executors is not supported by current scheduler.


In [13]:
# Total number of rows (trip records) in the loaded DataFrame.
n_rows = sdf.count()
n_rows

12671164

In [8]:
# Inspect the column names and types of the loaded DataFrame.
sdf.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



# Preprocessing Steps


## 1. Typecasting
Make sure every column has an appropriate data type.

## 2. Getting rid of unrelated data (as per Data Dictionary)
Some fields are irrelevant to our analysis, so we drop them.

## 3. Handling of Missing Values
If NaN / null values are present, we need to handle them appropriately (e.g. imputation or deletion).

## 4. Outlier Removal / Analysis
Outliers have to be handled appropriately, as they can skew summary statistics and downstream analysis.


### Typecasting

There are several columns whose type we need to adjust. They are listed below with their justifications:
- VendorID: inspection of the data revealed that it only takes integer values, so it should be cast to an int.
- passenger_count: should be cast to an int, since the number of people is a whole number.
- RatecodeID: same reasoning as VendorID.

In [13]:
# Check that no trip reports a fractional passenger count (e.g. "half a person")
# before the column is cast to an integer type further down.
# Eyeballing distinct().show() alone is not a proof: show() prints at most 20 rows
# and asserts nothing, so also count non-whole values explicitly.
columns = ['passenger_count']
for column in columns:
    # Sorted distinct values for a quick visual check.
    sdf.select(column).distinct().orderBy(column).show()
    # Rows whose value differs from its floor are not whole numbers.
    # Nulls are excluded automatically, since null != floor(null) evaluates to null.
    n_fractional = sdf.filter(F.col(column) != F.floor(F.col(column))).count()
    assert n_fractional == 0, f"{column} has {n_fractional} non-integer values"

# Only whole numbers are reported. Note the output also contains nulls and
# 0-passenger trips, which still need handling in the missing-value / outlier steps.

+---------------+
|passenger_count|
+---------------+
|            8.0|
|            0.0|
|            7.0|
|           null|
|            1.0|
|            4.0|
|            3.0|
|            2.0|
|            6.0|
|            5.0|
|            9.0|
+---------------+



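# How come this code doesn't work?

The snippet below is in a Markdown cell, so Jupyter renders it as text instead of executing it. To run it, move it into a code cell. After the loop above finishes, `column` still holds `'passenger_count'`, so the snippet would show the same distinct values.

```python
sdf.select(column).distinct().show()
```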

In [16]:
# Cast whole-number-valued columns to 32-bit integers (see the justifications above).
# NOTE(review): a cast to int does not reject fractional input (it truncates in
# Spark's default, non-ANSI mode), so the values should be verified as whole first.
columns = ['VendorID', 'passenger_count', 'RatecodeID']
for column in columns:
    as_int = F.col(column).cast('INT')
    sdf = sdf.withColumn(column, as_int)

In [17]:
# Re-inspect the schema to confirm the integer casts took effect.
sdf.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



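### Converting PU/DOLocation to Geometry
Code attributed to tute2

**TODO:** the geometry conversion has not been added yet. The cells below currently inspect `fare_amount` and aggregate by `passenger_count`.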

In [21]:
# Smallest fare in the data. A negative result points to refunds/adjustments or bad
# records (presumably), which the outlier-removal step should revisit.
sdf.agg(F.min('fare_amount'))

                                                                                

min(fare_amount)
-2564.0


In [None]:
# Per passenger count: mean total_amount and the longest trip_distance, sorted by count.
# NOTE(review): fare_amount can be negative (see the minimum above), so total_amount may
# be too; these means are computed before any outlier filtering.
aggregated_results = (
    sdf.groupBy("passenger_count")
    .agg(
        F.avg("total_amount").alias("avg_trip_amount_usd"),
        F.max("trip_distance").alias("max_trip_distance_miles"),
    )
    .orderBy("passenger_count")
)

aggregated_results.show()