In [0]:
# Evaluate the `spark` session object so the notebook shows it.
# NOTE(review): `spark` is never created in this notebook — presumably the runtime injects it; confirm.
spark

In [0]:
import requests, json
#requests: A powerful third-party Python library used to make HTTP requests (e.g., GET, POST)
#json: Python’s built-in module to parse and work with JSON (a common format used by APIs)
#Together, they allow you to fetch data from the web and convert it to Python objects like dictionaries or lists.

import os

# The data.gov.in API key is a credential, so it is read from the environment
# instead of being committed with the notebook. Set DATA_GOV_IN_API_KEY on the
# cluster/kernel before running (a secret manager works just as well).
# NOTE: the key that used to be hard-coded here has been exposed and should be rotated.
API_KEY = os.environ.get("DATA_GOV_IN_API_KEY", "").strip()
if not API_KEY:
    print("DATA_GOV_IN_API_KEY is not set; the API request will most likely be rejected.")

# Resource endpoint queried by this notebook.
URL = "https://api.data.gov.in/resource/3b01bcb8-0b14-4abf-b6f2-c1bfd384ba69"

# Query-string parameters sent with the request.
params = {
    "api-key": API_KEY,
    "format": "json",  # ask for a JSON response body
    "limit": 100,      # number of records requested
    "offset": 0,       # start from the first record
}

In [0]:
# if response.status_code == 200:                #success - status code 200
#     data = response.json()                     # storing json data as dictionary

#     records = data.get("records", [])          # storing data in list format

#     if records:
#         df = spark.createDataFrame(records)
#         df.write.mode("append").format("delta").save("/FileStore/tables/AQI_raw_delta")  # append for saving all the data / overwrite
#     else:
#         print("No record found in API response")
# else:
#     print("Error", response.status_code)

# The commented-out code above is a riskier alternative to the cell below: it does not handle exceptions raised while fetching or writing.

In [0]:

# Fetch one page of records from the API and overwrite the raw Delta table with it.
# Failures are reported with print() rather than raised, so the notebook keeps running.
try:
    # Without a timeout, requests waits indefinitely on a stalled server and the
    # cell never finishes; 30 seconds applies to both connecting and reading.
    response = requests.get(URL, params=params, timeout=30)
    response.raise_for_status()  # raises HTTPError for 4xx/5xx responses

    data = response.json()
    records = data.get("records", [])

    if records:
        try:
            df = spark.createDataFrame(records)
            df.write.mode("overwrite").format("delta").save("/FileStore/tables/AQI_raw_delta")
            # len(records) is the number of rows just written; it avoids an extra Spark
            # job whose failure would be misreported as a write error.
            print(f"Successfully added {len(records)} records in Delta Lake")
        except Exception as e:
            print(f"Error writing to Delta Lake: {e}")
    else:
        print("No record found in API")

except requests.exceptions.HTTPError as e:
    print(f"HTTP error: {e}")  # 4xx and 5xx responses

except requests.exceptions.ConnectionError:
    print("Failed to connect to API. Check internet or API URL.")  # network issues

except requests.exceptions.Timeout:
    print("Request to API timed out.")  # server did not answer within the timeout

except Exception as e:
    print(f"Unexpected error: {e}")  # anything else, e.g. a body that is not valid JSON

Successfully added 100 records in Delta Lake


In [0]:
# Load the Delta table stored at dbfs:/FileStore/tables/AQI_raw_delta into `dataF`.
dataF = spark.read.format("delta").load("dbfs:/FileStore/tables/AQI_raw_delta")

In [0]:
# Preview the rows that were just loaded.
display(dataF)

avg_value,city,country,last_update,latitude,longitude,max_value,min_value,pollutant_id,state,station
64.0,Naharlagun,India,10-05-2025 16:00:00,27.103358,93.679645,134.0,32.0,PM2.5,Arunachal_Pradesh,"Naharlagun, Naharlagun - APSPCB"
11.0,Byrnihat,India,10-05-2025 16:00:00,26.071318,91.87488,21.0,5.0,NO2,Assam,"Central Academy for SFS, Byrnihat - PCBA"
40.0,Anantapur,India,10-05-2025 16:00:00,14.675886,77.593027,121.0,23.0,NO2,Andhra_Pradesh,"Gulzarpet, Anantapur - APPCB"
11.0,Anantapur,India,10-05-2025 16:00:00,14.675886,77.593027,43.0,5.0,NH3,Andhra_Pradesh,"Gulzarpet, Anantapur - APPCB"
10.0,Rajamahendravaram,India,10-05-2025 16:00:00,16.9872867,81.7363176,11.0,9.0,SO2,Andhra_Pradesh,"Anand Kala Kshetram, Rajamahendravaram - APPCB"
1.0,Tirumala,India,10-05-2025 16:00:00,13.67,79.35,2.0,1.0,NH3,Andhra_Pradesh,"Toll Gate, Tirumala - APPCB (Formerly known as Tirumala, Tirupati - APPCB)"
75.0,Amaravati,India,10-05-2025 16:00:00,16.5150833,80.5181667,88.0,60.0,PM10,Andhra_Pradesh,"Secretariat, Amaravati - APPCB"
54.0,Amaravati,India,10-05-2025 16:00:00,16.5150833,80.5181667,73.0,17.0,OZONE,Andhra_Pradesh,"Secretariat, Amaravati - APPCB"
74.0,Anantapur,India,10-05-2025 16:00:00,14.675886,77.593027,92.0,59.0,PM2.5,Andhra_Pradesh,"Gulzarpet, Anantapur - APPCB"
61.0,Delhi,India,10-05-2025 16:00:00,28.498571,77.26484,88.0,26.0,CO,Delhi,"Dr. Karni Singh Shooting Range, Delhi - DPCC"


In [0]:
# Print column names and types; in the recorded output every column is a string.
dataF.printSchema()

root
 |-- avg_value: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- last_update: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- max_value: string (nullable = true)
 |-- min_value: string (nullable = true)
 |-- pollutant_id: string (nullable = true)
 |-- state: string (nullable = true)
 |-- station: string (nullable = true)



In [0]:
from pyspark.sql.functions import to_timestamp, col

# Give the string columns proper types:
#   - last_update: text in "dd-MM-yyyy HH:mm:ss" form -> timestamp
#   - measurement and coordinate columns -> double
# NOTE(review): the timestamp is presumably interpreted in the Spark session time zone — confirm that matches the source data.
df_fixedSchema = dataF.withColumn(
    "last_update", to_timestamp(col("last_update"), "dd-MM-yyyy HH:mm:ss")
)
for numeric_column in ("avg_value", "max_value", "min_value", "latitude", "longitude"):
    df_fixedSchema = df_fixedSchema.withColumn(
        numeric_column, col(numeric_column).cast("double")
    )

In [0]:
# Show the converted rows, then print the schema to confirm the new column types.
display(df_fixedSchema)
df_fixedSchema.printSchema()

avg_value,city,country,last_update,latitude,longitude,max_value,min_value,pollutant_id,state,station
64.0,Naharlagun,India,2025-05-10T16:00:00.000+0000,27.103358,93.679645,134.0,32.0,PM2.5,Arunachal_Pradesh,"Naharlagun, Naharlagun - APSPCB"
11.0,Byrnihat,India,2025-05-10T16:00:00.000+0000,26.071318,91.87488,21.0,5.0,NO2,Assam,"Central Academy for SFS, Byrnihat - PCBA"
40.0,Anantapur,India,2025-05-10T16:00:00.000+0000,14.675886,77.593027,121.0,23.0,NO2,Andhra_Pradesh,"Gulzarpet, Anantapur - APPCB"
11.0,Anantapur,India,2025-05-10T16:00:00.000+0000,14.675886,77.593027,43.0,5.0,NH3,Andhra_Pradesh,"Gulzarpet, Anantapur - APPCB"
10.0,Rajamahendravaram,India,2025-05-10T16:00:00.000+0000,16.9872867,81.7363176,11.0,9.0,SO2,Andhra_Pradesh,"Anand Kala Kshetram, Rajamahendravaram - APPCB"
1.0,Tirumala,India,2025-05-10T16:00:00.000+0000,13.67,79.35,2.0,1.0,NH3,Andhra_Pradesh,"Toll Gate, Tirumala - APPCB (Formerly known as Tirumala, Tirupati - APPCB)"
75.0,Amaravati,India,2025-05-10T16:00:00.000+0000,16.5150833,80.5181667,88.0,60.0,PM10,Andhra_Pradesh,"Secretariat, Amaravati - APPCB"
54.0,Amaravati,India,2025-05-10T16:00:00.000+0000,16.5150833,80.5181667,73.0,17.0,OZONE,Andhra_Pradesh,"Secretariat, Amaravati - APPCB"
74.0,Anantapur,India,2025-05-10T16:00:00.000+0000,14.675886,77.593027,92.0,59.0,PM2.5,Andhra_Pradesh,"Gulzarpet, Anantapur - APPCB"
61.0,Delhi,India,2025-05-10T16:00:00.000+0000,28.498571,77.26484,88.0,26.0,CO,Delhi,"Dr. Karni Singh Shooting Range, Delhi - DPCC"


root
 |-- avg_value: double (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- last_update: timestamp (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- max_value: double (nullable = true)
 |-- min_value: double (nullable = true)
 |-- pollutant_id: string (nullable = true)
 |-- state: string (nullable = true)
 |-- station: string (nullable = true)



In [0]:
from pyspark.sql.functions import col

# for i in df_fixedSchema.columns:
#     nullCount = df_fixedSchema.filter(col(i).isNull()).count()
#     print(f"Column {i} has {nullCount} null values.")

# not optimized, If you have 100 columns, it runs 100 separate filter queries, making it slow for large datasets.

# Optimized alternative: count the nulls of every column in one aggregation,
# i.e. a single Spark job instead of one filter + count job per column.
# Column names are backtick-quoted so names containing spaces, dots or other
# special characters still parse as SQL identifiers. The loop variable is not
# named `col`, so it no longer shadows pyspark.sql.functions.col.
null_count_exprs = [
    f"sum(case when `{name}` is NULL then 1 else 0 end) as `{name}_nulls`"
    for name in df_fixedSchema.columns
]
df_fixedSchema.selectExpr(*null_count_exprs).show()


+---------------+----------+-------------+-----------------+--------------+---------------+---------------+---------------+------------------+-----------+-------------+
|avg_value_nulls|city_nulls|country_nulls|last_update_nulls|latitude_nulls|longitude_nulls|max_value_nulls|min_value_nulls|pollutant_id_nulls|state_nulls|station_nulls|
+---------------+----------+-------------+-----------------+--------------+---------------+---------------+---------------+------------------+-----------+-------------+
|              7|         0|            0|                0|             0|              0|              7|              7|                 0|          0|            0|
+---------------+----------+-------------+-----------------+--------------+---------------+---------------+---------------+------------------+-----------+-------------+



In [0]:
# Drop every row that has a null in any column (dropna() with its default arguments).
df_fixedNulls = df_fixedSchema.dropna()

In [0]:
# Verify that no nulls remain after dropna(). All columns are counted in one
# aggregation (a single Spark job) rather than one filter().count() job per column.
# COUNT(CASE ...) is used instead of SUM so an empty DataFrame reports 0, not NULL.
remaining_nulls = df_fixedNulls.selectExpr(
    *[
        f"COUNT(CASE WHEN `{name}` IS NULL THEN 1 END) AS `{name}`"
        for name in df_fixedNulls.columns
    ]
).first()

for name in df_fixedNulls.columns:
    print(f"Column {name} has {remaining_nulls[name]} null values.")

Column avg_value has 0 null values.
Column city has 0 null values.
Column country has 0 null values.
Column last_update has 0 null values.
Column latitude has 0 null values.
Column longitude has 0 null values.
Column max_value has 0 null values.
Column min_value has 0 null values.
Column pollutant_id has 0 null values.
Column state has 0 null values.
Column station has 0 null values.


In [0]:
# Number of rows left after the rows containing nulls were dropped.
df_fixedNulls.count()

Out[13]: 93

In [0]:
# Spot check on a single column: this expression only looks at `avg_value`.
df_fixedNulls.selectExpr("COUNT(CASE WHEN `avg_value` IS NULL THEN 1 END) AS avg_value_nulls").show()  # counts nulls in avg_value only

+---------------+
|avg_value_nulls|
+---------------+
|              0|
+---------------+



In [0]:
# Exact duplicate rows = total row count minus the count of distinct rows.
total_rows = df_fixedNulls.count()
distinct_rows = df_fixedNulls.distinct().count()
duplicate_count = total_rows - distinct_rows
print(f"Total duplicate rows: {duplicate_count}")

Total duplicate rows: 0


In [0]:
# Remove duplicate rows; no column subset is passed, so rows are compared on all columns.
df_fixedDuplicates = df_fixedNulls.dropDuplicates()

In [0]:
# Compare the row counts before and after de-duplication and report the difference.
# Each count is a Spark job, so both are computed once and reused. The former bare
# `df_fixedDuplicates.count()` was not the last expression of the cell, so its
# result was never shown.
rows_before_dedup = df_fixedNulls.count()
rows_after_dedup = df_fixedDuplicates.count()

if rows_before_dedup == rows_after_dedup:
    print("No Duplicates found")
else:
    print(f"Duplicates {rows_before_dedup - rows_after_dedup} found and dropped")

No Duplicates found


In [0]:
# Preview the cleaned rows (typed, null-free, de-duplicated).
display(df_fixedDuplicates)

avg_value,city,country,last_update,latitude,longitude,max_value,min_value,pollutant_id,state,station
75.0,Amaravati,India,2025-05-10T16:00:00.000+0000,16.5150833,80.5181667,88.0,60.0,PM10,Andhra_Pradesh,"Secretariat, Amaravati - APPCB"
1.0,Tirumala,India,2025-05-10T16:00:00.000+0000,13.67,79.35,2.0,1.0,NH3,Andhra_Pradesh,"Toll Gate, Tirumala - APPCB (Formerly known as Tirumala, Tirupati - APPCB)"
11.0,Anantapur,India,2025-05-10T16:00:00.000+0000,14.675886,77.593027,43.0,5.0,NH3,Andhra_Pradesh,"Gulzarpet, Anantapur - APPCB"
101.0,Delhi,India,2025-05-10T16:00:00.000+0000,28.5710274,77.0719006,153.0,10.0,OZONE,Delhi,"Dwarka-Sector 8, Delhi - DPCC"
64.0,Naharlagun,India,2025-05-10T16:00:00.000+0000,27.103358,93.679645,134.0,32.0,PM2.5,Arunachal_Pradesh,"Naharlagun, Naharlagun - APSPCB"
40.0,Anantapur,India,2025-05-10T16:00:00.000+0000,14.675886,77.593027,121.0,23.0,NO2,Andhra_Pradesh,"Gulzarpet, Anantapur - APPCB"
146.0,Delhi,India,2025-05-10T16:00:00.000+0000,28.5627763,77.1180053,319.0,7.0,PM2.5,Delhi,"IGI Airport (T3), Delhi - IMD"
10.0,Rajamahendravaram,India,2025-05-10T16:00:00.000+0000,16.9872867,81.7363176,11.0,9.0,SO2,Andhra_Pradesh,"Anand Kala Kshetram, Rajamahendravaram - APPCB"
54.0,Amaravati,India,2025-05-10T16:00:00.000+0000,16.5150833,80.5181667,73.0,17.0,OZONE,Andhra_Pradesh,"Secretariat, Amaravati - APPCB"
61.0,Delhi,India,2025-05-10T16:00:00.000+0000,28.498571,77.26484,88.0,26.0,CO,Delhi,"Dr. Karni Singh Shooting Range, Delhi - DPCC"


In [0]:
from delta.tables import DeltaTable

# Location of the cleaned ("transformed") Delta table that new batches are merged into.
transformed_path = "dbfs:/FileStore/tables/AQI_raw_delta(transformed)"

# On the very first run the target table does not exist yet (it used to be created
# by manually un-commenting a write). Create it empty with the cleaned schema so the
# notebook runs top to bottom and the merge later inserts every row as new.
if not DeltaTable.isDeltaTable(spark, transformed_path):
    df_fixedDuplicates.limit(0).write.format("delta").save(transformed_path)

# Row count before the merge, used afterwards to report how many rows were added.
df_justForCount = spark.read.format("delta").load(transformed_path)
count_before_Adding_new_data = df_justForCount.count()

In [0]:
from delta.tables import DeltaTable

# Open the existing cleaned Delta table as the merge target.
old_table = DeltaTable.forPath(spark, "dbfs:/FileStore/tables/AQI_raw_delta(transformed)")

# A reading is identified by these three columns; they form the merge condition.
merge_keys = ["last_update", "station", "pollutant_id"]
merge_condition = " AND ".join(f"old.{key} = new.{key}" for key in merge_keys)

# Delta's MERGE fails when several source rows match the same target row, and the
# earlier dropDuplicates() only removed rows identical in every column. Keep one
# source row per merge key so the upsert cannot fail on such rows.
new_rows = df_fixedDuplicates.dropDuplicates(merge_keys)

# Merge new data into the existing Delta table (upsert): matching rows are
# overwritten with the new values, unmatched rows are inserted.
(
    old_table.alias("old")
    .merge(new_rows.alias("new"), merge_condition)
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [0]:
# Re-read the transformed table from storage so the counts below reflect its state after the merge.
df_check = spark.read.format("delta").load("dbfs:/FileStore/tables/AQI_raw_delta(transformed)")

In [0]:
# Report the table size after the merge and how many rows the merge added.
# The count is a Spark job, so it is computed once and reused for both lines.
rows_after_merge = df_check.count()
print(rows_after_merge)

print(f"total {rows_after_merge - count_before_Adding_new_data} records are added(new), rest are updated")

793
total 93 records are added(new), rest are updated
