In [1]:
from pyspark.sql import SparkSession

# Replace these placeholders with appropriate values
your_app_name = "AirQualityAnalysis"
your_public_ip = " "  # Use your IPv4 address (not referenced by the builder in this cell)
your_local_ip = "localhost"  # Use "localhost" if running Spark locally

# Creating a Spark session
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.sql.functions import col, substring

# The builder reads the application name and bind address from the
# placeholders above, so changing them there is enough to reconfigure the
# session. With the current placeholder values the resulting configuration is
# the same as the previously hard-coded one.
# NOTE(review): the reverse-proxy URL is left pointing at localhost:4040;
# confirm whether it should be derived from `your_public_ip` once that
# placeholder is filled in.
spark = (
    SparkSession.builder
    .appName(your_app_name)
    .config("spark.ui.reverseProxy", True)
    .config("spark.ui.reverseProxyUrl", "http://localhost:4040")
    .config("spark.ui.reverseProxyAttempts", 1)
    .config("spark.driver.bindAddress", your_local_ip)
    .config("spark.driver.maxResultSize", "4g")
    .getOrCreate()
)



In [2]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.sql.functions import col
from pymongo import MongoClient
# Connection settings for the MongoDB server and the database this notebook reads from.
mongo_uri = "mongodb://localhost:27017/"
database_name = "hello"

# One shared client; `db` is the database handle used for every collection lookup below.
client = MongoClient(host=mongo_uri)
db = client.get_database(database_name)

In [3]:
# Materialise every document of the station-metadata collection as a list of dicts.
stations_info_data = [document for document in db["stations_info.csv"].find()]

In [4]:
# Station metadata layout as (column name, Spark type) pairs; every column is nullable.
stations_info_columns = [
    ("file_name", StringType()),
    ("state", StringType()),
    ("city", StringType()),
    ("agency", StringType()),
    ("station_location", StringType()),
    ("start_month", StringType()),
    ("start_month_num", IntegerType()),
    ("start_year", IntegerType()),
]
stations_info_schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in stations_info_columns]
)

# Build the DataFrame from the Mongo documents using the explicit schema.
df_stations_info = spark.createDataFrame(stations_info_data, schema=stations_info_schema)
# The schema lists no `_id` field; the drop is kept as a safeguard against the Mongo key.
df_stations_info = df_stations_info.drop('_id')

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
from datetime import datetime


In [6]:
# Preview the first ten station-metadata rows.
df_stations_info.show(n=10)

+---------+--------------+-----------------+------+--------------------+-----------+---------------+----------+
|file_name|         state|             city|agency|    station_location|start_month|start_month_num|start_year|
+---------+--------------+-----------------+------+--------------------+-----------+---------------+----------+
|    AP001|Andhra Pradesh|         Tirupati| APPCB| Tirumala, Tirupati |       July|              7|      2016|
|    AP002|Andhra Pradesh|       Vijayawada| APPCB|PWD Grounds, Vija...|        May|              5|      2017|
|    AP003|Andhra Pradesh|    Visakhapatnam| APPCB|GVM Corporation, ...|       July|              7|      2017|
|    AP004|Andhra Pradesh|Rajamahendravaram| APPCB|Anand Kala Kshetr...|  September|              9|      2017|
|    AP005|Andhra Pradesh|        Amaravati| APPCB|Secretariat, Amar...|   November|             11|      2017|
|    AP006|Andhra Pradesh|        Anantapur| APPCB|Gulzarpet, Ananta...|     August|              8|    

                                                                                

In [7]:
def create_dataframe_for_prefix(prefix, max_stations=40):
    """Load every station collection of one state prefix into a Spark DataFrame.

    Collections are looked up by name as ``<prefix>001`` .. ``<prefix>NNN`` on
    the module-level MongoDB handle ``db``. Their documents are concatenated
    and converted with the module-level ``spark`` session.

    Args:
        prefix: Collection-name prefix, e.g. ``"DL"``.
        max_stations: Highest station number to query, inclusive. Defaults to
            40, the range this function previously hard-coded.

    Returns:
        A Spark DataFrame with the twelve columns of the schema defined below:
        two timestamps, nine pollutant readings and ``file_name``.

    Raises:
        ValueError: If a date string does not match ``%Y-%m-%d %H:%M:%S``.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    timestamp_columns = ('From Date', 'To Date')

    # Defining the schema (every column nullable)
    prefix_data_schema = StructType([
        StructField("From Date", TimestampType(), True),
        StructField("To Date", TimestampType(), True),
        StructField("PM2.5 (ug/m3)", DoubleType(), True),
        StructField("PM10 (ug/m3)", DoubleType(), True),
        StructField("NO (ug/m3)", DoubleType(), True),
        StructField("NO2 (ug/m3)", DoubleType(), True),
        StructField("NOx (ppb)", DoubleType(), True),
        StructField("SO2 (ug/m3)", DoubleType(), True),
        StructField("CO (mg/m3)", DoubleType(), True),
        StructField("Ozone (ug/m3)", DoubleType(), True),
        StructField("Benzene (ug/m3)", DoubleType(), True),
        StructField("file_name", StringType(), True),
    ])

    # Fetching the data from each collection and storing it in a single list.
    # The projection excludes Mongo's `_id`, which is not part of the schema,
    # so no separate drop of that column is needed afterwards.
    # NOTE(review): a name with no matching collection is expected to yield an
    # empty cursor rather than an error - confirm against the deployment.
    prefix_data_list = []
    for station_number in range(1, max_stations + 1):
        collection_name = f"{prefix}{station_number:03d}"
        prefix_data_list.extend(db[collection_name].find({}, {'_id': 0}))

    # Converting string dates to datetime objects for the TimestampType columns.
    # Only strings are parsed: values that are missing, None or already
    # datetime objects are left as they are, matching the nullable schema.
    for entry in prefix_data_list:
        for column_name in timestamp_columns:
            raw_value = entry.get(column_name)
            if isinstance(raw_value, str):
                entry[column_name] = datetime.strptime(raw_value, timestamp_format)

    return spark.createDataFrame(prefix_data_list, schema=prefix_data_schema)

# Readings from all "DL"-prefixed station collections.
dl_df = create_dataframe_for_prefix(prefix="DL")

In [8]:
# Preview the first ten "DL" rows.
dl_df.show(n=10)

23/11/30 16:35:56 WARN TaskSetManager: Stage 1 contains a task of very large size (7489 KiB). The maximum recommended task size is 1000 KiB.
[Stage 1:>                                                          (0 + 1) / 1]

+-------------------+-------------------+-----------------+-----------------+----------+-----------+---------+-----------+----------+-------------+------------------+---------+
|          From Date|            To Date|    PM2.5 (ug/m3)|     PM10 (ug/m3)|NO (ug/m3)|NO2 (ug/m3)|NOx (ppb)|SO2 (ug/m3)|CO (mg/m3)|Ozone (ug/m3)|   Benzene (ug/m3)|file_name|
+-------------------+-------------------+-----------------+-----------------+----------+-----------+---------+-----------+----------+-------------+------------------+---------+
|2010-01-01 00:00:00|2010-01-01 01:00:00|83.72420430274217|73.29047204423317|     21.02|       41.0|    38.75|       4.27|      4.43|          3.0|2.2437990905332783|    DL001|
|2010-01-01 01:00:00|2010-01-01 02:00:00|83.72420430274217|73.29047204423317|      9.12|       29.5|    23.25|       4.55|      3.69|          3.5|2.2437990905332783|    DL001|
|2010-01-01 02:00:00|2010-01-01 03:00:00|83.72420430274217|73.29047204423317|     10.48|      27.25|    23.25|     

23/11/30 16:36:00 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 1 (TID 1): Attempting to kill Python Worker
                                                                                

In [9]:
# Print the column names and types of the "DL" DataFrame to check them against the declared schema.
dl_df.printSchema()

root
 |-- From Date: timestamp (nullable = true)
 |-- To Date: timestamp (nullable = true)
 |-- PM2.5 (ug/m3): double (nullable = true)
 |-- PM10 (ug/m3): double (nullable = true)
 |-- NO (ug/m3): double (nullable = true)
 |-- NO2 (ug/m3): double (nullable = true)
 |-- NOx (ppb): double (nullable = true)
 |-- SO2 (ug/m3): double (nullable = true)
 |-- CO (mg/m3): double (nullable = true)
 |-- Ozone (ug/m3): double (nullable = true)
 |-- Benzene (ug/m3): double (nullable = true)
 |-- file_name: string (nullable = true)



In [10]:
# Readings from all "KA"-prefixed station collections.
ka_df = create_dataframe_for_prefix(prefix="KA")

In [11]:
# Print the column names and types of the "KA" DataFrame to check them against the declared schema.
ka_df.printSchema()

root
 |-- From Date: timestamp (nullable = true)
 |-- To Date: timestamp (nullable = true)
 |-- PM2.5 (ug/m3): double (nullable = true)
 |-- PM10 (ug/m3): double (nullable = true)
 |-- NO (ug/m3): double (nullable = true)
 |-- NO2 (ug/m3): double (nullable = true)
 |-- NOx (ppb): double (nullable = true)
 |-- SO2 (ug/m3): double (nullable = true)
 |-- CO (mg/m3): double (nullable = true)
 |-- Ozone (ug/m3): double (nullable = true)
 |-- Benzene (ug/m3): double (nullable = true)
 |-- file_name: string (nullable = true)



In [12]:
# Readings from all "MH"-prefixed station collections.
mh_df = create_dataframe_for_prefix(prefix="MH")

In [13]:
# Readings from all "UP"-prefixed station collections.
up_df = create_dataframe_for_prefix(prefix="UP")

In [14]:
# Readings from all "TN"-prefixed station collections.
tn_df = create_dataframe_for_prefix(prefix="TN")

In [15]:
# Print the column names and types of the "UP" DataFrame to check them against the declared schema.
up_df.printSchema()

root
 |-- From Date: timestamp (nullable = true)
 |-- To Date: timestamp (nullable = true)
 |-- PM2.5 (ug/m3): double (nullable = true)
 |-- PM10 (ug/m3): double (nullable = true)
 |-- NO (ug/m3): double (nullable = true)
 |-- NO2 (ug/m3): double (nullable = true)
 |-- NOx (ppb): double (nullable = true)
 |-- SO2 (ug/m3): double (nullable = true)
 |-- CO (mg/m3): double (nullable = true)
 |-- Ozone (ug/m3): double (nullable = true)
 |-- Benzene (ug/m3): double (nullable = true)
 |-- file_name: string (nullable = true)



In [16]:
# Preview the first ten "TN" rows.
tn_df.show(n=10)

23/11/30 16:53:31 WARN TaskSetManager: Stage 2 contains a task of very large size (3748 KiB). The maximum recommended task size is 1000 KiB.
[Stage 2:>                                                          (0 + 1) / 1]

+-------------------+-------------------+-----------------+-----------------+----------+-----------+---------+-----------+----------+-------------+------------------+---------+
|          From Date|            To Date|    PM2.5 (ug/m3)|     PM10 (ug/m3)|NO (ug/m3)|NO2 (ug/m3)|NOx (ppb)|SO2 (ug/m3)|CO (mg/m3)|Ozone (ug/m3)|   Benzene (ug/m3)|file_name|
+-------------------+-------------------+-----------------+-----------------+----------+-----------+---------+-----------+----------+-------------+------------------+---------+
|2010-01-01 00:00:00|2010-01-01 01:00:00|47.98693658729114|68.10118941403422|     77.97|     167.54|   245.51|       4.71|      2.25|         2.71|0.9528246674307944|    TN001|
|2010-01-01 01:00:00|2010-01-01 02:00:00|47.98693658729114|68.10118941403422|      64.1|     139.83|   203.94|       6.45|      2.26|          2.7|0.9528246674307944|    TN001|
|2010-01-01 02:00:00|2010-01-01 03:00:00|47.98693658729114|68.10118941403422|     44.99|     100.53|   145.53|     

23/11/30 16:53:35 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 2 (TID 2): Attempting to kill Python Worker
                                                                                

In [17]:
# Station files retained for the analysis, grouped by prefix.
files_to_keep = [
    "DL001", "DL002", "DL003", "DL004", "DL005", "DL006",
    "KA001", "KA002", "KA003",
    "MH001", "MH002", "MH003",
    "TN001", "TN002", "TN003",
    "UP001", "UP002",
]

# Keep only the metadata rows whose file_name is in the list above.
is_selected_station = col('file_name').isin(files_to_keep)
filtered_df_stations_info = df_stations_info.filter(is_selected_station)

In [18]:
# Display the retained station-metadata rows to verify the filter.
filtered_df_stations_info.show()

+---------+-------------+-----------+------+--------------------+-----------+---------------+----------+
|file_name|        state|       city|agency|    station_location|start_month|start_month_num|start_year|
+---------+-------------+-----------+------+--------------------+-----------+---------------+----------+
|    DL001|        Delhi|      Delhi|  CPCB|         ITO, Delhi |    January|              1|      2010|
|    DL002|        Delhi|      Delhi|  CPCB|    Shadipur, Delhi |    January|              1|      2010|
|    DL003|        Delhi|      Delhi|  CPCB|    Sirifort, Delhi |    January|              1|      2010|
|    DL004|        Delhi|      Delhi|  CPCB| NSIT Dwarka, Delhi |    January|              1|      2010|
|    DL005|        Delhi|      Delhi|  CPCB|IHBAS, Dilshad Ga...|    January|              1|      2010|
|    DL006|        Delhi|      Delhi|  CPCB|         DTU, Delhi |    January|              1|      2010|
|    KA001|    Karnataka|  Bengaluru|  CPCB|BTM Layout,

In [19]:
from pyspark.sql.functions import col

# Example for dl_df: restrict the retained metadata to "DL" stations ...
prefix_dl_df = "DL"  # Extracted from the DataFrame name
is_dl_station = col('file_name').startswith(prefix_dl_df)
filtered_dl_info = filtered_df_stations_info.filter(is_dl_station)

# ... then attach it to the readings; the inner join keeps only readings
# whose file_name has a metadata row.
dl_df_combined = dl_df.join(filtered_dl_info, 'file_name', 'inner')


In [20]:
# Preview the first ten joined "DL" rows.
dl_df_combined.show(n=10)

23/11/30 16:53:44 WARN TaskSetManager: Stage 6 contains a task of very large size (7489 KiB). The maximum recommended task size is 1000 KiB.

+---------+-------------------+-------------------+-----------------+-----------------+----------+-----------+---------+-----------+-----------------+-------------+---------------+-----+-----+------+----------------+-----------+---------------+----------+
|file_name|          From Date|            To Date|    PM2.5 (ug/m3)|     PM10 (ug/m3)|NO (ug/m3)|NO2 (ug/m3)|NOx (ppb)|SO2 (ug/m3)|       CO (mg/m3)|Ozone (ug/m3)|Benzene (ug/m3)|state| city|agency|station_location|start_month|start_month_num|start_year|
+---------+-------------------+-------------------+-----------------+-----------------+----------+-----------+---------+-----------+-----------------+-------------+---------------+-----+-----+------+----------------+-----------+---------------+----------+
|    DL002|2010-01-01 00:00:00|2010-01-01 01:00:00|59.41818649235221|46.85813662670525|     68.69|     132.78|   201.47|        3.2|             1.21|         5.76|          54.66|Delhi|Delhi|  CPCB|Shadipur, Delhi |    January|    

                                                                                

In [21]:
# Restrict the retained metadata to "TN" stations, then inner-join it onto the readings.
prefix_tn_df = "TN"
is_tn_station = col('file_name').startswith(prefix_tn_df)
filtered_tn_info = filtered_df_stations_info.filter(is_tn_station)
tn_df_combined = tn_df.join(filtered_tn_info, 'file_name', 'inner')

In [22]:
# Restrict the retained metadata to "UP" stations, then inner-join it onto the readings.
prefix_up_df = "UP"
is_up_station = col('file_name').startswith(prefix_up_df)
filtered_up_info = filtered_df_stations_info.filter(is_up_station)
up_df_combined = up_df.join(filtered_up_info, 'file_name', 'inner')

In [23]:
# Restrict the retained metadata to "MH" stations, then inner-join it onto the readings.
prefix_mh_df = "MH"
is_mh_station = col('file_name').startswith(prefix_mh_df)
filtered_mh_info = filtered_df_stations_info.filter(is_mh_station)
mh_df_combined = mh_df.join(filtered_mh_info, 'file_name', 'inner')

In [24]:
# Restrict the retained metadata to "KA" stations, then inner-join it onto the readings.
prefix_ka_df = "KA"
is_ka_station = col('file_name').startswith(prefix_ka_df)
filtered_ka_info = filtered_df_stations_info.filter(is_ka_station)
ka_df_combined = ka_df.join(filtered_ka_info, 'file_name', 'inner')

In [28]:
# Save DataFrames to CSV locally.
# Each combined DataFrame is paired with its destination; the writes run in
# list order, include a header row and overwrite any existing output.
csv_targets = [
    (dl_df_combined, "file:///Users/meetsmacbook/Downloads/spark csvs/dl_combined.csv"),
    (tn_df_combined, "file:///Users/meetsmacbook/Downloads/spark csvs/tn_combined.csv"),
    (up_df_combined, "file:///Users/meetsmacbook/Downloads/spark csvs/up_combined.csv"),
    (ka_df_combined, "file:///Users/meetsmacbook/Downloads/spark csvs/ka_combined.csv"),
    (mh_df_combined, "file:///Users/meetsmacbook/Downloads/spark csvs/mh_combined.csv"),
]
for combined_df, target_path in csv_targets:
    combined_df.write.csv(target_path, header=True, mode="overwrite")


23/11/30 17:02:35 WARN TaskSetManager: Stage 15 contains a task of very large size (7489 KiB). The maximum recommended task size is 1000 KiB.
23/11/30 17:02:42 WARN TaskSetManager: Stage 20 contains a task of very large size (3748 KiB). The maximum recommended task size is 1000 KiB.
23/11/30 17:02:43 WARN TaskSetManager: Stage 25 contains a task of very large size (2427 KiB). The maximum recommended task size is 1000 KiB.
23/11/30 17:02:43 WARN TaskSetManager: Stage 30 contains a task of very large size (3748 KiB). The maximum recommended task size is 1000 KiB.
23/11/30 17:02:44 WARN TaskSetManager: Stage 35 contains a task of very large size (3748 KiB). The maximum recommended task size is 1000 KiB.
