
[This notebook follows the tutorial for the configuration and setup of PySpark](https://www.youtube.com/watch?v=WkqFPxVgsqI)

PySpark is the Python API for Apache Spark. It enables users to perform real-time, large-scale data processing in a distributed environment using Python. PySpark combines Python’s learnability and ease of use with the power of Apache Spark to enable processing and analysis of data at any size for everyone familiar with Python. PySpark supports all of Spark’s features such as Spark SQL, DataFrames, Structured Streaming, Machine Learning (MLlib) and Spark Core.

Installing PySpark

In [None]:
!java -version

openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)


In [None]:
!pip install pyspark



In [None]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, LongType, TimestampType

Working with PySpark

Download Dataset

In [None]:
!curl -O https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_2023/raw/review_categories/Automotive.jsonl.gz
!gunzip -f Automotive.jsonl.gz

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   284  100   284    0     0    914      0 --:--:-- --:--:-- --:--:--   916

gzip: Automotive.jsonl.gz: not in gzip format


In [None]:
!wget https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_2023/raw/review_categories/Automotive.jsonl.gz
!gunzip -f Automotive.jsonl.gz

--2025-09-16 07:30:21--  https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_2023/raw/review_categories/Automotive.jsonl.gz
Resolving datarepo.eng.ucsd.edu (datarepo.eng.ucsd.edu)... 132.239.8.30
Connecting to datarepo.eng.ucsd.edu (datarepo.eng.ucsd.edu)|132.239.8.30|:443... connected.
HTTP request sent, awaiting response... 404 Not Found
2025-09-16 07:30:22 ERROR 404: Not Found.


gzip: Automotive.jsonl.gz: not in gzip format
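gunzip refuses the file because what was saved is not a gzip archive. curl stored only a 284-byte response, and wget received a 404. A quick check before decompressing is to look for the two-byte gzip magic number `1f 8b` (RFC 1952). This is a minimal sketch. The helper name `is_gzip_file` is hypothetical, and the demo writes two throwaway files instead of using the real download.

```python
import gzip
import os
import tempfile

# Every gzip stream starts with these two bytes (RFC 1952).
GZIP_MAGIC = b"\x1f\x8b"


def is_gzip_file(path: str) -> bool:
    """Return True if the file at `path` begins with the gzip magic number."""
    with open(path, "rb") as f:
        return f.read(2) == GZIP_MAGIC


# Demo: one real gzip file and one HTML error page saved under a .gz name.
tmp_dir = tempfile.mkdtemp()
good_path = os.path.join(tmp_dir, "good.jsonl.gz")
bad_path = os.path.join(tmp_dir, "bad.jsonl.gz")

with gzip.open(good_path, "wt") as f:
    f.write('{"a": 1}\n')
with open(bad_path, "w") as f:
    f.write("<html><body>404 Not Found</body></html>")

print(is_gzip_file(good_path))  # True
print(is_gzip_file(bad_path))   # False
```

In the notebook, the same check could be run on `Automotive.jsonl.gz` before calling `!gunzip`.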


In [2]:
# !wget https://cseweb.ucsd.edu/~jmcauley/datasets.html
# !cat datasets.html

Because the Amazon Reviews 2023 Automotive file could not be downloaded (wget returned 404 Not Found), the notebook uses the BeerAdvocate reviews dataset instead.

In [None]:
!wget https://mcauleylab.ucsd.edu/public_datasets/data/beer/beeradvocate.json.gz
!gunzip -f beeradvocate.json.gz

--2025-09-16 07:30:23--  https://mcauleylab.ucsd.edu/public_datasets/data/beer/beeradvocate.json.gz
Resolving mcauleylab.ucsd.edu (mcauleylab.ucsd.edu)... 169.228.63.88
Connecting to mcauleylab.ucsd.edu (mcauleylab.ucsd.edu)|169.228.63.88|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 448628940 (428M) [application/gzip]
Saving to: ‘beeradvocate.json.gz’


2025-09-16 07:30:34 (40.4 MB/s) - ‘beeradvocate.json.gz’ saved [448628940/448628940]



In [None]:
print(f'The size of the Beer Advocate dataset is: {os.path.getsize("/content/beeradvocate.json") / (1024 ** 3):.2f} GB')

The size of the Beer Advocate dataset is: 1.54 GB


*Initializing a SparkSession*

A SparkSession is a class in the pyspark.sql module. It is the entry point for working with Apache Spark, and it is what lets us create PySpark DataFrames.



In [None]:
spark = SparkSession.builder.appName('BeerData').getOrCreate()
print(f'The Spark version is {spark.version}')

The Spark version is 3.5.1


Reading in the Data

Pandas

In [None]:
# Reading the full file with pandas crashed the Colab session (the data is too large for memory):
# pandas_df = pd.read_json('/content/beeradvocate.json', lines=True)
# pandas_df.head()

PySpark

Using a Schema for a PySpark DataFrame
In Apache Spark, a schema defines the structure of a DataFrame or Dataset. It specifies the column names, their data types, and whether each column may contain nulls.

For our schema, we first define a StructType, which is a collection of StructFields. Each StructField describes one column:
- its name (name)
- the type of its values (dataType)
- whether it can contain null values (nullable)

In [None]:
!head /content/beeradvocate.json

{'beer/name': 'Sausa Weizen', 'beer/beerId': '47986', 'beer/brewerId': '10325', 'beer/ABV': '5.00', 'beer/style': 'Hefeweizen', 'review/appearance': '2.5', 'review/aroma': '2', 'review/palate': '1.5', 'review/taste': '1.5', 'review/overall': '1.5', 'review/time': '1234817823', 'review/profileName': 'stcules', 'review/text': 'A lot of foam. But a lot.\tIn the smell some banana, and then lactic and tart. Not a good start.\tQuite dark orange in color, with a lively carbonation (now visible, under the foam).\tAgain tending to lactic sourness.\tSame for the taste. With some yeast and banana.'}
{'beer/name': 'Red Moon', 'beer/beerId': '48213', 'beer/brewerId': '10325', 'beer/ABV': '6.20', 'beer/style': 'English Strong Ale', 'review/appearance': '3', 'review/aroma': '2.5', 'review/palate': '3', 'review/taste': '3', 'review/overall': '3', 'review/time': '1235915097', 'review/profileName': 'stcules', 'review/text': 'Dark red color, light beige foam, average.\tIn the smell malt and caramel, not 

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

schema = StructType([
    StructField("beer/name", StringType(), True),
    StructField("beer/beerId", StringType(), True),
    StructField("beer/brewerId", StringType(), True),
    StructField("beer/ABV", StringType(), True), # Keeping as StringType initially as it might contain non-numeric values
    StructField("beer/style", StringType(), True),
    StructField("review/appearance", StringType(), True), # Keeping as StringType initially
    StructField("review/aroma", StringType(), True), # Keeping as StringType initially
    StructField("review/palate", StringType(), True), # Keeping as StringType initially
    StructField("review/taste", StringType(), True), # Keeping as StringType initially
    StructField("review/overall", StringType(), True), # Keeping as StringType initially
    StructField("review/time", LongType(), True), # unix timestamp; the file stores it as a quoted string, so LongType yields NULL (see the show() output below)
    StructField("review/profileName", StringType(), True),
    StructField("review/text", StringType(), True)
])

In [None]:
spark_df = spark.read.schema(schema).json('/content/beeradvocate.json')

PySpark Operations

In [None]:
spark_df.show(5)

+--------------------+-----------+-------------+--------+--------------------+-----------------+------------+-------------+------------+--------------+-----------+------------------+--------------------+
|           beer/name|beer/beerId|beer/brewerId|beer/ABV|          beer/style|review/appearance|review/aroma|review/palate|review/taste|review/overall|review/time|review/profileName|         review/text|
+--------------------+-----------+-------------+--------+--------------------+-----------------+------------+-------------+------------+--------------+-----------+------------------+--------------------+
|        Sausa Weizen|      47986|        10325|    5.00|          Hefeweizen|              2.5|           2|          1.5|         1.5|           1.5|       NULL|           stcules|A lot of foam. Bu...|
|            Red Moon|      48213|        10325|    6.20|  English Strong Ale|                3|         2.5|            3|           3|             3|       NULL|           stcules|Da

In [None]:
spark_df.printSchema()

root
 |-- beer/name: string (nullable = true)
 |-- beer/beerId: string (nullable = true)
 |-- beer/brewerId: string (nullable = true)
 |-- beer/ABV: string (nullable = true)
 |-- beer/style: string (nullable = true)
 |-- review/appearance: string (nullable = true)
 |-- review/aroma: string (nullable = true)
 |-- review/palate: string (nullable = true)
 |-- review/taste: string (nullable = true)
 |-- review/overall: string (nullable = true)
 |-- review/time: long (nullable = true)
 |-- review/profileName: string (nullable = true)
 |-- review/text: string (nullable = true)



In [None]:
spark_df.describe().show()

+-------+------------------------+-----------------+-----------------+------------------+----------+------------------+------------------+------------------+------------------+------------------+-----------+--------------------+--------------------+
|summary|               beer/name|      beer/beerId|    beer/brewerId|          beer/ABV|beer/style| review/appearance|      review/aroma|     review/palate|      review/taste|    review/overall|review/time|  review/profileName|         review/text|
+-------+------------------------+-----------------+-----------------+------------------+----------+------------------+------------------+------------------+------------------+------------------+-----------+--------------------+--------------------+
|  count|                 1585815|          1585815|          1585815|           1585815|   1585815|           1585815|           1585815|           1585815|           1585815|           1585815|          0|             1585815|             1585815|


In [None]:
spark_df.groupby('review/taste').count().show()

+------------+------+
|review/taste| count|
+------------+------+
|           3|166726|
|         4.5|336045|
|        NULL|   800|
|         2.5| 66488|
|           5| 83956|
|         3.5|324332|
|         1.5| 15115|
|           1|  9984|
|           4|541201|
|           2| 41968|
+------------+------+



[PySpark documentation](https://spark.apache.org/docs/latest/api/python/index.html)

Additional resource

A Guide to Spark SQL: Querying Big Data with SQL-Like Syntax
Spark SQL is one of Apache Spark's most powerful modules and supports running SQL queries on very large datasets. With familiar SQL syntax, analysts, scientists, and engineers can perform data transformations, aggregations, and manipulations fluidly within Spark. This tutorial walks through the basics of Spark SQL. It starts by creating a Spark session and then works through several commonly used operations. Code is shown for each step so you can follow along on your own.

Introduction to Spark SQL
Spark SQL executes SQL queries over structured data inside Spark. It provides a consistent interface for reading and querying many data formats, including JSON, Parquet, and ORC. One of its key advantages is the Catalyst optimizer, which rewrites queries into efficient execution plans. This is especially helpful on large datasets.

In [None]:
from google.colab import drive
drive.mount('/mntDrive')  # pass force_remount=True to remount an already-mounted drive

Mounted at /mntDrive


Setting Up a Spark SQL Session
Before running any queries, we need to set up a Spark session, which is the entry point for any Spark functionality.

In [None]:
from pyspark.sql import SparkSession
# Initialize Spark session
spark_example = SparkSession.builder.appName("Spark SQL Example").getOrCreate()

In [None]:
spark_example

Working with DataFrames and Temporary Views

In [None]:
# Paths to the CSV files on the mounted Google Drive.
# Before uploading, remove the spaces from the column names in NYC-BikeShare-2015-2017-combined-Copy.csv.
# Upload the files to Google Drive, rename to Dataset, then copy each file's path and paste it below.
bike_share_path = "/mntDrive/MyDrive/DataEngineering245-2025/NYC-BikeShare-2015-2017-combined-Copy.csv"
bike_specs_path = "/mntDrive/MyDrive/DataEngineering245-2025/bikespecs.csv"
maker_details_path = "/mntDrive/MyDrive/DataEngineering245-2025/makerdetails.csv"
#df = pd.read_csv(path, encoding='unicode_escape', index_col=0)

In [None]:
# Load each CSV file into a DataFrame, using the header row for column names and inferring column types
bike_share_df = spark_example.read.csv(bike_share_path, header=True, inferSchema=True)
bike_specs_df = spark_example.read.csv(bike_specs_path, header=True, inferSchema=True)
maker_details_df = spark_example.read.csv(maker_details_path, header=True, inferSchema=True)


In Spark, a temporary view is a virtual table backed by a DataFrame. It exists only for the lifetime of the SparkSession that created it. Temporary views make complex queries easier to write, and they let you organize and manipulate data with SQL without creating permanent tables.

In [None]:
# Register DataFrame as a SQL temporary view
bike_share_df.createOrReplaceTempView("bike_share")
bike_specs_df.createOrReplaceTempView("bike_specs")
maker_details_df.createOrReplaceTempView("maker_details")

In [None]:
bike_share_df.show(5)

+---+------------+-------------------+-------------------+--------------+----------------+--------------------+---------------------+------------+------------------+------------------+-------------------+------+----------+---------+------+--------------------+
|_c0|TripDuration|          StartTime|           StopTime|StartStationID|StartStationName|StartStationLatitude|StartStationLongitude|EndStationID|    EndStationName|EndStationLatitude|EndStationLongitude|BikeID|  UserType|BirthYear|Gender|Trip_Duration_in_min|
+---+------------+-------------------+-------------------+--------------+----------------+--------------------+---------------------+------------+------------------+------------------+-------------------+------+----------+---------+------+--------------------+
|  0|         376|2015-10-01 00:16:26|2015-10-01 00:22:42|          3212| Christ Hospital|        40.734785818|        -74.050443636|        3207|       Oakland Ave|        40.7376037| -74.05247829999999| 24470|Subscr

In [None]:
bike_specs_df.show(5)


In [None]:
maker_details_df.show(5)


1. Find the station names of the source (start) and destination (end) of all trips with a duration of LESS than 5 minutes. Sort this list in ascending order of source station name. What is the destination station name of the third trip in this sorted list?

In [None]:
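# Q1: start and end stations of trips shorter than 5 minutes, sorted by start station.
# Rows that share a StartStationName are tied, and Spark does not guarantee their order.
result_df = spark_example.sql(
"""SELECT StartStationName, EndStationName
FROM bike_share
WHERE Trip_Duration_in_min < 5
ORDER BY StartStationName ASC
LIMIT 3;""")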
result_df.show()

+-----------------+--------------+
| StartStationName|EndStationName|
+-----------------+--------------+
|5 Corners Library|       Sip Ave|
|5 Corners Library|       Sip Ave|
|5 Corners Library|       Sip Ave|
+-----------------+--------------+



2. Find the station names of the source (start) and trip duration in minutes of all trips with a duration of between 20 and 30 minutes (inclusive). Sort this list in descending order of trip duration. Where did the trip at the top of this sorted list start?

In [None]:

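# Q2: trips of 20 to 30 minutes (BETWEEN is inclusive), longest first.
# Several trips may tie at the maximum duration; LIMIT 1 returns one of them.
result_df = spark_example.sql(
"""SELECT StartStationName, Trip_Duration_in_min
FROM bike_share
WHERE Trip_Duration_in_min BETWEEN 20 AND 30
ORDER BY Trip_Duration_in_min DESC
LIMIT 1;""")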
result_df.show()

+----------------+--------------------+
|StartStationName|Trip_Duration_in_min|
+----------------+--------------------+
|  Van Vorst Park|                  30|
+----------------+--------------------+



3. How old (as of 1 January 2024) is the oldest male (gender of 0) person that rented a bike with an Aluminum frame to stations with IDs 3212 or 3190?

In [None]:

# Q3: age as of 1 January 2024 is approximated as 2024 - BirthYear (only the birth year is recorded).
result_df = spark_example.sql(
"""SELECT MAX(2024 - BirthYear) AS Oldest_Age
FROM bike_share
LEFT JOIN bike_specs
ON bike_share.BikeID = bike_specs.bike_ID
WHERE EndStationID IN (3212, 3190) AND frame_material = 'Aluminum' AND Gender = 0;""")
result_df.show()

+----------+
|Oldest_Age|
+----------+
|      51.0|
+----------+



4. Which city has the highest average age of carbon bike manufacturers?

In [None]:

# Q4: average age (as of 2024) of makers of Carbon-frame bikes, by city, highest first.
result_df = spark_example.sql(
"""SELECT city, avg(2024-year_established) as Average_Age
FROM maker_details
LEFT JOIN bike_specs
ON maker_details.maker_ID = bike_specs.manufacture_ID
WHERE frame_material = 'Carbon'
GROUP BY city
ORDER BY Average_Age DESC
LIMIT 1;""")
result_df.show()

+-----------+-----------------+
|       city|      Average_Age|
+-----------+-----------------+
|Jersey City|37.06371681415929|
+-----------+-----------------+



5. How many bikes with shocks and fewer than 15 gears were rented by 30-year-old (as of 1 January 2024) customers?

In [None]:

# Q5: rentals of bikes with shocks and fewer than 15 gears by riders aged 30 (2024 - BirthYear).
result_df = spark_example.sql(
"""SELECT COUNT(*) AS NumOfBikes
FROM bike_share
LEFT JOIN bike_specs
ON bike_share.BikeID = bike_specs.bike_ID
WHERE shocks = 'yes'
AND gear_count < 15
AND (2024 - BirthYear) = 30;
""")
result_df.show()


+----------+
|NumOfBikes|
+----------+
|       386|
+----------+

