In [1]:
!pip install pyspark==3.2.2
!pip install delta-spark

Collecting pyspark==3.2.2
  Downloading pyspark-3.2.2.tar.gz (281.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.5/281.5 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5 (from pyspark==3.2.2)
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m15.8 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.2-py2.py3-none-any.whl size=281969432 sha256=13eacd275f566398c68927d5bc84b252116bf88aae5f98e55997d556426872e5
  Stored in directory: /root/.cache/pip/wheels/99/2c/e7/e06690607c5342affbc132b79732a3a321c83eec58d5617c38
Successfully built pyspark
Installing collected packages: py4j, pyspark
 

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta import *

builder = SparkSession.builder.appName("TrafficApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

path = '/content/raw_data'

traffic_df = spark.read.csv(path, header=True, inferSchema=True)

traffic_df.show()

+--------------+-------------------+----+----------+----+---------+-----------+------------------+--------------------+---------+---------+------------------------+----------------------+-------+--------+---------+----------+--------------+-----------------+------------+--------------------------+--------------+-----------------+----+-----------------+-----------------+-------------------------+----------------------------+-----------------------+-----------------------+--------+------------------+
|count_point_id|direction_of_travel|year|count_date|hour|region_id|region_name|local_authority_id|local_authority_name|road_name|road_type|start_junction_road_name|end_junction_road_name|easting|northing| latitude| longitude|link_length_km|link_length_miles|pedal_cycles|two_wheeled_motor_vehicles|cars_and_taxis|buses_and_coaches|lgvs|hgvs_2_rigid_axle|hgvs_3_rigid_axle|hgvs_4_or_more_rigid_axle|hgvs_3_or_4_articulated_axle|hgvs_5_articulated_axle|hgvs_6_articulated_axle|all_hgvs|all_motor_ve

In [4]:
# Writing the DataFrame to Parquet format
parquet_path = '/content/transformed_data/data.parquet'
traffic_df.write.parquet(parquet_path)

# Reading the Parquet file back into a DataFrame
parquet_df = spark.read.parquet(parquet_path)
parquet_df.show()

+--------------+-------------------+----+----------+----+---------+-----------+------------------+--------------------+---------+---------+------------------------+----------------------+-------+--------+---------+----------+--------------+-----------------+------------+--------------------------+--------------+-----------------+----+-----------------+-----------------+-------------------------+----------------------------+-----------------------+-----------------------+--------+------------------+
|count_point_id|direction_of_travel|year|count_date|hour|region_id|region_name|local_authority_id|local_authority_name|road_name|road_type|start_junction_road_name|end_junction_road_name|easting|northing| latitude| longitude|link_length_km|link_length_miles|pedal_cycles|two_wheeled_motor_vehicles|cars_and_taxis|buses_and_coaches|lgvs|hgvs_2_rigid_axle|hgvs_3_rigid_axle|hgvs_4_or_more_rigid_axle|hgvs_3_or_4_articulated_axle|hgvs_5_articulated_axle|hgvs_6_articulated_axle|all_hgvs|all_motor_ve

In [5]:
# Write the DataFrame to Delta format
delta_path = '/content/raw_data/data.delta'
traffic_df.write.format("delta").save(delta_path)

# Read the Delta file back into a DataFrame
delta_df = spark.read.format("delta").load(delta_path)
delta_df.show()

+--------------+-------------------+----+----------+----+---------+-----------+------------------+--------------------+---------+---------+------------------------+----------------------+-------+--------+---------+----------+--------------+-----------------+------------+--------------------------+--------------+-----------------+----+-----------------+-----------------+-------------------------+----------------------------+-----------------------+-----------------------+--------+------------------+
|count_point_id|direction_of_travel|year|count_date|hour|region_id|region_name|local_authority_id|local_authority_name|road_name|road_type|start_junction_road_name|end_junction_road_name|easting|northing| latitude| longitude|link_length_km|link_length_miles|pedal_cycles|two_wheeled_motor_vehicles|cars_and_taxis|buses_and_coaches|lgvs|hgvs_2_rigid_axle|hgvs_3_rigid_axle|hgvs_4_or_more_rigid_axle|hgvs_3_or_4_articulated_axle|hgvs_5_articulated_axle|hgvs_6_articulated_axle|all_hgvs|all_motor_ve

In [6]:
# Check the history of the Delta table
delta_table_history = spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`")
delta_table_history.show(truncate=False)

+-------+-----------------------+------+--------+---------+------------------------------------------+----+--------+---------+-----------+--------------+-------------+--------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                       |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                              |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+------------------------------------------+----+--------+---------+-----------+--------------+-------------+--------------------------------------------------------------+------------+-----------------------------------+
|0      |2024-09-27 05:14:23.552|NULL  |NULL    |WRITE    |{mode -> ErrorIfExists, partitionBy -> []}|NULL|NULL    |NULL     |NULL       |Serializable  |true   

In [7]:
# Vacuum the Delta table to remove old files
spark.sql(f"VACUUM delta.`{delta_path}` RETAIN 168 HOURS")

DataFrame[path: string]

In [8]:
# Check the history of the Delta table
delta_table_history = spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`")
delta_table_history.show(truncate=False)

+-------+-----------------------+------+--------+------------+-----------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+-----------------+-------------+--------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation   |operationParameters                                                                                        |job |notebook|clusterId|readVersion|isolationLevel   |isBlindAppend|operationMetrics                                              |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+------------+-----------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+-----------------+-------------+------------------------------------------------------------

In [10]:
# 1. How can you filter the data to show only records from the year 2019?

records_2019 = delta_df.filter(delta_df["year"] == 2019)

records_2019.show(5)

+--------------+-------------------+----+----------+----+---------+-----------+------------------+--------------------+---------+---------+------------------------+----------------------+-------+--------+---------+----------+--------------+-----------------+------------+--------------------------+--------------+-----------------+----+-----------------+-----------------+-------------------------+----------------------------+-----------------------+-----------------------+--------+------------------+
|count_point_id|direction_of_travel|year|count_date|hour|region_id|region_name|local_authority_id|local_authority_name|road_name|road_type|start_junction_road_name|end_junction_road_name|easting|northing| latitude| longitude|link_length_km|link_length_miles|pedal_cycles|two_wheeled_motor_vehicles|cars_and_taxis|buses_and_coaches|lgvs|hgvs_2_rigid_axle|hgvs_3_rigid_axle|hgvs_4_or_more_rigid_axle|hgvs_3_or_4_articulated_axle|hgvs_5_articulated_axle|hgvs_6_articulated_axle|all_hgvs|all_motor_ve

In [11]:
# 2. How can you calculate the total number of vehicles (all motor vehicles) for each road?
# Group by road name and sum the 'all_motor_vehicles' column
total_vehicles_per_road = delta_df.groupBy("road_name").sum("all_motor_vehicles")

total_vehicles_per_road.show(5)

+---------+-----------------------+
|road_name|sum(all_motor_vehicles)|
+---------+-----------------------+
|     A420|                  11772|
|       M4|                  52850|
|     A380|                  36810|
|     A371|                   4956|
|     A388|                   3591|
+---------+-----------------------+
only showing top 5 rows



In [13]:
# 3. How can you count the number of rows where the number of cars and taxis is greater than 20?
# Filter rows where 'cars_and_taxis' > 20 and count the number of such rows
cars_taxis_gt_20 = delta_df.filter(delta_df["cars_and_taxis"] > 20).count()

print(f"Number of rows where cars and taxis > 20: {cars_taxis_gt_20}")

Number of rows where cars and taxis > 20: 417


In [14]:
# 4. How can you find the average number of pedal cycles per region?

avg_pedal_cycles_per_region = delta_df.groupBy("region_name").avg("pedal_cycles")
avg_pedal_cycles_per_region.show(5)

+-----------+------------------+
|region_name| avg(pedal_cycles)|
+-----------+------------------+
| South West|2.9866369710467704|
+-----------+------------------+



In [15]:
# 5. How can you sort the data by the number of heavy goods vehicles (all_hgvs) in descending order?

sorted_hgvs = delta_df.orderBy(delta_df["all_hgvs"].desc())

sorted_hgvs.show(5)

+--------------+-------------------+----+----------+----+---------+-----------+------------------+--------------------+---------+---------+------------------------+----------------------+-------+--------+---------+---------+--------------+-----------------+------------+--------------------------+--------------+-----------------+----+-----------------+-----------------+-------------------------+----------------------------+-----------------------+-----------------------+--------+------------------+
|count_point_id|direction_of_travel|year|count_date|hour|region_id|region_name|local_authority_id|local_authority_name|road_name|road_type|start_junction_road_name|end_junction_road_name|easting|northing| latitude|longitude|link_length_km|link_length_miles|pedal_cycles|two_wheeled_motor_vehicles|cars_and_taxis|buses_and_coaches|lgvs|hgvs_2_rigid_axle|hgvs_3_rigid_axle|hgvs_4_or_more_rigid_axle|hgvs_3_or_4_articulated_axle|hgvs_5_articulated_axle|hgvs_6_articulated_axle|all_hgvs|all_motor_vehi

In [16]:
delta_df.count()

449