In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .appName("No2 Comparison")\
        .master("spark://spark-master:7077") \
        .config("spark.executor.cores", "2") \
        .config("spark.cores.max", "4") \
        .config("spark.executor.memory", "1g") \
        .config("spark.memory.fraction", "0.6") \
        .config("spark.memory.storageFraction", "0.2") \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/13 09:39:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
spark

In [3]:
no2_df = spark.read.option("header", True)\
                   .option("inferSchema", True)\
                   .load(format='csv', path="/opt/spark-data/input/pakistan_no2_2025_full_year.csv")

                                                                                

In [4]:
no2_df.printSchema()

root
 |-- date: date (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- no2_column: double (nullable = true)
 |-- no2_quality: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)



In [5]:
# no2_df.show()

In [6]:
from pyspark.sql.functions import avg, sum, lag, expr
from pyspark.sql import Window

w = Window.partitionBy("year").orderBy("date").rowsBetween(-2, 0)

no2_moving_averages = no2_df.withColumn("Last Three Day Average", avg("no2_column").over(w))

In [7]:
# no2_moving_averages.show()

In [8]:
no2_moving_averages.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [avg(no2_column#20) windowspecdefinition(year#22, date#17 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, currentrow$())) AS Last Three Day Average#36], [year#22], [date#17 ASC NULLS FIRST]
   +- Sort [year#22 ASC NULLS FIRST, date#17 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(year#22, 200), ENSURE_REQUIREMENTS, [plan_id=27]
         +- FileScan csv [date#17,latitude#18,longitude#19,no2_column#20,no2_quality#21,year#22,month#23,day#24,day_of_week#25] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/opt/spark-data/input/pakistan_no2_2025_full_year.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<date:date,latitude:double,longitude:double,no2_column:double,no2_quality:double,year:int,m...




In [9]:
# no2_moving_averages.rdd.getNumPartitions()

In [10]:
# no2_df.rdd.getNumPartitions()

In [11]:
from pyspark.sql import functions as F

no2_moving_averages = no2_moving_averages.withColumns({"latitude": F.round(no2_df.latitude), "longitude": F.round(no2_df.longitude)})

In [12]:
# no2_moving_averages.show()

In [13]:
no2_moving_averages.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [date#17, round(latitude#18, 0) AS latitude#48, round(longitude#19, 0) AS longitude#49, no2_column#20, no2_quality#21, year#22, month#23, day#24, day_of_week#25, Last Three Day Average#36]
   +- Window [avg(no2_column#20) windowspecdefinition(year#22, date#17 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, currentrow$())) AS Last Three Day Average#36], [year#22], [date#17 ASC NULLS FIRST]
      +- Sort [year#22 ASC NULLS FIRST, date#17 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(year#22, 200), ENSURE_REQUIREMENTS, [plan_id=42]
            +- FileScan csv [date#17,latitude#18,longitude#19,no2_column#20,no2_quality#21,year#22,month#23,day#24,day_of_week#25] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/opt/spark-data/input/pakistan_no2_2025_full_year.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<date:date,latitude:double,longitude:doub

In [14]:
# no2_moving_averages.rdd.getNumPartitions()

In [15]:
no2_repartiotned =no2_moving_averages.repartition(no2_moving_averages.longitude)

In [16]:
# no2_repartiotned.rdd.getNumPartitions()

In [17]:
no2_repartiotned.write.mode("overwrite").csv("/opt/spark-output/result", header=True)
# df.write.mode("overwrite").csv("/opt/spark-data/output/result", header=True)


                                                                                

In [None]:
no2_repartiotned.write.mode("overwrite").csv("/opt/spark-output/result", header=True)


In [18]:
no2_repartiotned.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange hashpartitioning(longitude#49, 200), REPARTITION_BY_COL, [plan_id=185]
   +- Project [date#17, round(latitude#18, 0) AS latitude#48, round(longitude#19, 0) AS longitude#49, no2_column#20, no2_quality#21, year#22, month#23, day#24, day_of_week#25, Last Three Day Average#36]
      +- Window [avg(no2_column#20) windowspecdefinition(year#22, date#17 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, currentrow$())) AS Last Three Day Average#36], [year#22], [date#17 ASC NULLS FIRST]
         +- Sort [year#22 ASC NULLS FIRST, date#17 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(year#22, 200), ENSURE_REQUIREMENTS, [plan_id=181]
               +- FileScan csv [date#17,latitude#18,longitude#19,no2_column#20,no2_quality#21,year#22,month#23,day#24,day_of_week#25] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/opt/spark-data/input/pakistan_no2_2025_full_year.csv], P