In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import datetime, timedelta
from typing import List, Dict, Tuple
import requests



In [27]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("AQAnalysis") \
    .getOrCreate()

SyntaxError: unexpected character after line continuation character (326554945.py, line 3)

In [3]:
headers = {
    "X-API-Key": "9b7c23f6701f7f8e923a5691c6b67d1361bd044b308a8f863502d1190cbe7435"
    
}

In [4]:
def get_sensor_data_for_city(city_data: tuple) -> List[Tuple[str, Dict]]:
    """
    Helper function to process a single city. This runs on executor nodes.
    Returns list of (city, sensor_dict) tuples.
    """
    city, (lat, lon) = city_data
    base_url = "https://api.openaq.org/v3/locations"
    params = {
        'coordinates': f"{lat},{lon}",
        'radius': 15000,
        'limit': 15
    }
    
    try:
        response = requests.get(base_url, params=params, headers=headers)
        response.raise_for_status()
        data = response.json()
        sensors = []
        for location in data.get('results', []):
            for sensor in location.get('sensors', []):
                sensors.append({
                    'sensor_id': sensor['id'],
                    'parameter': sensor['parameter']['name'],
                    'units': sensor['parameter']['units'],
                    'location_name': location['name']
                })
        # Return list of (city, sensor) tuples
        return [(city, sensor) for sensor in sensors]
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data for {city}: {e}")
        return []
    
    

In [5]:
def get_sensor_ids(coordinates_dict: Dict):

    # Convert coordinates dictionary to list of tuples for parallelization
    cities_data = list(coordinates_dict.items())
    
    # Create RDD from cities data and collect sensor information
    cities_rdd = spark.sparkContext.parallelize(cities_data)
    sensor_data_rdd = cities_rdd.flatMap(get_sensor_data_for_city)
    
    return sensor_data_rdd

In [6]:
def fetch_measurements_for_sensors(sensor_ids_rdd: 'RDD[str]', date_to: str, date_from: str) -> 'RDD[Tuple[str, Dict]]':
    """
    Returns RDD of (sensor_id, measurement_dict) tuples.
    """
    def fetch_sensor_measurements(sensor_id: str) -> List[Tuple[str, Dict]]:
        url = f"https://api.openaq.org/v3/sensors/{sensor_id}/measurements/daily"
        params = {'datetime_to': date_to, 'datetime_from': date_from}
        try:
            response = requests.get(url, params=params, headers=headers)
            response.raise_for_status()
            measurements = response.json()
            return [(sensor_id, measurement) for measurement in measurements.get('results', [])]
        except Exception as e:
            print(f"Error fetching {sensor_id}: {e}")
            return []
    
    return sensor_ids_rdd.flatMap(fetch_sensor_measurements)

In [7]:


def analyze_city_data(cities, date_to, date_from):
    """
    Main function to analyze air quality data for all cities.
    """
    # Get sensor IDs for all cities
    print("Fetching sensor information...")
    sensor_data_rdd = get_sensor_ids(cities)  # RDD[(city, sensor_dict)]
    # print('Sensor Data RDD :',sensor_data_rdd.show(5))

    # Create (sensor_id, city) pairs
    sensor_city_rdd = sensor_data_rdd.flatMap(
        lambda x: [(x[1]['sensor_id'], x[0])]  # (sensor_id, city)
    )
    # print('Sensor City RDD :',sensor_city_rdd.show(5))


    sensor_ids_rdd = sensor_city_rdd.keys()  # RDD[sensor_id]
    # print('Sensor IDs RDD :',sensor_ids_rdd.show())


    # Use correct function with date parameters
    measurements_rdd = fetch_measurements_for_sensors(sensor_ids_rdd, date_to, date_from)
    # print('Measurements RDD :',measurements_rdd.show())


    joined_rdd = sensor_city_rdd.join(measurements_rdd)  # RDD[(sensor_id, (city, measurement_dict))]
    
    # Convert to Row objects
    def to_row(sensor_data: Tuple[str, Tuple[str, Dict]]) -> Row:
        sensor_id, (city, measurement) = sensor_data
        return Row(
            city=city,
            location=measurement.get('location', 'Unknown'),
            parameter=measurement.get('parameter', {}).get('name', 'Unknown'),
            units=measurement.get('parameter', {}).get('units', 'Unknown'),
            date=measurement.get('period', {}).get('datetimeTo', {}).get('utc', ''),
            value=measurement.get('value'),
            sensor_id=sensor_id
        )
    
    final_rdd = joined_rdd.map(to_row)
    
    return spark.createDataFrame(final_rdd)

In [8]:
la_df = analyze_city_data(
    {"Los Angeles": (34.0522, -118.2437)},
    date_to='2020-12-31',
    date_from='2020-01-01'
)
la_df.show()
la_df.select('sensor_id').distinct().collect()


Fetching sensor information...


                                                                                

+-----------+--------+---------+-----+--------------------+-----+---------+
|       city|location|parameter|units|                date|value|sensor_id|
+-----------+--------+---------+-----+--------------------+-----+---------+
|Los Angeles| Unknown|       co|  ppm|2020-01-02T08:00:00Z|0.453|    25472|
|Los Angeles| Unknown|       co|  ppm|2020-01-03T08:00:00Z|0.507|    25472|
|Los Angeles| Unknown|       co|  ppm|2020-01-04T08:00:00Z|0.548|    25472|
|Los Angeles| Unknown|       co|  ppm|2020-01-05T08:00:00Z|0.617|    25472|
|Los Angeles| Unknown|       co|  ppm|2020-01-06T08:00:00Z|0.476|    25472|
|Los Angeles| Unknown|       co|  ppm|2020-01-07T08:00:00Z|0.503|    25472|
|Los Angeles| Unknown|       co|  ppm|2020-01-08T08:00:00Z|0.391|    25472|
|Los Angeles| Unknown|       co|  ppm|2020-01-09T08:00:00Z|0.496|    25472|
|Los Angeles| Unknown|       co|  ppm|2020-01-10T08:00:00Z|0.375|    25472|
|Los Angeles| Unknown|       co|  ppm|2020-01-11T08:00:00Z|0.492|    25472|
|Los Angeles

                                                                                

[Row(sensor_id=25472),
 Row(sensor_id=25473),
 Row(sensor_id=25474),
 Row(sensor_id=25192),
 Row(sensor_id=25193),
 Row(sensor_id=25194),
 Row(sensor_id=23019),
 Row(sensor_id=25195),
 Row(sensor_id=25196)]

In [10]:
ny_df = analyze_city_data(
    {"New York": (40.7128, -74.0060)},
    date_to='2020-12-31',
    
    date_from='2020-01-01'
)
ny_df.show()


Fetching sensor information...


                                                                                

+--------+--------+---------+-----+--------------------+------+---------+
|    city|location|parameter|units|                date| value|sensor_id|
+--------+--------+---------+-----+--------------------+------+---------+
|New York| Unknown|       co|  ppm|2020-01-02T05:00:00Z| 0.205|     2016|
|New York| Unknown|       co|  ppm|2020-01-03T05:00:00Z|   0.7|     2016|
|New York| Unknown|       co|  ppm|2020-01-04T05:00:00Z| 0.668|     2016|
|New York| Unknown|       co|  ppm|2020-01-05T05:00:00Z| 0.483|     2016|
|New York| Unknown|       co|  ppm|2020-01-06T05:00:00Z| 0.114|     2016|
|New York| Unknown|       co|  ppm|2020-01-07T05:00:00Z| 0.457|     2016|
|New York| Unknown|       co|  ppm|2020-01-08T05:00:00Z| 0.417|     2016|
|New York| Unknown|       co|  ppm|2020-01-09T05:00:00Z|   0.1|     2016|
|New York| Unknown|       co|  ppm|2020-01-10T05:00:00Z| 0.386|     2016|
|New York| Unknown|       co|  ppm|2020-01-11T05:00:00Z|  0.69|     2016|
|New York| Unknown|       co|  ppm|202

In [11]:
ny_df.select('sensor_id').distinct().collect()


                                                                                

[Row(sensor_id=2016),
 Row(sensor_id=673),
 Row(sensor_id=2018),
 Row(sensor_id=2644),
 Row(sensor_id=2645),
 Row(sensor_id=2646),
 Row(sensor_id=1143),
 Row(sensor_id=1128),
 Row(sensor_id=1145),
 Row(sensor_id=1098),
 Row(sensor_id=1099),
 Row(sensor_id=23341),
 Row(sensor_id=1103),
 Row(sensor_id=671)]

In [12]:
ny_df.select('location').distinct().collect()


                                                                                

[Row(location='Unknown')]

In [13]:
#write data in to table


spark1 = SparkSession.builder \
    .appName("Write to PostgreSQL") \
    .config("spark.jars", "./Spark Utils/postgresql-42.7.4.jar") \
    .getOrCreate()


25/02/13 19:05:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [30]:
properties = {
    "user": "postgres",
    "password": "admin",
    "driver": "org.postgresql.Driver"
}


In [31]:
url = "jdbc:postgresql://localhost:5432/openaq"

ny_df.write \
    .jdbc(url=url, table="dummy_city", mode="append", properties=properties)



Py4JJavaError: An error occurred while calling o278.jdbc.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:587)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:254)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:258)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:766)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


25/02/13 20:39:43 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 125347 ms exceeds timeout 120000 ms
25/02/13 20:39:43 WARN SparkContext: Killing executors is not supported by current scheduler.
25/02/13 20:39:43 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [23]:
import logging
logging.getLogger("py4j").setLevel(logging.DEBUG)
