In [27]:
import findspark
# Initialise findspark so that the pyspark imports in the following cells resolve
# (see the findspark documentation for what init() sets up).
findspark.init()
# Last expression of the cell: displays the Spark home path that findspark located.
findspark.find()

'/usr/local/opt/apache-spark/libexec'

In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Build (or reuse) the notebook's SparkSession: local master with 4 threads.
# Dynamic allocation and adaptive query execution are explicitly switched off.
spark = (
    SparkSession.builder
    .appName("TaxiOperationsDataFrameApp")
    .master("local[4]")
    .config("spark.dynamicAllocation.enabled", "false")
    .config("spark.sql.adaptive.enabled", "false")
    .getOrCreate()
)

# SparkContext handle of the session; used later to create RDDs.
sc = spark.sparkContext

# Show the session summary as the cell output.
spark

In [29]:
from IPython.display import *
# Inject a CSS rule so <pre> output blocks keep their whitespace and never wrap;
# presumably this keeps wide DataFrame.show() tables aligned in the notebook.
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Create DataFrame - Option 1.a: From RDD

In [30]:
# Sample employee records; each row is [Id, Name, Salary]
data = [
    [1, "Neha", 10000],
    [2, "Steve", 20000],
    [3, "Kari", 30000],
    [4, "Ivan", 40000],
    [5, "Mohit", 50000],
]

# Turn the in-memory collection into an RDD
employeesRdd = sc.parallelize(data)

In [31]:
# Create DataFrame and show content

# No column names are supplied here, so the columns get the default names
# _1, _2, _3 (visible in the output below).
employeesDF = employeesRdd.toDF()

employeesDF.show()

+---+-----+-----+
| _1|   _2|   _3|
+---+-----+-----+
|  1| Neha|10000|
|  2|Steve|20000|
|  3| Kari|30000|
|  4| Ivan|40000|
|  5|Mohit|50000|
+---+-----+-----+



In [32]:
# Define column names for DataFrame

# DataFrame.toDF(*names) returns a new DataFrame whose columns are renamed
# positionally; the result replaces the previous employeesDF.
employeesDF = employeesDF.toDF("Id", "Name", "Salary")

employeesDF.show()

+---+-----+------+
| Id| Name|Salary|
+---+-----+------+
|  1| Neha| 10000|
|  2|Steve| 20000|
|  3| Kari| 30000|
|  4| Ivan| 40000|
|  5|Mohit| 50000|
+---+-----+------+



In [33]:
# Print DataFrame schema

# Column types were inferred from the Python values: the int columns show up
# as long and the str column as string (see output below).
employeesDF.printSchema()

root
 |-- Id: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Salary: long (nullable = true)



### Create DataFrame - Option 1.b: From data collection

In [34]:
# Create the DataFrame directly from the Python collection (an RDD can be
# passed in its place). The schema is given as a DDL-style string; a plain list
# of column names such as ["Id", "Name", "Salary"] can be passed instead.
employeesDF = spark.createDataFrame(
    data,
    "Id: long, Name: string, Salary: long",
)

employeesDF.show()

+---+-----+------+
| Id| Name|Salary|
+---+-----+------+
|  1| Neha| 10000|
|  2|Steve| 20000|
|  3| Kari| 30000|
|  4| Ivan| 40000|
|  5|Mohit| 50000|
+---+-----+------+



### Create DataFrame - Option 2: Read a File

In [35]:
# Read the YellowTaxis CSV with default reader options. No header handling is
# requested, so columns get the generated names _c0, _c1, ... (see next cell).
yellowTaxiDF = spark.read.csv("./../files/input/YellowTaxis_202210.csv")

In [36]:
# Display DataFrame content

# The file's header line appears as a regular data row in the output because
# the read above did not enable the "header" option.
yellowTaxiDF.show()

+--------+--------------------+--------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+--------------------+------------+--------------------+-----------+
|     _c0|                 _c1|                 _c2|            _c3|          _c4|       _c5|               _c6|         _c7|         _c8|         _c9|       _c10| _c11|   _c12|      _c13|        _c14|                _c15|        _c16|                _c17|       _c18|
+--------+--------------------+--------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+--------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_date...|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls

In [37]:
# Use the first line of the file as the column names
yellowTaxiDF = (
    spark.read
    .option("header", "true")
    .csv("./../files/input/YellowTaxis_202210.csv")
)

# Names now come from the header; every column is still typed as string
yellowTaxiDF.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- airport_fee: string (nullable = true)



In [12]:
# Read the GreenTaxis files using a tab as the field delimiter.
# The glob pattern loads every file matching GreenTaxis_*.csv into one DataFrame.
greenTaxiDF = (
    spark.read
    .option("header", "true")
    .option("delimiter", "\t")
    .csv("./../files/input/GreenTaxis_*.csv")
)

greenTaxiDF.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorId|lpep_pickup_datetime|lpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2|2022-10-01T06:08:...| 2022-10-01T06:21:...|            1.0|         2.47|       1.0|                 N|         256|         225|         1.0|       11.5|  0.5|    0.5|      2.5

### Read JSON File

In [38]:
# Read the PaymentTypes JSON file using the json() shortcut of the reader
paymentTypesDF = spark.read.json("./../files/input/PaymentTypes.json")

# Equivalent long form of the same read:
#   spark.read.format("json").load("./../files/input/PaymentTypes.json")

paymentTypesDF.show()

+-----------+-------------+
|PaymentType|PaymentTypeID|
+-----------+-------------+
|Credit Card|            1|
|       Cash|            2|
|  No Charge|            3|
|    Dispute|            4|
|    Unknown|            5|
|Voided Trip|            6|
+-----------+-------------+



### Schema Option 1 - No schema inference or definition

<i>Check for jobs</i>

In [14]:
# Schema option 1: read with a header but without inferring or supplying a
# schema, so every column is reported as string.
yellowTaxiDF = (
    spark.read
    .option("header", "true")
    .csv("./../files/input/YellowTaxis_202210.csv")
)

yellowTaxiDF.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- airport_fee: string (nullable = true)



### Schema Option 2 - Infer schema

<i>Check for jobs</i>

In [15]:
# Schema option 2: let Spark work out the column types from the data.
# With inferSchema enabled a stage runs during the read (see the progress bar
# in the output), and the columns come back typed instead of all-string.
yellowTaxiDF = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("./../files/input/YellowTaxis_202210.csv")
)

yellowTaxiDF.printSchema()

[Stage 16:>                                                         (0 + 4) / 4]

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



                                                                                

### Schema Option 3 - Define schema & apply

<i>Check for jobs</i>

In [16]:
# Explicit schema for the Yellow Taxi data: (column name, Spark type) pairs in
# column order; every field is declared nullable.
# NOTE(review): the pickup/dropoff names use the "lpep_" prefix although the
# YellowTaxis file header uses "tpep_". Later cells select the "lpep_" names,
# so rename them together if this is ever changed.
yellowTaxiSchema = StructType(
    [
        StructField(fieldName, fieldType, True)
        for fieldName, fieldType in [
            ("VendorId", IntegerType()),
            ("lpep_pickup_datetime", TimestampType()),
            ("lpep_dropoff_datetime", TimestampType()),
            ("passenger_count", DoubleType()),
            ("trip_distance", DoubleType()),
            ("RatecodeID", DoubleType()),
            ("store_and_fwd_flag", StringType()),
            ("PULocationID", IntegerType()),
            ("DOLocationID", IntegerType()),
            ("payment_type", IntegerType()),
            ("fare_amount", DoubleType()),
            ("extra", DoubleType()),
            ("mta_tax", DoubleType()),
            ("tip_amount", DoubleType()),
            ("tolls_amount", DoubleType()),
            ("improvement_surcharge", DoubleType()),
            ("total_amount", DoubleType()),
            ("congestion_surcharge", DoubleType()),
            ("airport_fee", DoubleType()),
        ]
    ]
)

In [17]:
# Schema option 3: read the YellowTaxis CSV and apply the schema defined above
# instead of inferring one; column names and types come from yellowTaxiSchema.
yellowTaxiDF = (
    spark.read
    .option("header", "true")
    .schema(yellowTaxiSchema)
    .csv("./../files/input/YellowTaxis_202210.csv")
)

yellowTaxiDF.printSchema()

root
 |-- VendorId: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



### Transform Data

#### 1. Select limited columns

In [18]:
# Keep only a subset of the columns. The select mixes the different ways of
# referencing a column: plain name, col()/column() and DataFrame attribute.
yellowTaxiDF = yellowTaxiDF.select(
    "VendorID",
    col("passenger_count").cast(IntegerType()),      # cast double -> integer
    column("trip_distance").alias("TripDistance"),   # rename while selecting
    yellowTaxiDF.lpep_pickup_datetime,
    "lpep_dropoff_datetime",
    "PUlocationID",
    "DOlocationID",
    "RatecodeID",
    "total_amount",
    "payment_type",
)
# airport_fee is not part of the selection above, so there is nothing left to
# drop (drop() of a column that is absent is a no-op):
# .drop("airport_fee")

yellowTaxiDF.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)



#### 2. Rename columns

In [19]:
# Rename the remaining source columns to the PascalCase names used from here on.
# Each withColumnRenamed call renames one column and returns a new DataFrame.
yellowTaxiDF = (
    yellowTaxiDF
    .withColumnRenamed("passenger_count", "PassengerCount")
    .withColumnRenamed("lpep_pickup_datetime", "PickupTime")
    .withColumnRenamed("lpep_dropoff_datetime", "DropTime")
    .withColumnRenamed("PUlocationID", "PickupLocationId")
    .withColumnRenamed("DOlocationID", "DropLocationId")
    .withColumnRenamed("total_amount", "TotalAmount")
    .withColumnRenamed("payment_type", "PaymentType")
)

yellowTaxiDF.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- PassengerCount: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- PaymentType: integer (nullable = true)



#### 3.a. Create derived columns - TripYear, TripMonth, TripDay

In [20]:
# Derive TripYear, TripMonth and TripDay from PickupTime, using three different
# ways of adding a column: withColumn, a SQL expression string, and a function
# call with an alias inside select.
yellowTaxiDF = (
    yellowTaxiDF
    .withColumn("TripYear", year(col("PickupTime")))
    .select(
        "*",
        expr("month(PickupTime) AS TripMonth"),
        dayofmonth(col("PickupTime")).alias("TripDay"),
    )
)

yellowTaxiDF.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- PassengerCount: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- PaymentType: integer (nullable = true)
 |-- TripYear: integer (nullable = true)
 |-- TripMonth: integer (nullable = true)
 |-- TripDay: integer (nullable = true)



#### 3.b. Create derived column - TripTimeInMinutes

In [21]:
# Option 1: compute the trip duration inline.
# The difference of the two unix_timestamp() values is divided by 60 and
# rounded. Note that `round` here is the pyspark.sql.functions version brought
# in by the wildcard import, not Python's built-in round.
yellowTaxiDF = yellowTaxiDF.withColumn(
    "TripTimeInMinutes",
    round(
        (unix_timestamp(col("DropTime")) - unix_timestamp(col("PickupTime"))) / 60
    ),
)

yellowTaxiDF.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- PassengerCount: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- PaymentType: integer (nullable = true)
 |-- TripYear: integer (nullable = true)
 |-- TripMonth: integer (nullable = true)
 |-- TripDay: integer (nullable = true)
 |-- TripTimeInMinutes: double (nullable = true)



In [22]:
# Option 2: build the same expression step by step from named Column objects
tripTimeInSecondsExpr = unix_timestamp(col("DropTime")) - unix_timestamp(col("PickupTime"))
tripTimeInMinutesExpr = round(tripTimeInSecondsExpr / 60)

# A column called TripTimeInMinutes already exists, so withColumn replaces it
yellowTaxiDF = yellowTaxiDF.withColumn("TripTimeInMinutes", tripTimeInMinutesExpr)

yellowTaxiDF.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- PassengerCount: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- PaymentType: integer (nullable = true)
 |-- TripYear: integer (nullable = true)
 |-- TripMonth: integer (nullable = true)
 |-- TripDay: integer (nullable = true)
 |-- TripTimeInMinutes: double (nullable = true)



#### 3.c. Create derived column - TripType

In [23]:
# Label a row "SharedTrip" when RatecodeID equals 6; every other row falls
# through to "SoloTrip".
tripTypeColumn = when(col("RatecodeID") == 6, "SharedTrip").otherwise("SoloTrip")

# Add the label column, then remove RatecodeID
yellowTaxiDF = (
    yellowTaxiDF
    .withColumn("TripType", tripTypeColumn)
    .drop("RatecodeID")
)

yellowTaxiDF.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- PassengerCount: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- PaymentType: integer (nullable = true)
 |-- TripYear: integer (nullable = true)
 |-- TripMonth: integer (nullable = true)
 |-- TripDay: integer (nullable = true)
 |-- TripTimeInMinutes: double (nullable = true)
 |-- TripType: string (nullable = false)



### Save data in CSV format to storage

In [24]:
# Re-read the YellowTaxis CSV with header and schema inference.
# Note: this replaces the transformed yellowTaxiDF built in the cells above, so
# the save below writes the freshly loaded data.
yellowTaxiDF = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("./../files/input/YellowTaxis_202210.csv")
)

                                                                                

In [26]:
# Save the DataFrame in CSV format, with a header line, replacing any previous
# output at the target path.
(
    yellowTaxiDF
    .write
    .option("header", "true")
    # The date-time columns of this DataFrame are timestamps, which are
    # formatted by "timestampFormat"; "dateFormat" only applies to DateType
    # columns and therefore had no effect here.
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.S")
    .mode("overwrite")    # Options - Append, ErrorIfExists, Ignore, Overwrite
    .csv("./../files/output/YellowTaxisOutput.csv")
)

                                                                                