In [1]:
import os
import sys

# Configure the environment PySpark reads at start-up: JAVA_HOME is set to a
# relative directory (presumably a local JDK 8 install -- confirm), and both
# the worker and driver Python are pinned to the interpreter running this kernel.
os.environ.update(
    {
        "JAVA_HOME": "../data-engineering/.JDK 8",
        "PYSPARK_PYTHON": sys.executable,
        "PYSPARK_DRIVER_PYTHON": sys.executable,
    }
)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import requests, json

# Obtain the SparkSession for this notebook: getOrCreate() reuses an already
# active session if there is one, otherwise it starts a new one under the
# application name below.
spark = (
    SparkSession.builder
    .appName("GreenDataAnalyticsProject")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/25 20:48:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/25 20:48:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
api_url = "https://maps2.bristol.gov.uk/server2/rest/services/ext/air_quality/MapServer/0/query?outFields=*&where=1%3D1&f=geojson"

# Upper bound (seconds) on how long to wait for the server. Without a timeout
# requests.get() can block forever and the Timeout handler below never fires.
REQUEST_TIMEOUT_SECONDS = 30

# Step 1: perform the HTTP request.
# Every handler prints a short diagnosis and then re-raises, so a failed
# download stops the notebook here instead of surfacing later as a NameError
# on `geojson_data`. Anything that is not a requests error propagates as-is.
try:
    response = requests.get(api_url, timeout=REQUEST_TIMEOUT_SECONDS)
    # requests does not raise on 4xx/5xx by itself; without this call the
    # HTTPError handler is unreachable and an error page would be parsed below.
    response.raise_for_status()

except requests.exceptions.HTTPError as http_err:
    # This block handles HTTP errors (e.g., 404, 500)
    print(f"HTTP error occurred: {http_err}")
    raise

except requests.exceptions.Timeout as timeout_err:
    # This block handles timeouts. It is listed before ConnectionError because
    # ConnectTimeout derives from both classes and should be reported as a timeout.
    print(f"Timeout error occurred: {timeout_err}")
    raise

except requests.exceptions.ConnectionError as conn_err:
    # This block handles issues like network errors, no connection
    print(f"Connection error occurred: {conn_err}")
    raise

except requests.exceptions.RequestException as req_err:
    # This block handles any other request-related errors
    print(f"An error occurred with the request: {req_err}")
    raise

# Step 2: decode the body.
# Kept in its own try block: on recent requests versions the JSON decode error
# is also a RequestException, so sharing one try block with the handlers above
# would report a parse failure as a generic request error.
try:
    geojson_data = response.json()
except ValueError as json_err:
    # This block handles errors related to JSON parsing
    print(f"Failed to parse JSON: {json_err}")
    raise

In [4]:
# Take the list stored under "features" (an empty list when the key is absent).
features = geojson_data.get("features", [])

# Serialise every feature back to a JSON string, one string per record.
json_features = list(map(json.dumps, features))

# Distribute the strings as an RDD and have Spark read them as JSON records.
rdd = spark.sparkContext.parallelize(json_features)
df = spark.read.json(rdd)


25/03/25 20:48:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [5]:
# Keep only the columns used further down, pulling the nested fields out of
# the `properties` and `geometry` structs and giving them flat names.
df = df.select(
    "id",
    col("properties.location").alias("location"),
    col("properties.LocationClass").alias("Location Type"),
    col("geometry.coordinates").alias("coordinates"),
)


In [6]:
# Print the first row only, with full (untruncated) cell contents.
df.show(n=1, truncate=False)

+---+----------------+-------------+-----------------------------------------+
|id |location        |Location Type|coordinates                              |
+---+----------------+-------------+-----------------------------------------+
|1  |Withywood School|Urban Traffic|[-2.6277488032062446, 51.407745517999835]|
+---+----------------+-------------+-----------------------------------------+
only showing top 1 row



In [7]:
# Replace the coordinates array with its element at index 1 and rename the
# column to "Latitude" (the preview above shows index 1 holding the ~51.4 value).
df = (
    df.withColumn("coordinates", col("coordinates").getItem(1))
    .withColumnRenamed("coordinates", "Latitude")
)

In [8]:
from pyspark.sql import functions as F
# Compute both bounds of the Latitude column in ONE aggregation, so Spark runs
# a single job over the data instead of one job for max and another for min.
# An aggregation without groupBy always yields exactly one row; both values
# are None when the DataFrame has no non-null latitudes.
_latitude_bounds = df.agg(
    F.max("Latitude").alias("max_latitude"),
    F.min("Latitude").alias("min_latitude"),
).first()
max_value = _latitude_bounds["max_latitude"]
min_value = _latitude_bounds["min_latitude"]

def normalise(original_value, lower_bound=None, upper_bound=None):
    """Min-max scale a value into the range [0, 1].

    Args:
        original_value: The number to scale. ``None`` is passed through.
        lower_bound: Value that maps to 0. Defaults to the module-level
            ``min_value`` when omitted.
        upper_bound: Value that maps to 1. Defaults to the module-level
            ``max_value`` when omitted.

    Returns:
        ``(original_value - lower_bound) / (upper_bound - lower_bound)`` as a
        float; ``None`` if the value or either bound is ``None``; ``0.0`` if
        the two bounds are equal (no spread to scale against).
    """
    # Fall back to the notebook-level bounds only when the caller gave none,
    # so the single-argument call used by the UDF keeps working unchanged.
    if lower_bound is None:
        lower_bound = min_value
    if upper_bound is None:
        upper_bound = max_value

    # Null inputs (or bounds from an empty/all-null column) would raise a
    # TypeError in the subtraction below and fail the whole Spark job.
    if original_value is None or lower_bound is None or upper_bound is None:
        return None

    value_range = upper_bound - lower_bound
    # All values identical: avoid ZeroDivisionError and map everything to 0.0.
    if value_range == 0:
        return 0.0

    return (original_value - lower_bound) / value_range
    

In [9]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
# Wrap the Python `normalise` function as a Spark UDF that returns a double.
normalise_udf = udf(f=normalise, returnType=DoubleType())


In [10]:
# Add the scaled latitude as a new column, then discard the raw "Latitude"
# column so only the normalised value remains.
df = (
    df.withColumn("Latitude (Normalised)", normalise_udf(col("Latitude")))
    .drop("Latitude")
)

# Print the result with full (untruncated) cell contents.
df.show(truncate=False)

+---+-----------------+----------------+---------------------+
|id |location         |Location Type   |Latitude (Normalised)|
+---+-----------------+----------------+---------------------+
|1  |Withywood School |Urban Traffic   |0.0909330382676792   |
|2  |Colston Avenue   |Urban Traffic   |0.2938159127984107   |
|3  |Blackboy Hill    |Urban Traffic   |0.357305528540315    |
|4  |Three Lamps      |Urban Traffic   |0.2489726016785307   |
|5  |Bedminster Parade|Urban Traffic   |0.2429141946354523   |
|6  |Church Road      |Urban Traffic   |0.3102920364172479   |
|7  |St. Andrew's Rd  |Urban Traffic   |0.49558147749590986  |
|8  |Higham Street    |Urban Background|0.25101726499215116  |
|9  |B.R.I.           |Urban Traffic   |0.312865051710554    |
|10 |Bath Road        |Urban Traffic   |0.23296471427037527  |
|11 |Whitefriars      |Urban Traffic   |0.3067735040613447   |
|12 |Galleries        |Urban Traffic   |0.3017723316726845   |
|13 |Ferndown Close   |Urban Background|0.4669340366144

In [11]:
import pandas as pd
# Collect the Spark DataFrame to the driver as a pandas DataFrame and write it
# to a CSV file in the working directory. to_csv is called with its defaults,
# so the pandas row index is written as an unnamed first column.
cleaned_pandas_df = df.toPandas()
cleaned_pandas_df.to_csv("cleanedAirQualityData.csv")