# Spark Cluster

## Setup


In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import col, asc, desc, max, hour, avg, date_format, rank
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.types import TimestampType, DateType
from pyspark.sql.window import Window

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1762986994807_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Bind `spark` to the active SparkSession; getOrCreate() reuses a session that
# already exists instead of building a second one.
spark = SparkSession.builder.getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type": "native",
        "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1762986994807_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1762986994807_0002,pyspark,idle,Link,Link,,✔


In [4]:
# Install the analysis/plotting libraries through the Spark context, one
# package at a time and in this order.
for package_name in ("pandas", "numpy", "matplotlib"):
    sc.install_pypi_package(package_name)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting tzdata>=2022.7
  Downloading tzdata-2025.2-py2.py3-none-any.whl (347 kB)
Collecting numpy>=1.22.4
  Downloading numpy-2.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (19.5 MB)
Collecting python-dateutil>=2.8.2
  Downloading python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229 kB)
Installing collected packages: tzdata, python-dateutil, numpy
  Attempting uninstall: python-dateutil
    Found existing installation: python-dateutil 2.8.1
    Not uninstalling python-dateutil at /usr/lib/python3.9/site-packages, outside environment /mnt1/yarn/usercache/livy/appcache/application_1762986994807_0002/container_1762986994807_0002_01_000001/tmp/spark-7191e70f-e084-4ca3-8fd8-d0b7d681aafc
    Can't uninstall 'python-dateutil'. No files were found to uninstall.
Successfully installed numpy-2.0.2 python-dateutil-2.9.0.post0 tzdata-2025.2


Collecting importlib-resources>=3.2.0
  Downloading importlib_resources-6.5.2-py3-none-any.whl (37 kB)
Collecting fonttools>=4.22.0
  Dow

In [5]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import time

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Data

Reading OpenAQ data from AWS S3

OpenAQ project: https://openaq.org/

Registry of AWS Open Data: https://registry.opendata.aws/openaq/

S3 bucket structure: https://docs.openaq.org/aws/about

In [6]:
# Read the gzip-compressed CSV files (with a header row) that match the glob:
# location ids starting with "100", May 2022, file dates starting with 2022050.
smog_df = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("compression", "gzip")
    .load("s3a://openaq-data-archive/records/csv.gz/locationid=100*/year=2022/month=05/location-100*-2022050*.csv.gz")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Number of measurement rows read from the matched files.
smog_df.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1500

In [8]:
# Preview at most 10 rows drawn from a ~1% random sample, without truncating
# cell contents. No seed is passed to sample(), so the rows differ between runs.
smog_df.sample(fraction=0.01).limit(10).show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+----------+--------------------------+-------------------------+----------------+----------------+---------+-----+------+
|location_id|sensors_id|location                  |datetime                 |lat             |lon             |parameter|units|value |
+-----------+----------+--------------------------+-------------------------+----------------+----------------+---------+-----+------+
|100        |162       |Badhoevedorp-Sloterweg-100|2022-05-07T12:00:00+02:00|52.334          |4.77401         |co       |µg/m³|-999.0|
|1000       |1803      |Presque Isle Riversi-1000 |2022-05-07T13:00:00-04:00|46.682299       |-68.016195      |pm10     |µg/m³|9.0   |
|1003       |1806      |Padonia-1003              |2022-05-03T01:00:00-04:00|39.462002       |-76.631599      |o3       |ppm  |0.026 |
|1003       |1806      |Padonia-1003              |2022-05-03T13:00:00-04:00|39.462002       |-76.631599      |o3       |ppm  |0.04  |
|1003       |1806      |Padonia-1003              |2022

### Using extended locations data

The data in the S3 bucket contains only a few essential fields: parameter readings and limited information about sensor locations.

Additional information is available through the OpenAQ API: https://api.openaq.org/

The file `openaq_locations.json` contains extended information about locations; it was downloaded using this REST API.

Data from this file joined with data in S3 can be used for more advanced queries.

In [9]:
# Load the extended locations export. The "multiLine" option lets a single JSON
# record span several lines of the file.
df = (
    spark.read
    .option("multiLine", "true")
    .json("s3a://openaqlocationsadzd/openaq_locations.json")
)
# Show the first five records.
df.show(n=5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+----------------+-------------+------------+--------+---+--------------------+--------+---------+--------+--------+-------------------+--------------------+--------------------+--------------------+------------+
|              bounds|         coordinates|         country|datetimeFirst|datetimeLast|distance| id|         instruments|isMobile|isMonitor|licenses|locality|               name|               owner|            provider|             sensors|    timezone|
+--------------------+--------------------+----------------+-------------+------------+--------+---+--------------------+--------+---------+--------+--------+-------------------+--------------------+--------------------+--------------------+------------+
|[-0.19968, 5.5838...| {5.58389, -0.19968}|{GH, 152, Ghana}|         NULL|        NULL|    NULL|  3|[{2, Government M...|   false|     true|    NULL|    NULL|         NMA - Nima|{4, Unknown Gover...|{209, Dr. Raphael...|[{6, pm10 µg/m³

In [10]:
# Keep only the identifying columns and flatten the nested `country` struct
# into two plain columns.
country_code = df["country.code"].alias("country_code")
country_name = df["country.name"].alias("country_name")
locations_df = df.select("id", "name", "timezone", country_code, country_name)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Preview a ~1% random sample of the locations table without truncating cell
# contents. No seed is passed to sample(), so the rows differ between runs.
locations_df.sample(fraction=0.01).show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+--------------------+-------------------+------------+-------------+
|id  |name                |timezone           |country_code|country_name |
+----+--------------------+-------------------+------------+-------------+
|23  |Amgalan             |Asia/Ulaanbaatar   |MN          |Mongolia     |
|300 |Glen Burnie         |America/New_York   |US          |United States|
|403 |Museo Ferroviario   |America/Santiago   |CL          |Chile        |
|539 |Pinedale            |America/Denver     |US          |United States|
|634 |White Plains        |America/New_York   |US          |United States|
|682 |BDED                |America/New_York   |US          |United States|
|728 |Ronan-MT            |America/Denver     |US          |United States|
|843 |HU-Beltsville       |America/New_York   |US          |United States|
|916 |Wyong               |Australia/Sydney   |AU          |Australia    |
|926 |UAM Xochimilco      |America/Mexico_City|MX          |Mexico       |
|1038|Pilot Point C1032  

In [12]:
# Print the column names, types and nullability of the trimmed locations table.
locations_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- timezone: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- country_name: string (nullable = true)

## Big data cluster computations

1. Perform calculations for task 2: Create a query that calculates the average concentration of a chosen parameter for each hour of the day (0-1, 1-2, ..., 23-24) for every location, and finds the hour at which this average was highest.
2. Calculate time of execution for 2, 3, 4, 5, 6, 7 worker instances
3. Create execution time, speedup and efficiency plots.

In [13]:
def get_valid_s3_paths(location_ids, spark_session, target_count=30):
    """Collect S3 glob paths that can be loaded, stopping at ``target_count``.

    For every location id a glob path covering months 05 and 06 of 2022 in the
    OpenAQ archive bucket is built and probed by loading it as a
    gzip-compressed CSV with a header. A path is kept when the load does not
    raise.

    Args:
        location_ids: Iterable of location ids, probed in the given order.
        spark_session: Object exposing ``read`` like a SparkSession.
        target_count: Maximum number of paths to return. Zero or a negative
            number yields an empty list without probing anything.

    Returns:
        List of loadable glob paths, in probing order, with at most
        ``target_count`` entries.
    """
    valid_paths = []
    # Without this guard the first loadable path would be appended before the
    # limit is ever checked, so a limit of 0 would still return one path.
    if target_count <= 0:
        return valid_paths

    for loc_id in location_ids:
        path = f"s3a://openaq-data-archive/records/csv.gz/locationid={loc_id}/year=2022/month={{05,06}}/*.csv.gz"
        try:
            spark_session.read.format("csv").option("compression", "gzip").option("header", True).load(path)
        except Exception as e:
            # A missing path just means "no data for this location"; any other
            # failure is reported but does not abort the scan.
            if "PATH_NOT_FOUND" not in str(e):
                print(f"Error loading data for location ID {loc_id}: {e}")
            continue

        valid_paths.append(path)
        if len(valid_paths) >= target_count:
            break

    return valid_paths

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
from functools import reduce
from pyspark.sql import DataFrame
# `col` is imported in this cell too: the `%%configure -f` cell started a new
# Livy session (ID 1), so names imported in the first cell (session 0) are not
# defined in the session this function runs in.
from pyspark.sql.functions import col

def get_df_from_paths(s3_paths, spark_session, locations_df):
    """Load measurement CSVs from ``s3_paths`` and attach location names.

    Each path is read as a gzip-compressed CSV with a header, the resulting
    frames are combined with ``unionByName``, and ``value``, ``location_id``
    and ``datetime`` are cast to double, long and timestamp. The result is
    left-joined with the ``id``/``name`` columns of ``locations_df`` on
    ``location_id == id``; the duplicate ``id`` column is dropped.

    Args:
        s3_paths: Sequence of paths (globs allowed) to read.
        spark_session: SparkSession used for reading.
        locations_df: DataFrame with at least ``id`` and ``name`` columns.

    Returns:
        The joined DataFrame, or ``None`` when ``s3_paths`` is empty.
    """
    if not s3_paths:
        return None

    def _read_csv(path):
        # One fresh reader per path, configured exactly the same way each time.
        return (
            spark_session.read
            .format("csv")
            .option("compression", "gzip")
            .option("header", True)
            .load(path)
        )

    df_all = reduce(DataFrame.unionByName, [_read_csv(path) for path in s3_paths])

    # Every column is read as text (no schema is given), so cast the ones used
    # for arithmetic, joining and time extraction.
    df_typed = (
        df_all
        .withColumn("value", col("value").cast("double"))
        .withColumn("location_id", col("location_id").cast("long"))
        .withColumn("datetime", col("datetime").cast("timestamp"))
    )

    # Join against the projected frame itself so the join condition and the
    # dropped column refer to the same DataFrame.
    location_names = locations_df.select("id", "name")
    df_final = df_typed.join(
        location_names,
        df_typed["location_id"] == location_names["id"],
        how='left'
    ).drop(location_names["id"])

    return df_final


In [None]:
# These names are imported in this cell because the `%%configure -f` cell
# started a new Livy session (ID 1); imports made in the first cell (session 0)
# are not defined here. `max` is aliased so the Python builtin is not shadowed.
from pyspark.sql.functions import avg, col, hour, max as spark_max
from pyspark.sql.window import Window

PARAMETER = "co"
# The data preview above shows -999.0 readings for "co". NOTE(review): this
# looks like a missing-value sentinel -- confirm against the OpenAQ docs. Such
# rows are excluded so they cannot distort the hourly averages.
MISSING_VALUE_SENTINEL = -999.0

# The timer covers path discovery on the driver as well as the Spark query.
start = time.time()

pl_locations_df = locations_df.filter(locations_df.country_code == 'PL').select('name', 'id')
pl_location_ids = [row["id"] for row in pl_locations_df.select("id").collect()]
pl_100 = get_valid_s3_paths(pl_location_ids, spark, target_count=100)
df_final = get_df_from_paths(pl_100, spark, pl_locations_df)
if df_final is None:
    # get_df_from_paths returns None when no path could be loaded.
    raise ValueError("No loadable S3 paths were found for locations with country_code 'PL'")

# Average value of the chosen parameter per location and hour of day.
# NOTE(review): hour() is applied to the timestamp produced by the cast, so it
# presumably reflects the Spark session time zone rather than each location's
# local time -- confirm before interpreting the hours.
hour_avg_co = (
    df_final
    .filter(col("parameter") == PARAMETER)
    .filter(col("value") != MISSING_VALUE_SENTINEL)
    .groupBy(
        col("name"),
        hour(col("datetime")).alias("hour")
    )
    .agg(avg(col("value")).alias("avg_value"))
)

# For every location keep the hour(s) whose average equals that location's maximum.
w = Window.partitionBy('name')

hour_avg_co_with_max = hour_avg_co.withColumn('max_avg_value', spark_max('avg_value').over(w))

highest_avg_hour_per_location = (
    hour_avg_co_with_max
    .filter(col('avg_value') == col('max_avg_value'))
    .drop('max_avg_value')
)

# DataFrame transformations are lazy: nothing above runs the aggregation.
# collect() is the action that executes the query, so it must happen before
# the timer stops or the measurement would exclude the actual computation.
highest_avg_hour_rows = highest_avg_hour_per_location.collect()

end = time.time()
print(end - start)

In [16]:
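# workers = np.array([1, 2, 3, 4, 5, 6, 7])
# times = np.array([0, 0, 0, 0, 0, 0, 0])  # placeholders: fill in the measured execution times
# speedups = times[0] / times  # relative to the first entry (1 worker)
# efficiencies = speedups / workers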

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…