# Data discovery: Loading data and running simple queries
## Lab workflow

<img src="../Img/Simple-workflow-1.png" alt="Lab workflow" height="400">


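Download the dataset from [TLC Trip Record Data](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page). This lab uses the January 2023 Yellow Taxi trip records (`yellow_tripdata_2023-01.parquet`), saved under `../Sources/`.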


In [1]:
# Import SparkSession, the entry point for working with DataFrames and SQL in PySpark
from pyspark.sql import SparkSession

## Create a SparkSession
- What is a **SparkSession**? It is the entry point to a Spark application. You use it to read data into DataFrames, register views, and run SQL queries.
- What is **master("local[1]")**? It runs Spark in local mode (on this machine rather than on a cluster) with a single worker thread. `local[*]` would use one thread per logical CPU core.
- What is **appName("spark")**? It sets the name of the Spark application, which is shown in the Spark UI.
- What is **getOrCreate()**? It returns the SparkSession that is already running, or creates a new one from the builder's settings if none exists.

In [2]:
# Create SparkSession
spark = SparkSession.builder\
             .master("local[1]")\
             .appName("spark")\
             .getOrCreate()

In [3]:
# Path to the downloaded Parquet file
local_file = '../Sources/yellow_tripdata_2023-01.parquet'

# Read the Parquet file and print the first 20 rows (the default for show()) without truncating values
spark.read.parquet(local_file).show(truncate=False)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|2       |2023-01-01 06:02:10 |2023-01-01 06:10:36  |1.0            |0.97         |1.0       |N                 |161         |141         |2           |9.3        |1.0  |0.5    |0.0       |0.0         |1.0                  |14.3        |2.5                 |0.0        |

In [5]:
# Read taxi data
local_file = '../Sources/yellow_tripdata_2023-01.parquet'
df = spark.read.parquet(local_file)

In [5]:
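# A DataFrame is like a relational table, but it is evaluated lazily: Spark reads
# the data only when an action (show, count, toPandas, ...) runs. Print its columns and types: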
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [6]:
df.show(n=5, truncate=False, vertical=True)

-RECORD 0------------------------------------
 VendorID              | 2                   
 tpep_pickup_datetime  | 2023-01-01 06:02:10 
 tpep_dropoff_datetime | 2023-01-01 06:10:36 
 passenger_count       | 1.0                 
 trip_distance         | 0.97                
 RatecodeID            | 1.0                 
 store_and_fwd_flag    | N                   
 PULocationID          | 161                 
 DOLocationID          | 141                 
 payment_type          | 2                   
 fare_amount           | 9.3                 
 extra                 | 1.0                 
 mta_tax               | 0.5                 
 tip_amount            | 0.0                 
 tolls_amount          | 0.0                 
 improvement_surcharge | 1.0                 
 total_amount          | 14.3                
 congestion_surcharge  | 2.5                 
 airport_fee           | 0.0                 
-RECORD 1------------------------------------
 VendorID              | 2        

In [11]:
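# Convert the first 1,000 rows to a Pandas DataFrame and print them
pandas_df = df.limit(1000).toPandas()
print(pandas_df.to_string())

TypeError: Casting to unit-less dtype 'datetime64' is not supported. Pass e.g. 'datetime64[ns]' instead.

This error is typical of running an older PySpark release together with pandas 2.x: the conversion of the timestamp columns fails. Pinning pandas below 2.0, or moving to a PySpark release that supports pandas 2.x, avoids it; the next cell works around it instead by casting the timestamp columns to strings before calling `toPandas()`.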

In [8]:
from pyspark.sql import functions as F

# Keep six columns and cast each to a simple type. The timestamps are cast to
# strings so that toPandas() does not hit the datetime64 error above.
# Note: this replaces df with the 6-column DataFrame used by the next cells.
df = spark.read.parquet(local_file).select(
    F.col("VendorID").cast("integer"),
    F.col("tpep_pickup_datetime").cast("string"),   # string instead of timestamp
    F.col("tpep_dropoff_datetime").cast("string"),  # string instead of timestamp
    F.col("passenger_count").cast("float"),
    F.col("trip_distance").cast("float"),
    F.col("total_amount").cast("float")  # 32-bit float, hence values like 34.900002
)

# Convert to Pandas DataFrame
pandas_df = df.limit(1000).toPandas()

# Display the data
print(pandas_df.to_string(index=False))

 VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  trip_distance  total_amount
        2  2023-01-01 06:02:10   2023-01-01 06:10:36              1.0       0.970000     14.300000
        2  2023-01-01 06:25:08   2023-01-01 06:31:27              1.0       1.100000     16.900000
        2  2023-01-01 05:55:04   2023-01-01 06:07:49              1.0       2.510000     34.900002
        1  2023-01-01 05:33:48   2023-01-01 05:43:25              0.0       1.900000     20.850000
        2  2023-01-01 05:40:29   2023-01-01 05:51:19              1.0       1.430000     19.680000
        2  2023-01-01 06:20:34   2023-01-01 06:32:52              1.0       1.840000     27.799999
        2  2023-01-01 05:39:22   2023-01-01 05:49:49              1.0       1.660000     20.520000
        2  2023-01-01 05:57:12   2023-01-01 06:19:56              1.0      11.700000     64.440002
        2  2023-01-01 05:51:44   2023-01-01 06:06:40              1.0       2.950000     28.379999
        2 

In [11]:
# The SQL query
#   select VendorID, total_amount from df
#   where total_amount > 1;
# written with the DataFrame API. The result is stored in df2; nothing is
# computed yet because select() and where() are lazy transformations.

df2 = df.select('VendorID', 'total_amount').where('total_amount > 1')

In [7]:
# The same query with a row limit:
#   select VendorID, total_amount from df
#   where total_amount > 1
#   limit 5;
# show(n=5) is an action, so this line triggers the computation.

df.select('VendorID', 'total_amount').where('total_amount > 1').show(n=5)

+--------+------------+
|VendorID|total_amount|
+--------+------------+
|       2|        14.3|
|       2|        16.9|
|       2|        34.9|
|       1|       20.85|
|       2|       19.68|
+--------+------------+
only showing top 5 rows
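Conceptually, `select` projects columns, `where` keeps only the rows whose predicate is true, and `limit 5` (or `show(n=5)`) stops after five matches. The plain-Python sketch below applies those three steps to the first rows of the Pandas output above, plus one hypothetical zero-fare row added only so the filter has something to drop. It illustrates the semantics, not how Spark executes the query; note also that without an `ORDER BY`, Spark does not guarantee which rows come first.

```python
# Rows copied from the Pandas output above, except the one marked hypothetical.
rows = [
    {"VendorID": 2, "trip_distance": 0.97, "total_amount": 14.3},
    {"VendorID": 2, "trip_distance": 1.10, "total_amount": 16.9},
    {"VendorID": 2, "trip_distance": 0.00, "total_amount": 0.0},  # hypothetical
    {"VendorID": 2, "trip_distance": 2.51, "total_amount": 34.9},
    {"VendorID": 1, "trip_distance": 1.90, "total_amount": 20.85},
    {"VendorID": 2, "trip_distance": 1.43, "total_amount": 19.68},
    {"VendorID": 2, "trip_distance": 1.84, "total_amount": 27.8},
]


def select_where_limit(rows, columns, predicate, n):
    """SELECT columns FROM rows WHERE predicate(row) LIMIT n."""
    result = []
    for row in rows:
        if predicate(row):                               # WHERE
            result.append({c: row[c] for c in columns})  # SELECT
            if len(result) == n:                         # LIMIT
                break
    return result


top5 = select_where_limit(rows, ["VendorID", "total_amount"],
                          lambda r: r["total_amount"] > 1, 5)
for r in top5:
    print(r["VendorID"], r["total_amount"])
```

The five printed pairs match the Spark output above; the hypothetical row is skipped by the predicate, and `trip_distance` disappears because it is not in the projection.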



In [8]:
# Register df as a temporary view so it can be queried with spark.sql()

df.createOrReplaceTempView('yellow_taxis')

In [9]:
# Run a SQL statement against the view
spark.sql("""
    select VendorID, tpep_pickup_datetime, passenger_count
    from yellow_taxis
    where total_amount > 1 and passenger_count > 2
""").show(n=5)

+--------+--------------------+---------------+
|VendorID|tpep_pickup_datetime|passenger_count|
+--------+--------------------+---------------+
|       1| 2023-01-01 06:13:37|            4.0|
|       1| 2023-01-01 05:33:36|            3.0|
|       1| 2023-01-01 05:51:49|            4.0|
|       2| 2023-01-01 05:57:16|            4.0|
|       2| 2023-01-01 05:45:13|            5.0|
+--------+--------------------+---------------+
only showing top 5 rows
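A temporary view acts as a named table for the lifetime of the SparkSession that created it, so the string passed to `spark.sql()` is ordinary SQL. As a rough analogy only, the same statement runs unchanged against an in-memory SQLite table built with Python's standard `sqlite3` module. The table has just the four columns the query touches, and its rows are hypothetical values invented to exercise the `WHERE` clause.

```python
import sqlite3

# In-memory SQLite database standing in for the Spark temp view (an analogy only)
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE yellow_taxis ("
    "VendorID INTEGER, tpep_pickup_datetime TEXT, "
    "passenger_count REAL, total_amount REAL)"
)

# Hypothetical rows, invented only to exercise the WHERE clause
conn.executemany(
    "INSERT INTO yellow_taxis VALUES (?, ?, ?, ?)",
    [
        (1, "2023-01-01 06:00:00", 4.0, 25.0),  # kept
        (2, "2023-01-01 06:05:00", 1.0, 12.0),  # dropped: passenger_count <= 2
        (2, "2023-01-01 06:10:00", 3.0, 0.0),   # dropped: total_amount <= 1
        (1, "2023-01-01 06:15:00", 5.0, 40.0),  # kept
    ],
)

# The same statement passed to spark.sql() above
query = """
    select VendorID, tpep_pickup_datetime, passenger_count
    from yellow_taxis
    where total_amount > 1 and passenger_count > 2
"""
rows = conn.execute(query).fetchall()
conn.close()

for row in rows:
    print(row)
```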



In [10]:
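# Important: stop the session when you are done to release its resources.
# Any later cell must create a new SparkSession before using Spark again.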

spark.stop()

In [6]:
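from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import BooleanType

# The session was stopped above, so start a new one and reload the full dataset
# (earlier cells replaced df with a 6-column subset without fare_amount or payment_type)
spark = SparkSession.builder.master("local[1]").appName("spark").getOrCreate()
df = spark.read.parquet(local_file)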


### Q.1 "What is the average trip distance and fare amount for trips that last more than 30 minutes, occur during peak hours (7-9 AM and 4-7 PM), and have a passenger count of 2 or more? Break this down by payment type and compare weekdays vs. weekends."

In [7]:
def is_peak_hour(hour):
    # Peak windows 7:00-8:59 and 16:00-18:59; a missing hour (None) stays None
    return None if hour is None else (7 <= hour < 9) or (16 <= hour < 19)
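Because `is_peak_hour` is plain Python, its boundaries can be checked without Spark. The sketch below assumes half-open windows, [7, 9) and [16, 19), as the cell above implements them: 8:59 counts as peak, 9:00 does not, and the 7 PM hour (19) is excluded. It also returns `None` for a missing hour, since Spark passes a null column value to a Python UDF as `None`.

```python
def is_peak_hour(hour):
    """True for pickup hours 7-8 and 16-18, None for a missing hour."""
    if hour is None:
        return None
    return (7 <= hour < 9) or (16 <= hour < 19)


# Which of the 24 hours of the day are classified as peak?
peak_hours = [h for h in range(24) if is_peak_hour(h)]
print(peak_hours)  # [7, 8, 16, 17, 18]
```

As a design alternative, the same classification can be written with built-in column expressions, for example `F.hour(col).between(7, 8) | F.hour(col).between(16, 18)` (`between` is inclusive on both ends), which avoids the overhead of sending every row through a Python UDF.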

In [8]:
is_peak_hour_udf = F.udf(is_peak_hour, returnType=BooleanType())

In [9]:
# Trip duration in minutes: casting a timestamp to long gives seconds since the epoch
df_processed = df.withColumn("trip_duration_minutes",
                             (F.col("tpep_dropoff_datetime").cast("long") - F.col("tpep_pickup_datetime").cast("long")) / 60) \
                 .withColumn("is_peak_hour", is_peak_hour_udf(F.hour("tpep_pickup_datetime"))) \
                 .withColumn("is_weekend", F.dayofweek("tpep_pickup_datetime").isin([1, 7]))  # 1 = Sunday, 7 = Saturday
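The three derived columns can be reproduced in plain Python for the first trip in the dataset (pickup 2023-01-01 06:02:10, dropoff 06:10:36). The sketch mirrors the conventions used above: the duration is the difference in seconds divided by 60, and Spark's `dayofweek` numbers days from 1 = Sunday to 7 = Saturday. The helper `spark_dayofweek` is a hypothetical name that converts Python's `isoweekday()` (1 = Monday to 7 = Sunday) to that numbering.

```python
from datetime import datetime

# First trip from the output above
pickup = datetime(2023, 1, 1, 6, 2, 10)
dropoff = datetime(2023, 1, 1, 6, 10, 36)

# Duration in minutes: difference in seconds divided by 60
trip_duration_minutes = (dropoff - pickup).total_seconds() / 60


def spark_dayofweek(ts):
    """Hypothetical helper: map isoweekday (Mon=1..Sun=7) to Spark's dayofweek (Sun=1..Sat=7)."""
    return ts.isoweekday() % 7 + 1


is_weekend = spark_dayofweek(pickup) in (1, 7)

print(round(trip_duration_minutes, 2), spark_dayofweek(pickup), is_weekend)
# 8.43 1 True
```

2023-01-01 was a Sunday, so this trip counts as a weekend trip; its pickup hour (6) is outside both peak windows.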


In [10]:
df_processed.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------------+------------+----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|trip_duration_minutes|is_peak_hour|is_weekend|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------------+------------+----------+
|       2| 2023-01-01 06:02:10|  2023-01-01 06:10

In [11]:
# Keep trips longer than 30 minutes, picked up in a peak hour, with 2 or more passengers
df_filtered = df_processed.filter(
    (F.col("trip_duration_minutes") > 30) &
    F.col("is_peak_hour") &
    (F.col("passenger_count") >= 2)
)

# Average distance and fare, plus the trip count, per payment type and weekday/weekend
result = df_filtered.groupBy("payment_type", "is_weekend") \
    .agg(
        F.avg("trip_distance").alias("avg_trip_distance"),
        F.avg("fare_amount").alias("avg_fare_amount"),
        F.count("*").alias("trip_count")
    )

# Show the results, sorted by payment type, weekdays (false) before weekends (true)
result.orderBy("payment_type", "is_weekend").show()

+------------+----------+------------------+------------------+----------+
|payment_type|is_weekend| avg_trip_distance|   avg_fare_amount|trip_count|
+------------+----------+------------------+------------------+----------+
|           1|     false|13.919990441293626| 59.31299346821703|      6277|
|           1|      true|14.595430784123929| 61.30943852855761|      2066|
|           2|     false| 13.86736184526671| 58.84147044690057|      2081|
|           2|      true|14.931587030716711|61.409385665529015|       586|
|           3|     false|16.580714285714286| 34.08214285714286|        28|
|           3|      true| 9.144444444444444| 26.11111111111111|         9|
|           4|     false|            14.218| 8.373333333333333|        60|
|           4|      true|16.765555555555554|15.100000000000001|        18|
+------------+----------+------------------+------------------+----------+
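The filter-then-aggregate step follows the usual split-apply-combine pattern: one output row per distinct `(payment_type, is_weekend)` pair, holding the averages and the row count. The plain-Python sketch below uses hypothetical trips (not taken from the dataset) to show what that computes. One caveat: Spark's `avg` skips null values, whereas `count("*")` counts every row, so the two can disagree when a column has nulls. The sketch has no nulls.

```python
from collections import defaultdict, namedtuple

Trip = namedtuple("Trip", "payment_type is_weekend duration_min is_peak passengers distance fare")

# Hypothetical trips, invented for illustration
trips = [
    Trip(1, False, 35.0, True, 2, 12.0, 50.0),
    Trip(1, False, 45.0, True, 3, 16.0, 70.0),
    Trip(1, True, 40.0, True, 2, 15.0, 60.0),
    Trip(2, False, 31.0, True, 4, 10.0, 45.0),
    Trip(2, False, 20.0, True, 2, 5.0, 25.0),    # dropped: 30 minutes or less
    Trip(1, False, 50.0, False, 2, 20.0, 80.0),  # dropped: off-peak
    Trip(1, True, 60.0, True, 1, 18.0, 65.0),    # dropped: one passenger
]

# Filter: same three conditions as the Spark cell
kept = [t for t in trips if t.duration_min > 30 and t.is_peak and t.passengers >= 2]

# Group by (payment_type, is_weekend), accumulating [distance sum, fare sum, count]
groups = defaultdict(lambda: [0.0, 0.0, 0])
for t in kept:
    acc = groups[(t.payment_type, t.is_weekend)]
    acc[0] += t.distance
    acc[1] += t.fare
    acc[2] += 1

# Aggregate: averages and count per group, in sorted key order (like orderBy)
result = {
    key: (dist_sum / n, fare_sum / n, n)
    for key, (dist_sum, fare_sum, n) in sorted(groups.items())
}
for key, row in result.items():
    print(key, row)
```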

