In [1]:
# Optional dependency installs, intentionally left commented out.
# Uncomment a line to run that install from inside the notebook.
# %pip install requests pandas pyspark
# %pip install "pymongo[srv]"

In [0]:
%restart_python

In [0]:
# Each row is (city, state, latitude, longitude); the rows are expanded into
# dictionaries below so downstream cells can look values up by key.
_CITY_FIELDS = ("city", "state", "latitude", "longitude")
_CITY_ROWS = (
    ("Delhi", "Delhi", 28.6139, 77.2090),
    ("Mumbai", "Maharashtra", 19.0760, 72.8777),
    ("Bangalore", "Karnataka", 12.9716, 77.5946),
    ("Hyderabad", "Telangana", 17.3850, 78.4867),
    ("Chennai", "Tamil Nadu", 13.0827, 80.2707),
    ("Kolkata", "West Bengal", 22.5726, 88.3639),
    ("Pune", "Maharashtra", 18.5204, 73.8567),
    ("Ahmedabad", "Gujarat", 23.0225, 72.5714),
    ("Jaipur", "Rajasthan", 26.9124, 75.7873),
    ("Lucknow", "Uttar Pradesh", 26.8467, 80.9462),
    ("Surat", "Gujarat", 21.1702, 72.8311),
    ("Kanpur", "Uttar Pradesh", 26.4499, 80.3319),
    ("Nagpur", "Maharashtra", 21.1458, 79.0882),
    ("Indore", "Madhya Pradesh", 22.7196, 75.8577),
    ("Bhopal", "Madhya Pradesh", 23.2599, 77.4126),
    ("Patna", "Bihar", 25.5941, 85.1376),
    ("Ludhiana", "Punjab", 30.9005, 75.8573),
    ("Coimbatore", "Tamil Nadu", 11.0168, 76.9558),
    ("Kochi", "Kerala", 9.9312, 76.2673),
    ("Visakhapatnam", "Andhra Pradesh", 17.6868, 83.2185),
    ("Vijayawada", "Andhra Pradesh", 16.5062, 80.6480),
    ("Chandigarh", "Chandigarh", 30.7333, 76.7794),
    ("Agra", "Uttar Pradesh", 27.1767, 78.0081),
    ("Varanasi", "Uttar Pradesh", 25.3176, 82.9739),
    ("Srinagar", "Jammu & Kashmir", 34.0837, 74.7973),
)

# List of per-city dictionaries with keys: city, state, latitude, longitude.
cities = [dict(zip(_CITY_FIELDS, row)) for row in _CITY_ROWS]

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Column layout of one hourly weather record: (column name, Spark type).
# "timestamp" is kept as a string here; it could be cast to TimestampType() later.
_weather_columns = [
    ("city", StringType()),
    ("latitude", DoubleType()),
    ("longitude", DoubleType()),
    ("timestamp", StringType()),
    ("temperature_2m", DoubleType()),
    ("wind_speed_10m", DoubleType()),
    ("cloud_cover", DoubleType()),
    ("precipitation", DoubleType()),
    ("relative_humidity_2m", DoubleType()),
    ("dew_point_2m", DoubleType()),
]

# Every column is nullable.
schema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in _weather_columns]
)

# Empty DataFrame carrying the weather schema; per-city data is unioned onto it.
weather_df = spark.createDataFrame([], schema)

In [0]:
import requests
import json
from time import sleep

def get_weather_data(lat, lon, timeout=60):
    """Fetch hourly archive weather data for one coordinate from Open-Meteo.

    Requests temperature, relative humidity, dew point, wind speed, cloud
    cover and precipitation for 2022-08-06 through 2025-03-31.

    Args:
        lat: Latitude interpolated into the query string.
        lon: Longitude interpolated into the query string.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the notebook indefinitely.

    Returns:
        The decoded JSON response on HTTP 200, otherwise None (after
        printing a diagnostic). Transport failures such as timeouts or
        connection errors are also reported and mapped to None.
    """
    url = f"https://archive-api.open-meteo.com/v1/archive?latitude={lat}&longitude={lon}&start_date=2022-08-06&end_date=2025-03-31&hourly=temperature_2m,relative_humidity_2m,dew_point_2m,wind_speed_10m,cloud_cover,precipitation"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Keep the same "print and return None" contract as the HTTP error path
        # so one unreachable request does not abort the whole ingestion loop.
        print(f"Error fetching weather data for {lat}, {lon}: {exc}")
        return None

    if response.status_code == 200:
        return response.json()

    print(response.status_code)
    print(f"Error fetching weather data for {lat}, {lon}")
    return None

# Measurement columns copied from the response's "hourly" section into each record.
weather_measurements = (
    "temperature_2m",
    "wind_speed_10m",
    "cloud_cover",
    "precipitation",
    "relative_humidity_2m",
    "dew_point_2m",
)

# Accumulate into a fresh empty frame so that re-running this cell rebuilds
# weather_df from scratch instead of unioning duplicates onto the old result.
weather_accumulator = spark.createDataFrame([], schema)

for city in cities:
    print(f"Fetching weather data for {city['city']}")
    data = get_weather_data(city["latitude"], city["longitude"])
    if data:
        hourly = data["hourly"]
        records = []
        for i, timestamp in enumerate(hourly["time"]):
            record = {
                "city": city["city"],
                "latitude": float(city["latitude"]),
                "longitude": float(city["longitude"]),
                "timestamp": timestamp,
            }
            for name in weather_measurements:
                value = hourly[name][i]
                # float(None) raises TypeError; a null entry in the payload is
                # stored as None, which the nullable schema columns accept.
                record[name] = None if value is None else float(value)
            records.append(record)

        city_df = spark.createDataFrame(records, schema=schema)
        weather_accumulator = weather_accumulator.union(city_df)
    # Pause between requests to stay gentle on the API.
    sleep(5)

weather_df = weather_accumulator

Fetching weather data for Delhi
Fetching weather data for Mumbai
Fetching weather data for Bangalore
Fetching weather data for Hyderabad
Fetching weather data for Chennai
Fetching weather data for Kolkata
Fetching weather data for Pune
Fetching weather data for Ahmedabad
Fetching weather data for Jaipur
Fetching weather data for Lucknow
Fetching weather data for Surat
Fetching weather data for Kanpur
Fetching weather data for Nagpur
Fetching weather data for Indore
Fetching weather data for Bhopal
Fetching weather data for Patna
Fetching weather data for Ludhiana
Fetching weather data for Coimbatore
Fetching weather data for Kochi
Fetching weather data for Visakhapatnam
Fetching weather data for Vijayawada
Fetching weather data for Chandigarh
Fetching weather data for Agra
Fetching weather data for Varanasi
Fetching weather data for Srinagar


In [0]:
# Preview the first five rows of the collected weather data.
weather_df.show(5)

+-----+--------+---------+----------------+--------------+--------------+-----------+-------------+--------------------+------------+
| city|latitude|longitude|       timestamp|temperature_2m|wind_speed_10m|cloud_cover|precipitation|relative_humidity_2m|dew_point_2m|
+-----+--------+---------+----------------+--------------+--------------+-----------+-------------+--------------------+------------+
|Delhi| 28.6139|   77.209|2022-08-06T00:00|          26.0|           5.2|       75.0|          0.0|                97.0|        25.4|
|Delhi| 28.6139|   77.209|2022-08-06T01:00|          25.6|           7.6|      100.0|          0.7|                93.0|        24.4|
|Delhi| 28.6139|   77.209|2022-08-06T02:00|          25.5|           4.4|      100.0|          1.2|                93.0|        24.3|
|Delhi| 28.6139|   77.209|2022-08-06T03:00|          26.7|           4.2|       51.0|          0.0|                89.0|        24.8|
|Delhi| 28.6139|   77.209|2022-08-06T04:00|          28.5|    

In [0]:
# Summary statistics (count, mean, stddev, ...) for every weather column.
weather_df.describe().show()

+-------+-------------+------------------+------------------+----------------+------------------+-----------------+-----------------+-------------------+--------------------+------------------+
|summary|         city|          latitude|         longitude|       timestamp|    temperature_2m|   wind_speed_10m|      cloud_cover|      precipitation|relative_humidity_2m|      dew_point_2m|
+-------+-------------+------------------+------------------+----------------+------------------+-----------------+-----------------+-------------------+--------------------+------------------+
|  count|       581400|            581400|            581400|          581400|            581400|           581400|           581400|             581400|              581400|            581400|
|   mean|         NULL|22.107843999999595|  78.1651559999993|            NULL|24.748929652562765|9.405087031303744|45.54083935328517|0.14326264189886515|   66.58499312005505|16.828733230134155|
| stddev|         NULL|6.28032

In [0]:
# Column layout of one hourly air-quality record: (column name, Spark type).
# "timestamp" is kept as a string here; it could be cast to TimestampType() later.
_aqi_columns = [
    ("city", StringType()),
    ("latitude", DoubleType()),
    ("longitude", DoubleType()),
    ("timestamp", StringType()),
    ("pm2_5", DoubleType()),
    ("pm_10", DoubleType()),
    ("aqi", DoubleType()),
]

# Every column is nullable.
aqi_schema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in _aqi_columns]
)

# Empty DataFrame carrying the AQI schema; per-city data is unioned onto it.
aqi_df = spark.createDataFrame([], aqi_schema)

In [0]:
def get_aqi_data(lat, lon, timeout=60):
    """Fetch hourly air-quality data for one coordinate from Open-Meteo.

    Requests pm10, pm2_5 and us_aqi for 2022-08-06 through 2025-03-31.

    Args:
        lat: Latitude interpolated into the query string.
        lon: Longitude interpolated into the query string.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the notebook indefinitely.

    Returns:
        The decoded JSON response on HTTP 200, otherwise None (after
        printing a diagnostic). Transport failures such as timeouts or
        connection errors are also reported and mapped to None.
    """
    url = f"https://air-quality-api.open-meteo.com/v1/air-quality?latitude={lat}&longitude={lon}&hourly=pm10,pm2_5,us_aqi&start_date=2022-08-06&end_date=2025-03-31"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Keep the same "print and return None" contract as the HTTP error path
        # so one unreachable request does not abort the whole ingestion loop.
        print(f"Error fetching AQI data for {lat}, {lon}: {exc}")
        return None

    if response.status_code == 200:
        return response.json()

    print(response.status_code)
    print(f"Error fetching AQI data for {lat}, {lon}")
    return None

# Maps each output column to the key it is read from in the response's
# "hourly" section (the API names differ from the DataFrame column names).
aqi_column_sources = {
    "pm2_5": "pm2_5",
    "pm_10": "pm10",
    "aqi": "us_aqi",
}

# Accumulate into a fresh empty frame so that re-running this cell rebuilds
# aqi_df from scratch instead of unioning duplicates onto the old result.
aqi_accumulator = spark.createDataFrame([], aqi_schema)

for city in cities:
    print(f"Fetching AQI data for {city['city']}")
    data = get_aqi_data(city["latitude"], city["longitude"])
    if data:
        hourly = data["hourly"]
        records = []
        for i, timestamp in enumerate(hourly["time"]):
            record = {
                "city": city["city"],
                "latitude": float(city["latitude"]),
                "longitude": float(city["longitude"]),
                "timestamp": timestamp,
            }
            for column, source_key in aqi_column_sources.items():
                value = hourly[source_key][i]
                # float(None) raises TypeError; a null entry in the payload is
                # stored as None, which the nullable schema columns accept.
                record[column] = None if value is None else float(value)
            records.append(record)

        city_df = spark.createDataFrame(records, schema=aqi_schema)
        aqi_accumulator = aqi_accumulator.union(city_df)
    # Pause between requests to stay gentle on the API.
    sleep(5)

aqi_df = aqi_accumulator

Fetching AQI data for Delhi
Fetching AQI data for Mumbai
Fetching AQI data for Bangalore
Fetching AQI data for Hyderabad
Fetching AQI data for Chennai
Fetching AQI data for Kolkata
Fetching AQI data for Pune
Fetching AQI data for Ahmedabad
Fetching AQI data for Jaipur
Fetching AQI data for Lucknow
Fetching AQI data for Surat
Fetching AQI data for Kanpur
Fetching AQI data for Nagpur
Fetching AQI data for Indore
Fetching AQI data for Bhopal
Fetching AQI data for Patna
Fetching AQI data for Ludhiana
Fetching AQI data for Coimbatore
Fetching AQI data for Kochi
Fetching AQI data for Visakhapatnam
Fetching AQI data for Vijayawada
Fetching AQI data for Chandigarh
Fetching AQI data for Agra
Fetching AQI data for Varanasi
Fetching AQI data for Srinagar


In [0]:
# Preview the first five rows of the collected air-quality data.
aqi_df.show(5)

+-----+--------+---------+----------------+-----+-----+-----+
| city|latitude|longitude|       timestamp|pm2_5|pm_10|  aqi|
+-----+--------+---------+----------------+-----+-----+-----+
|Delhi| 28.6139|   77.209|2022-08-06T00:00|112.2|161.1|137.0|
|Delhi| 28.6139|   77.209|2022-08-06T01:00|101.0|145.3|144.0|
|Delhi| 28.6139|   77.209|2022-08-06T02:00| 83.8|120.6|150.0|
|Delhi| 28.6139|   77.209|2022-08-06T03:00| 63.1| 92.0|151.0|
|Delhi| 28.6139|   77.209|2022-08-06T04:00| 54.4| 80.6|152.0|
+-----+--------+---------+----------------+-----+-----+-----+
only showing top 5 rows



In [0]:
# Summary statistics (count, mean, stddev, min, max) for every AQI column.
aqi_df.describe().show()

+-------+-------------+------------------+------------------+----------------+-----------------+-----------------+------------------+
|summary|         city|          latitude|         longitude|       timestamp|            pm2_5|            pm_10|               aqi|
+-------+-------------+------------------+------------------+----------------+-----------------+-----------------+------------------+
|  count|       581400|            581400|            581400|          581400|           581400|           581400|            581400|
|   mean|         NULL|22.107843999999595|  78.1651559999993|            NULL|41.98181355349154|66.35223374612998|107.67951668386652|
| stddev|         NULL|6.2803205784175535|3.8366519350712758|            NULL|33.89331397030159|60.43830764001279| 51.73688208575391|
|    min|         Agra|            9.9312|           72.5714|2022-08-06T00:00|              0.2|              0.2|              11.0|
|    max|Visakhapatnam|           34.0837|           88.3639|2

In [0]:
# Columns present in both frames that together identify one city-hour reading.
join_keys = ["city", "latitude", "longitude", "timestamp"]

# Inner join: keep only the city-hours that have both weather and AQI rows.
full_data = weather_df.join(aqi_df, on=join_keys, how="inner")

In [0]:
# Preview the first five rows of the joined weather + AQI data.
full_data.show(5)

+----+--------+---------+----------------+--------------+--------------+-----------+-------------+--------------------+------------+-----+-----+-----+
|city|latitude|longitude|       timestamp|temperature_2m|wind_speed_10m|cloud_cover|precipitation|relative_humidity_2m|dew_point_2m|pm2_5|pm_10|  aqi|
+----+--------+---------+----------------+--------------+--------------+-----------+-------------+--------------------+------------+-----+-----+-----+
|Agra| 27.1767|  78.0081|2022-08-06T04:00|          30.6|           4.1|       98.0|          0.1|                77.0|        26.2| 37.4| 54.1|146.0|
|Agra| 27.1767|  78.0081|2022-08-06T07:00|          32.5|           2.9|      100.0|          0.0|                68.0|        25.8| 29.9| 43.4|147.0|
|Agra| 27.1767|  78.0081|2022-08-06T08:00|          32.3|           3.3|      100.0|          0.2|                68.0|        25.7| 30.0| 43.5|147.0|
|Agra| 27.1767|  78.0081|2022-08-06T09:00|          31.5|           3.7|      100.0|          

In [0]:
# Summary statistics for the joined data; the row count shows how many
# city-hours survived the inner join.
full_data.describe().show()

+-------+-------------+------------------+------------------+----------------+------------------+-----------------+-----------------+------------------+--------------------+------------------+------------------+-----------------+------------------+
|summary|         city|          latitude|         longitude|       timestamp|    temperature_2m|   wind_speed_10m|      cloud_cover|     precipitation|relative_humidity_2m|      dew_point_2m|             pm2_5|            pm_10|               aqi|
+-------+-------------+------------------+------------------+----------------+------------------+-----------------+-----------------+------------------+--------------------+------------------+------------------+-----------------+------------------+
|  count|       581400|            581400|            581400|          581400|            581400|           581400|           581400|            581400|              581400|            581400|            581400|           581400|            581400|
|   

In [0]:
# Render the joined DataFrame with the notebook's rich `display` helper.
display(full_data)

In [0]:
import os

# MongoDB connection string. Read from the MONGO_URI environment variable when
# it is set, so real credentials never have to be written into the notebook;
# otherwise fall back to the redacted placeholder, which must be replaced
# before the write cell can succeed.
mongo_uri = os.environ.get("MONGO_URI", "############")

# Destination database and collection for the joined weather + AQI data.
targetDb = "weather-aqi-db"
targetCollection = "weather-aqi-col"

In [0]:
# Append at most 600000 joined rows to the target MongoDB collection through
# the "mongodb" Spark data source, sending documents in batches of 1000.
# NOTE(review): mode("append") means re-running this cell inserts the same
# rows again — confirm that is intended before re-executing.
mongo_writer = (
    full_data.limit(600000)
    .write.format("mongodb")
    .mode("append")
    .option("spark.mongodb.write.connection.uri", mongo_uri)
    .option("database", targetDb)
    .option("collection", targetCollection)
    .option("maxBatchSize", 1000)
)
mongo_writer.save()