# Imports

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Spark session

In [5]:
spark = (SparkSession.builder.appName("DataFrameP").master("local[4]")
         .config("spark.dynamicAllocation.enabled", "false")
         .config("spark.sql adaptive.enabled", "false")
         .getOrCreate())
sc = spark.sparkContext
spark

24/09/18 18:29:25 WARN Utils: Your hostname, Tulasis-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.9 instead (on interface en0)
24/09/18 18:29:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/18 18:29:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Create Data frame using RDD

In [7]:
data = [
    [1, 'Alice', 25],
    [2, 'Bob', 30],
    [3, 'Charlie', 22],
    [4, 'David', 28],
    [5, 'Eve', 27],
    [6, 'Frank', 33],
    [7, 'Grace', 26],
    [8, 'Hannah', 31],
    [9, 'Ivy', 29],
    [10, 'Jack', 24]
]

In [8]:
dataRDD = sc.parallelize(data)

In [9]:
# create data frame from RDD
dataDF = dataRDD.toDF()
dataDF.show()

                                                                                

+---+-------+---+
| _1|     _2| _3|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 22|
|  4|  David| 28|
|  5|    Eve| 27|
|  6|  Frank| 33|
|  7|  Grace| 26|
|  8| Hannah| 31|
|  9|    Ivy| 29|
| 10|   Jack| 24|
+---+-------+---+



In [10]:
# Define cloumn names for Data Frames
dataDF = dataDF.toDF("S.NO","Name","Age")
dataDF.show()

+----+-------+---+
|S.NO|   Name|Age|
+----+-------+---+
|   1|  Alice| 25|
|   2|    Bob| 30|
|   3|Charlie| 22|
|   4|  David| 28|
|   5|    Eve| 27|
|   6|  Frank| 33|
|   7|  Grace| 26|
|   8| Hannah| 31|
|   9|    Ivy| 29|
|  10|   Jack| 24|
+----+-------+---+



In [11]:
# print data frame schema

dataDF.printSchema()

root
 |-- S.NO: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)



# Create Data Fame from Data Source

In [13]:
# dataDF = spark.createDataFrame(data,["S.No","Name","Age"])
dataDF = spark.createDataFrame(data,"SNO : long, Name: string, Age: long") # alternative
dataDF.show()

+---+-------+---+
|SNO|   Name|Age|
+---+-------+---+
|  1|  Alice| 25|
|  2|    Bob| 30|
|  3|Charlie| 22|
|  4|  David| 28|
|  5|    Eve| 27|
|  6|  Frank| 33|
|  7|  Grace| 26|
|  8| Hannah| 31|
|  9|    Ivy| 29|
| 10|   Jack| 24|
+---+-------+---+



In [14]:
dataDF.printSchema()

root
 |-- SNO: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)



# Create Data Frame using a file

In [16]:
# file path /Users/tulasiramreddygade/Downloads/apache-spark-3-fundamentals/DataFiles/Raw/YellowTaxis_202210.csv
path = "/Users/tulasiramreddygade/Downloads/apache-spark-3-fundamentals/DataFiles/Raw/YellowTaxis_202210.csv"
yellowTaxisDF = spark.read.csv(path)

yellowTaxisDF.take(3)

[Row(_c0='VendorID', _c1='tpep_pickup_datetime', _c2='tpep_dropoff_datetime', _c3='passenger_count', _c4='trip_distance', _c5='RatecodeID', _c6='store_and_fwd_flag', _c7='PULocationID', _c8='DOLocationID', _c9='payment_type', _c10='fare_amount', _c11='extra', _c12='mta_tax', _c13='tip_amount', _c14='tolls_amount', _c15='improvement_surcharge', _c16='total_amount', _c17='congestion_surcharge', _c18='airport_fee'),
 Row(_c0='1', _c1='2022-10-01T05:33:41.000+05:30', _c2='2022-10-01T05:48:39.000+05:30', _c3='1.0', _c4='1.7', _c5='1.0', _c6='N', _c7='249', _c8='107', _c9='1', _c10='9.5', _c11='3.0', _c12='0.5', _c13='2.65', _c14='0.0', _c15='0.3', _c16='15.95', _c17='2.5', _c18='0.0'),
 Row(_c0='2', _c1='2022-10-01T05:44:30.000+05:30', _c2='2022-10-01T05:49:48.000+05:30', _c3='2.0', _c4='0.72', _c5='1.0', _c6='N', _c7='151', _c8='238', _c9='2', _c10='5.5', _c11='0.5', _c12='0.5', _c13='0.0', _c14='0.0', _c15='0.3', _c16='9.3', _c17='2.5', _c18='0.0')]

In [17]:
# Take taxi column from file header row
yellowTaxisDF = spark.read.option("header","true").csv(path)

yellowTaxisDF.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- airport_fee: string (nullable = true)



## Read TSV file

In [19]:
greenTaxiFilePath = "/Users/tulasiramreddygade/Downloads/apache-spark-3-fundamentals/DataFiles/Raw/GreenTaxis_*.cs"
greenTaxiDF = spark.read.option("header","true").option("delimiter","\t").csv(path)



## Read json file

In [21]:
pathToJsonFile = "/Users/tulasiramreddygade/Downloads/apache-spark-3-fundamentals/DataFiles/Raw/PaymentTypes.json"

paymentTypeJSON = spark.read.json(pathToJsonFile)

# spark.read.format("formatType").Load(path) # Read any type of data 
paymentTypeJSON.show()

+-----------+-------------+
|PaymentType|PaymentTypeID|
+-----------+-------------+
|Credit Card|            1|
|       Cash|            2|
|  No Charge|            3|
|    Dispute|            4|
|    Unknown|            5|
|Voided Trip|            6|
+-----------+-------------+



# Applying Schema

In [23]:
yellowTaxiDF = spark.read.option("header","true").csv(path)

yellowTaxiDF.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- airport_fee: string (nullable = true)



## InferingSchema 

In [25]:
yellowTaxiDFSchemaInference = spark.read.option("header","true").option("inferSchema","true").csv(path)

yellowTaxiDFSchemaInference.printSchema()

[Stage 15:>                                                         (0 + 4) / 4]

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



                                                                                

## Define own schema

In [27]:
yellowTaxiSchema = (StructType([
    StructField("VendorId",IntegerType(),True),
    StructField("tpep_pickup_datetime",TimestampType(),True),
    StructField("tpep_dropoff_datetime",TimestampType(),True),
    StructField("passenger_count",DoubleType(),True),
    StructField("trip_distance",DoubleType(),True),
    StructField("RatecodeID",DoubleType(),True),
    StructField("store_and_fwd_flag",StringType(),True),
    StructField("PULocationID",IntegerType(),True),
    StructField("DOLocationID",IntegerType(),True),
    StructField("payment_type",IntegerType(),True),
    StructField("fare_amount",DoubleType(),True),
    StructField("extra",DoubleType(),True),
    StructField("mta_tax",DoubleType(),True),
    StructField("tip_amount",DoubleType(),True),
    StructField("tolls_amount",DoubleType(),True),
    StructField("improvement_surcharge",DoubleType(),True),
    StructField("total_amount",DoubleType(),True),
    StructField("congestion_surcharge",DoubleType(),True),
    StructField("airport_fee",DoubleType(),True)
]))

In [28]:
yellowTaxiDFWithSchema = spark.read.option("header","true").schema(yellowTaxiSchema).csv(path)

yellowTaxiDFWithSchema.printSchema()

root
 |-- VendorId: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



## Define schema for json file

In [30]:
TaxiBasesPath = '/Users/tulasiramreddygade/Downloads/apache-spark-3-fundamentals/DataFiles/Raw/TaxiBases.json'

In [31]:
TaxiBasesDF = spark.read.option("multiline",True).json(TaxiBasesPath)

TaxiBasesDF.take(1)

[Row(Address=Row(Building='636', City='NEW YORK', Postcode=10001, State='NY', Street='WEST   28 STREET'), Date='08/15/2019', Entity Name='VIER-NY,LLC', GeoLocation=Row(Latitude=40.75273, Location='(40.75273, -74.006408)', Longitude=-74.006408), License Number='B02865', SHL Endorsed='No', Telephone Number=6466657536, Time='18:03:31', Type of Base='BLACK CAR BASE')]

In [32]:
TaxiBasesDF.printSchema()

root
 |-- Address: struct (nullable = true)
 |    |-- Building: string (nullable = true)
 |    |-- City: string (nullable = true)
 |    |-- Postcode: long (nullable = true)
 |    |-- State: string (nullable = true)
 |    |-- Street: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Entity Name: string (nullable = true)
 |-- GeoLocation: struct (nullable = true)
 |    |-- Latitude: double (nullable = true)
 |    |-- Location: string (nullable = true)
 |    |-- Longitude: double (nullable = true)
 |-- License Number: string (nullable = true)
 |-- SHL Endorsed: string (nullable = true)
 |-- Telephone Number: long (nullable = true)
 |-- Time: string (nullable = true)
 |-- Type of Base: string (nullable = true)



In [33]:
TaxiBasesDFInferSchema = spark.read.option("multiline",True).option("header",True).option("inferSchema",True).json(TaxiBasesPath)

TaxiBasesDFInferSchema.show()
TaxiBasesDFInferSchema.printSchema()

+--------------------+----------+--------------------+--------------------+--------------+------------+----------------+--------+--------------------+
|             Address|      Date|         Entity Name|         GeoLocation|License Number|SHL Endorsed|Telephone Number|    Time|        Type of Base|
+--------------------+----------+--------------------+--------------------+--------------+------------+----------------+--------+--------------------+
|{636, NEW YORK, 1...|08/15/2019|         VIER-NY,LLC|{40.75273, (40.75...|        B02865|          No|      6466657536|18:03:31|      BLACK CAR BASE|
|{131, BRONX, 1046...|08/15/2019|VETERANS RADIO DI...|{40.86927, (40.86...|        B02634|          No|      7183647878|18:03:31|         LIVERY BASE|
|{115-54, ELMONT, ...|08/15/2019|      ALPHA VAN LINE|{40.693473, (40.6...|        B80094|          No|      5162850750|18:03:31|COMMUTER VAN AUTH...|
|{866, BROOKLYN, 1...|08/15/2019|A.T.B. CAR AND LI...|{40.667838, (40.6...|        B02677|    

In [34]:
TaxiBasesDFSchema = StructType([
    StructField("Address",StructType([
        StructField("Building",StringType(),True),
        StructField("City",StringType(),True),
        StructField("Postcode",LongType(),True),
        StructField("State",StringType(),True),
        StructField("Street",StringType(),True)
    ]),True),
    StructField("Date",StringType(),True),
    StructField("Entity Name",StringType(),True),
    StructField("GeoLocation",StructType([
        StructField("Latitude",DoubleType(),True),
        StructField("Location",StringType(),True),
        StructField("Longitude",DoubleType(),True)
    ])),
    StructField("License Number",StringType(),True),
    StructField("SHL Endorsed",StringType(),True),
    StructField("Telephone Number",LongType(),True),
    StructField("Time",StringType(),True),
    StructField("Type of Base",StringType(),True)
])

In [35]:
TaxiBasesDFWithSchema = spark.read.option("multiline",True).schema(TaxiBasesDFSchema).json(TaxiBasesPath)

TaxiBasesDFWithSchema.show()
TaxiBasesDFWithSchema.printSchema()

+--------------------+----------+--------------------+--------------------+--------------+------------+----------------+--------+--------------------+
|             Address|      Date|         Entity Name|         GeoLocation|License Number|SHL Endorsed|Telephone Number|    Time|        Type of Base|
+--------------------+----------+--------------------+--------------------+--------------+------------+----------------+--------+--------------------+
|{636, NEW YORK, 1...|08/15/2019|         VIER-NY,LLC|{40.75273, (40.75...|        B02865|          No|      6466657536|18:03:31|      BLACK CAR BASE|
|{131, BRONX, 1046...|08/15/2019|VETERANS RADIO DI...|{40.86927, (40.86...|        B02634|          No|      7183647878|18:03:31|         LIVERY BASE|
|{115-54, ELMONT, ...|08/15/2019|      ALPHA VAN LINE|{40.693473, (40.6...|        B80094|          No|      5162850750|18:03:31|COMMUTER VAN AUTH...|
|{866, BROOKLYN, 1...|08/15/2019|A.T.B. CAR AND LI...|{40.667838, (40.6...|        B02677|    

# Analyse Data

In [37]:
yellowTaxiDFPath = "/Users/tulasiramreddygade/Downloads/apache-spark-3-fundamentals/DataFiles/Raw/YellowTaxis_202210.csv"

yellowTaxiDF = spark.read.option("header","true").schema(yellowTaxiSchema).csv(yellowTaxiDFPath)

yellowTaxiDF.printSchema()
yellowTaxiDF.count()


root
 |-- VendorId: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



                                                                                

3675412

In [38]:
yellowTaxiDFAnalyzed = yellowTaxiDF.describe("passenger_count","trip_distance")

yellowTaxiDFAnalyzed.show()

[Stage 24:>                                                         (0 + 4) / 4]

+-------+------------------+-----------------+
|summary|   passenger_count|    trip_distance|
+-------+------------------+-----------------+
|  count|           3542392|          3675412|
|   mean|1.3846934500755421|6.206976298167358|
| stddev|0.9302303297407405|640.8236808320215|
|    min|               0.0|              0.0|
|    max|               9.0|        389678.46|
+-------+------------------+-----------------+



                                                                                

## Clean Data

### 1. Accuracy check : filter inaccurate data

In [41]:
print("Before data cleaning : "+str(yellowTaxiDF.count()))

yellowTaxiDF = yellowTaxiDF.where("passenger_count > 0").filter(col("trip_distance")>0.0)

print("After data cleaning : "+str(yellowTaxiDF.count()))

Before data cleaning : 3675412


[Stage 30:>                                                         (0 + 4) / 4]

After data cleaning : 3422296


                                                                                

### 2 a. Completeness check : drop rows with nulls 

In [43]:
print("Before operation : "+str(yellowTaxiDF.count()))

yellowTaxiDF = yellowTaxiDF.na.drop("all")

print("After operation : "+str(yellowTaxiDF.count()))

                                                                                

Before operation : 3422296


[Stage 36:>                                                         (0 + 4) / 4]

After operation : 3422296


                                                                                

### 2 b. Completeness check : fill null values with default values

In [45]:
defaultValues = {"payment_type":5,"RatecodeID":1}

yellowTaxiDF = yellowTaxiDF.na.fill(defaultValues)


### 3. Uniqueness check : Drop duplicates

In [47]:
print("Before operation : "+ str(yellowTaxiDF.count()))

yellowTaxiDF = yellowTaxiDF.dropDuplicates() # Very costly operation

print("After operation : "+ str(yellowTaxiDF.count()))

                                                                                

Before operation : 3422296


24/09/18 18:29:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:29:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:29:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:29:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:29:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:29:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:29:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:29:51 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:29:57 WARN RowBasedKeyValueBatch: Calling spill() on

After operation : 3422295


                                                                                

### 4. Timeline check : Remove records outside the bound

In [49]:
print("Before operation : "+str(yellowTaxiDF.count()))

yellowTaxiDF = yellowTaxiDF.where("tpep_pickup_datetime > '2022-10-01' and tpep_dropoff_datetime < '2022-11-01'")

print("After operation : "+str(yellowTaxiDF.count()))

24/09/18 18:30:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:05 WARN RowBasedKeyValueBatch: Calling spill() on

Before operation : 3422295


24/09/18 18:30:09 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:13 WARN RowBasedKeyValueBatch: Calling spill() on

After operation : 3393897


                                                                                

In [50]:
defaultValues = {"payment_type":5,"RatecodeID":1}


yellowTaxiDFPath = "/Users/tulasiramreddygade/Downloads/apache-spark-3-fundamentals/DataFiles/Raw/YellowTaxis_202210.csv"

yellowTaxiDF = spark.read.option("header","true").schema(yellowTaxiSchema).csv(yellowTaxiDFPath)

yellowTaxiDF = (yellowTaxiDF
                .where("passenger_count > 0")
                .filter(col("trip_distance")>0.0)
                .na.drop("all")
                .na.fill(defaultValues)
                .dropDuplicates()
                .where("tpep_pickup_datetime > '2022-10-01' and tpep_dropoff_datetime < '2022-11-01'")
               )

print(yellowTaxiDF.count())

24/09/18 18:30:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:17 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/09/18 18:30:22 WARN RowBasedKeyValueBatch: Calling spill() on

3393897


                                                                                

## Applying business specific operations

In [52]:
yellowTaxiDFPath = "/Users/tulasiramreddygade/Downloads/apache-spark-3-fundamentals/DataFiles/Raw/YellowTaxis_202210.csv"

yellowTaxiDF = spark.read.option("header","true").schema(yellowTaxiSchema).csv(yellowTaxiDFPath)

In [53]:
yellowTaxiDF.count()
yellowTaxiDF.printSchema()

root
 |-- VendorId: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



### 1. Select limited columns

In [55]:
yellowTaxiDF = (
    yellowTaxiDF.select(
        # Select only limited columns
        "VendorId",
        col("passenger_count").cast(IntegerType()),
        column("trip_distance").alias("TripDistance"),
        yellowTaxiDF.tpep_pickup_datetime,
        "tpep_dropoff_datetime",
        "PULocationID",
        "DOLocationID",
        "RatecodeID",
        "total_amount",
        "payment_type",
        "airport_fee"
    )
)

yellowTaxiDF.printSchema()
yellowTaxiDF = yellowTaxiDF.drop("airport_fee")
yellowTaxiDF.printSchema()

root
 |-- VendorId: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- airport_fee: double (nullable = true)

root
 |-- VendorId: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)



### 2. Rename columns

In [57]:
yellowTaxiDF = (
    yellowTaxiDF
    .withColumnRenamed("passenger_count","PassengerCount")
    .withColumnRenamed("tpep_pickup_datetime","PickupTime")
    .withColumnRenamed("tpep_dropoff_datetime","DropTime")
    .withColumnRenamed("PULocationID","PickupLocationId")
    .withColumnRenamed("DOLocationID","DropLocationId")
    .withColumnRenamed("total_amount","TotalAmount")
    .withColumnRenamed("payment_type","PaymentType")
)

yellowTaxiDF.printSchema()


root
 |-- VendorId: integer (nullable = true)
 |-- PassengerCount: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- PaymentType: integer (nullable = true)



### 3.a. Create derived coloumns - TripYear, TripMonth, TripDay

In [59]:
yellowTaxiDF = (
    yellowTaxiDF
    .withColumn("TripYear",year(col("PickupTime")))
    .select(
        "*",
        expr("month(PickupTime) AS TripMonth"),
        dayofmonth(col("PickupTime")).alias("TripDay")
    )
)
# any of the operation will give the same time
yellowTaxiDF.printSchema()

root
 |-- VendorId: integer (nullable = true)
 |-- PassengerCount: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- PaymentType: integer (nullable = true)
 |-- TripYear: integer (nullable = true)
 |-- TripMonth: integer (nullable = true)
 |-- TripDay: integer (nullable = true)



### 3.b. Create derived cloumn - TripTimeInMinutes

In [61]:
# option 1
yellowTaxiDF = (
    yellowTaxiDF
    .withColumn("TripTimeInMinutes",
                round(
                        (unix_timestamp(col("DropTime"))
                         -
                         unix_timestamp(col("PickupTime")))
                            / 60
                )
        )
)

yellowTaxiDF.printSchema()


root
 |-- VendorId: integer (nullable = true)
 |-- PassengerCount: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- PaymentType: integer (nullable = true)
 |-- TripYear: integer (nullable = true)
 |-- TripMonth: integer (nullable = true)
 |-- TripDay: integer (nullable = true)
 |-- TripTimeInMinutes: double (nullable = true)



In [62]:
tripTimeInMinutesExpr = (unix_timestamp(col("DropTime")) - unix_timestamp(col("PickupTime")))

# Dynamic Expressions

tripTimeInMinutesExpr = round(tripTimeInMinutesExpr/60)

yellowTaxiDF = yellowTaxiDF.withColumn("TripTimeInMinutes",tripTimeInMinutesExpr)

yellowTaxiDF.printSchema()

root
 |-- VendorId: integer (nullable = true)
 |-- PassengerCount: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- PaymentType: integer (nullable = true)
 |-- TripYear: integer (nullable = true)
 |-- TripMonth: integer (nullable = true)
 |-- TripDay: integer (nullable = true)
 |-- TripTimeInMinutes: double (nullable = true)



### 3.c. Create derived column - TripType

In [64]:
tripTypeColumnExpr = (

    when(
        col("RatecodeID") == 6,
        "SharedTrip"
    ).otherwise("SoloTrip")
)

yellowTaxiDF = (

    yellowTaxiDF
    .withColumn("TripType",tripTypeColumnExpr)
    .drop("RatecodeID")
)

yellowTaxiDF.printSchema()

root
 |-- VendorId: integer (nullable = true)
 |-- PassengerCount: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- PaymentType: integer (nullable = true)
 |-- TripYear: integer (nullable = true)
 |-- TripMonth: integer (nullable = true)
 |-- TripDay: integer (nullable = true)
 |-- TripTimeInMinutes: double (nullable = true)
 |-- TripType: string (nullable = false)



### Check Execution Plan

In [66]:
yellowTaxiDF.explain(mode = "extended")
# other modes - simple, codefen, cost, formatted

== Parsed Logical Plan ==
Project [VendorId#1265, PassengerCount#1351, TripDistance#1327, PickupTime#1362, DropTime#1373, PickupLocationId#1384, DropLocationId#1395, TotalAmount#1406, PaymentType#1417, TripYear#1428, TripMonth#1440, TripDay#1441, TripTimeInMinutes#1470, TripType#1485]
+- Project [VendorId#1265, PassengerCount#1351, TripDistance#1327, PickupTime#1362, DropTime#1373, PickupLocationId#1384, DropLocationId#1395, RatecodeID#1270, TotalAmount#1406, PaymentType#1417, TripYear#1428, TripMonth#1440, TripDay#1441, TripTimeInMinutes#1470, CASE WHEN (RatecodeID#1270 = cast(6 as double)) THEN SharedTrip ELSE SoloTrip END AS TripType#1485]
   +- Project [VendorId#1265, PassengerCount#1351, TripDistance#1327, PickupTime#1362, DropTime#1373, PickupLocationId#1384, DropLocationId#1395, RatecodeID#1270, TotalAmount#1406, PaymentType#1417, TripYear#1428, TripMonth#1440, TripDay#1441, round((cast((unix_timestamp(DropTime#1373, yyyy-MM-dd HH:mm:ss, Some(Asia/Kolkata), false) - unix_timesta

### Extract nested fields from json

In [68]:
TaxiBasesDFWithSchema.printSchema()

root
 |-- Address: struct (nullable = true)
 |    |-- Building: string (nullable = true)
 |    |-- City: string (nullable = true)
 |    |-- Postcode: long (nullable = true)
 |    |-- State: string (nullable = true)
 |    |-- Street: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Entity Name: string (nullable = true)
 |-- GeoLocation: struct (nullable = true)
 |    |-- Latitude: double (nullable = true)
 |    |-- Location: string (nullable = true)
 |    |-- Longitude: double (nullable = true)
 |-- License Number: string (nullable = true)
 |-- SHL Endorsed: string (nullable = true)
 |-- Telephone Number: long (nullable = true)
 |-- Time: string (nullable = true)
 |-- Type of Base: string (nullable = true)



In [69]:
TaxiBasesFlatDF = TaxiBasesDFWithSchema.select(
    col("Address.Building").alias("AddressBuilding"),
    expr("Address.City as AddressCity"),
    expr("Address.Postcode as AddressPostcode"),
    col("Address.State").alias("AddressState"),
    column("Address.Street").alias("AddressStreet"),
    column("GeoLocation.Latitude").alias("GeoLocationLatitude"),
    column("GeoLocation.Location").alias("GeoLocation"),
    column("GeoLocation.Longitude").alias("GeoLocationLongitude")
).drop("Address").drop("GeoLocation")

TaxiBasesFlatDF.printSchema()

root
 |-- AddressBuilding: string (nullable = true)
 |-- AddressCity: string (nullable = true)
 |-- AddressPostcode: long (nullable = true)
 |-- AddressState: string (nullable = true)
 |-- AddressStreet: string (nullable = true)
 |-- GeoLocationLatitude: double (nullable = true)
 |-- GeoLocationLongitude: double (nullable = true)



In [70]:
yellowTaxiDF.printSchema()

root
 |-- VendorId: integer (nullable = true)
 |-- PassengerCount: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- PaymentType: integer (nullable = true)
 |-- TripYear: integer (nullable = true)
 |-- TripMonth: integer (nullable = true)
 |-- TripDay: integer (nullable = true)
 |-- TripTimeInMinutes: double (nullable = true)
 |-- TripType: string (nullable = false)



In [71]:
yellowTaxiDFReport = (
    yellowTaxiDF
    .groupBy("PickupLocationId","DropLocationId")
    .agg(
        avg("TripTimeInMinutes").alias("AvgTripTime"),
        sum("TotalAmount").alias("SumAmount")
    )
    .orderBy(col("PickupLocationId").desc(),col("DropLocationId"))
)

yellowTaxiDFReport = yellowTaxiDFReport.select(
    "*",
    round(col("AvgTripTime")).alias("AvergageTripTime"),
    round(col("SumAmount")).alias("SumAmountRounded")
).drop("AvgTripTime").drop("SumAmount")

yellowTaxiDFReport.show()

[Stage 69:>                                                         (0 + 4) / 4]

+----------------+--------------+----------------+----------------+
|PickupLocationId|DropLocationId|AvergageTripTime|SumAmountRounded|
+----------------+--------------+----------------+----------------+
|             265|             1|            12.0|         14723.0|
|             265|             3|            34.0|           348.0|
|             265|             4|            25.0|          1154.0|
|             265|             5|            96.0|           393.0|
|             265|             6|            60.0|            36.0|
|             265|             7|            29.0|          2207.0|
|             265|             9|            21.0|          1457.0|
|             265|            10|            50.0|          2280.0|
|             265|            11|            35.0|           430.0|
|             265|            13|            34.0|           501.0|
|             265|            14|            35.0|          2228.0|
|             265|            15|            52.

                                                                                

# Exercise
1. Based on Pickup Time, add a new column with day text = Monday to Sunday
2. Based on PickupTime, add a new column with month text = January to December
3. Based on PickupTime, add a new column with value being the last day of month

### Exercice 1

In [74]:
yellowTaxiDF.printSchema()

root
 |-- VendorId: integer (nullable = true)
 |-- PassengerCount: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- PaymentType: integer (nullable = true)
 |-- TripYear: integer (nullable = true)
 |-- TripMonth: integer (nullable = true)
 |-- TripDay: integer (nullable = true)
 |-- TripTimeInMinutes: double (nullable = true)
 |-- TripType: string (nullable = false)



In [75]:
yellowTaxiDF = (
    yellowTaxiDF.select(
        "*",
        dayname(col("PickupTime")).alias("PickupDay"),
        month(col("PickupTime")).alias("PickupMonth"),
        last_day(col("PickupTime")).alias("LastDayMonth")
    )
)

yellowTaxiDF.printSchema()

NameError: name 'dayname' is not defined