In [11]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [6]:
# Question 1
# Start (or reuse, via getOrCreate) a local Spark session that runs on all
# available cores, then display the Spark version.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

# Bare last expression: Jupyter renders it as the cell output.
spark.version

'3.3.2'

In [19]:
# Question 2
# Read the June 2021 trips CSV with an explicit schema, preview it, then write
# it back out as Parquet split into N_PARTITIONS partitions.

# Base directory for input and output data. Defaults to the original author's
# checkout; set the DATA_DIR environment variable to run elsewhere.
DATA_DIR = os.environ.get(
    'DATA_DIR', '/home/antihaddock/Repos/data-engineering-zoomcamp/data'
)
N_PARTITIONS = 12  # repartition target before writing Parquet

# Explicit schema: types the datetime columns as timestamps and the location
# IDs as integers instead of leaving every column as a string.
fhv_schema = types.StructType([
    types.StructField("dispatching_base_num", types.StringType(), True),
    types.StructField("pickup_datetime", types.TimestampType(), True),
    types.StructField("dropoff_datetime", types.TimestampType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("SR_Flag", types.StringType(), True),
    types.StructField("Affiliated_base_number", types.StringType(), True),
])

# Read in data
input_path = os.path.join(DATA_DIR, 'fhvhv_tripdata_2021-06.csv.gz')
df = (
    spark.read
    .option("header", "true")
    .schema(fhv_schema)
    .csv(input_path)
)

df.show()

# Plain path (the original f-string had no placeholders); the trailing ''
# keeps the trailing slash of the original '.../data/spark/' path.
output_path = os.path.join(DATA_DIR, 'spark', '')

(
    df
    .repartition(N_PARTITIONS)
    .write.parquet(output_path, mode='overwrite')
)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

[Stage 15:>                                                         (0 + 1) / 1]

23/02/26 21:13:24 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/02/26 21:13:24 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/02/26 21:13:24 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 75.08% for 9 writers
23/02/26 21:13:24 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 67.58% for 10 writers
23/02/26 21:13:24 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 61.43% for 11 writers
23/02/26 21:13:24 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 56.31% for 12 writers


[Stage 17:>                                                       (0 + 12) / 12]

23/02/26 21:13:26 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 61.43% for 11 writers
23/02/26 21:13:26 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 67.58% for 10 writers
23/02/26 21:13:26 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 75.08% for 9 writers
23/02/26 21:13:26 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/02/26 21:13:26 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers


                                                                                

In [24]:
# Preview the first 20 rows of the trips DataFrame, truncating long values.
df.show(n=20, truncate=True)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [28]:
# Question 3
# Count the trips whose pickup happened on 15 June 2021.

# createOrReplaceTempView replaces the deprecated registerTempTable. Both
# return None, so the result is not assigned to anything.
df.createOrReplaceTempView('trips_data')

# Half-open interval [2021-06-15, 2021-06-16) covers the entire day,
# including any sub-second timestamps an inclusive '<= 23:59:59' bound
# would drop.
spark.sql(
    """
    SELECT
        COUNT(pickup_datetime) AS trips_on_2021_06_15
    FROM
        trips_data
    WHERE
        pickup_datetime >= '2021-06-15 00:00:00'
        AND pickup_datetime < '2021-06-16 00:00:00'
    """
).show()

[Stage 23:>                                                         (0 + 1) / 1]

+----------------------+
|count(pickup_datetime)|
+----------------------+
|                452470|
+----------------------+



                                                                                

In [51]:
# Question 4
# Longest trip in the dataset (dropoff minus pickup), reported in hours.

# Alias Spark's max so it does not shadow Python's builtin max for every
# later cell in the notebook.
from pyspark.sql.functions import unix_timestamp, max as spark_max

# Epoch seconds for both ends of each trip. withColumn replaces an existing
# column of the same name, so re-running this cell does not stack columns.
df = df.withColumn('start_timestamp', unix_timestamp('pickup_datetime'))
df = df.withColumn('end_timestamp', unix_timestamp('dropoff_datetime'))
df = df.withColumn('time_diff', df.end_timestamp - df.start_timestamp)

# convert time difference from seconds to hours
df = df.withColumn('time_diff_hours', df.time_diff / 3600)

max_diff = df.agg(spark_max('time_diff_hours')).collect()[0][0]
print("Maximum time difference:", max_diff, "hours")

[Stage 49:>                                                         (0 + 1) / 1]

Maximum time difference: 66.8788888888889 hours


                                                                                

In [56]:
# Question 6
# Number of trips per pickup location, most frequent first.
# NOTE(review): this yields location IDs only; mapping them to zone names
# would need a zone lookup table, which this notebook does not load.

# createOrReplaceTempView replaces the deprecated registerTempTable. Both
# return None, so the result is not assigned to anything.
df.createOrReplaceTempView('trips_data')

# The previous query had a dangling WHERE with no predicate, which is a SQL
# syntax error. COUNT(*) counts every trip; COUNT(PULocationID) would report
# 0 for trips whose PULocationID is NULL.
spark.sql(
    """
    SELECT
        PULocationID,
        COUNT(*) AS total
    FROM
        trips_data
    GROUP BY
        PULocationID
    ORDER BY
        total DESC
    """
).show()

[Stage 61:>                                                         (0 + 1) / 1]

+------------+------+
|PULocationID| total|
+------------+------+
|          61|231279|
|          79|221244|
|         132|188867|
|          37|187929|
|          76|186780|
|         231|164344|
|         138|161596|
|         234|158937|
|         249|154698|
|           7|152493|
|         148|151020|
|          68|147673|
|          42|146402|
|         255|143683|
|         181|143594|
|         225|141427|
|          48|139611|
|         246|139431|
|          17|138428|
|         170|137879|
+------------+------+
only showing top 20 rows



                                                                                