In [10]:
import pyspark
import pyspark.sql  as pyspark_sql
import pyspark.sql.types as pyspark_types
import pyspark.sql.functions  as pyspark_functions
from pyspark import SparkContext, SparkConf
from pandas import isnull
from numpy import count_nonzero
from pyspark.sql.functions import col, count, isnan, when, coalesce, lag, lead, sum
from pyspark.sql.window import Window

import warnings
warnings.filterwarnings("ignore")

In [2]:
# create the session
conf = SparkConf().set("spark.ui.port", "4050")

# create the context
sc = pyspark.SparkContext(conf=conf)
spark = pyspark_sql.SparkSession.builder.getOrCreate()

In [3]:
# Load the dataset
data = spark.read.csv("dataset/col_mat_nuw_output.csv", header=True, inferSchema=True)

In [4]:
data.printSchema()

root
 |-- 0.00019698343957810148: double (nullable = true)
 |-- Colombo Proper: string (nullable = true)
 |-- 2019-01-01: date (nullable = true)
 |-- 2019-01-02: date (nullable = true)



In [5]:
data.show()

+----------------------+--------------+----------+----------+
|0.00019698343957810148|Colombo Proper|2019-01-01|2019-01-02|
+----------------------+--------------+----------+----------+
|  2.625522171968594...|Colombo Proper|2019-01-02|2019-01-03|
|  9.852118897938794E-5|Colombo Proper|2019-01-03|2019-01-04|
|  2.099320518114242E-4|Colombo Proper|2019-01-04|2019-01-05|
|  1.785337298892930...|Colombo Proper|2019-01-05|2019-01-06|
|  1.082296700235670...|Colombo Proper|2019-01-06|2019-01-07|
|  3.926829280477309...|Colombo Proper|2019-01-07|2019-01-08|
|  9.153156350685351E-5|Colombo Proper|2019-01-08|2019-01-09|
|  1.205978992853015...|Colombo Proper|2019-01-09|2019-01-10|
|  1.297723562983258...|Colombo Proper|2019-01-10|2019-01-11|
|  2.239188166801278...|Colombo Proper|2019-01-11|2019-01-12|
|  1.569418094178759...|Colombo Proper|2019-01-12|2019-01-13|
|                  NULL|Colombo Proper|2019-01-13|2019-01-14|
|  1.336291906862603...|Colombo Proper|2019-01-14|2019-01-15|
|  6.374

In [6]:
column_names = ['HCHO reading', 'Location', 'Current Date', 'Next Date']

# Rename columns using withColumnRenamed()
for i, new_name in enumerate(column_names):
    data = data.withColumnRenamed(data.columns[i], new_name)

# Display the DataFrame
data.show()

+--------------------+--------------+------------+----------+
|        HCHO reading|      Location|Current Date| Next Date|
+--------------------+--------------+------------+----------+
|2.625522171968594...|Colombo Proper|  2019-01-02|2019-01-03|
|9.852118897938794E-5|Colombo Proper|  2019-01-03|2019-01-04|
|2.099320518114242E-4|Colombo Proper|  2019-01-04|2019-01-05|
|1.785337298892930...|Colombo Proper|  2019-01-05|2019-01-06|
|1.082296700235670...|Colombo Proper|  2019-01-06|2019-01-07|
|3.926829280477309...|Colombo Proper|  2019-01-07|2019-01-08|
|9.153156350685351E-5|Colombo Proper|  2019-01-08|2019-01-09|
|1.205978992853015...|Colombo Proper|  2019-01-09|2019-01-10|
|1.297723562983258...|Colombo Proper|  2019-01-10|2019-01-11|
|2.239188166801278...|Colombo Proper|  2019-01-11|2019-01-12|
|1.569418094178759...|Colombo Proper|  2019-01-12|2019-01-13|
|                NULL|Colombo Proper|  2019-01-13|2019-01-14|
|1.336291906862603...|Colombo Proper|  2019-01-14|2019-01-15|
|6.37441

In [15]:
# Check unique values of cities
unique_cities = data.select("Location").distinct()
unique_cities.show()

+-------------------+
|           Location|
+-------------------+
|   Deniyaya, Matara|
|     Colombo Proper|
|Nuwara Eliya Proper|
+-------------------+



In [16]:
# Describe the 'HCHO reading' column
data.select('HCHO reading').describe().show()

+-------+--------------------+
|summary|        HCHO reading|
+-------+--------------------+
|  count|                5477|
|   mean|1.202803756272528E-4|
| stddev|1.021701071134199...|
|    min|-2.59296176552668...|
|    max|8.997101837438971E-4|
+-------+--------------------+



In [9]:
# Check for null values in the DataFrame
data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+------------+--------+------------+---------+
|HCHO reading|Location|Current Date|Next Date|
+------------+--------+------------+---------+
|        2419|       0|           0|        0|
+------------+--------+------------+---------+



In [12]:
# Initialize a counter for null values
null_count = data.filter(col("HCHO Reading").isNull()).count()

# Continue the process until there are no nulls left
while null_count > 0:
    # Define the window specifications
    backward_windowSpec = Window.orderBy("Current Date")
    forward_windowSpec = Window.orderBy("Current Date")
    
    # Use lag to carry the last observation forward
    previous_value = lag("HCHO Reading", 1).over(backward_windowSpec)
    data = data.withColumn("HCHO Reading", coalesce("HCHO Reading", previous_value))
    
    # Use lead to carry the next observation backward
    next_value = lead("HCHO Reading", 1).over(forward_windowSpec)
    data = data.withColumn("HCHO Reading", coalesce("HCHO Reading", next_value))
    
    # Update the null count
    null_count = data.filter(col("HCHO Reading").isNull()).count()

# cmn_data now has the nulls filled using a combination of LOCF and NOCB

In [13]:
data.show()

+--------------------+-------------------+------------+----------+
|        HCHO Reading|           Location|Current Date| Next Date|
+--------------------+-------------------+------------+----------+
|2.625522171968594...|   Deniyaya, Matara|  2019-01-01|2019-01-02|
|2.625522171968594...|Nuwara Eliya Proper|  2019-01-01|2019-01-02|
|2.625522171968594...|     Colombo Proper|  2019-01-02|2019-01-03|
|5.803530712000793E-6|   Deniyaya, Matara|  2019-01-02|2019-01-03|
|5.803530712000793E-6|Nuwara Eliya Proper|  2019-01-02|2019-01-03|
|9.852118897938794E-5|     Colombo Proper|  2019-01-03|2019-01-04|
|2.362357772653922...|   Deniyaya, Matara|  2019-01-03|2019-01-04|
|1.908293886956784...|Nuwara Eliya Proper|  2019-01-03|2019-01-04|
|2.099320518114242E-4|     Colombo Proper|  2019-01-04|2019-01-05|
|6.437245753953118E-5|   Deniyaya, Matara|  2019-01-04|2019-01-05|
|5.097625917127737...|Nuwara Eliya Proper|  2019-01-04|2019-01-05|
|1.785337298892930...|     Colombo Proper|  2019-01-05|2019-01

In [17]:
# Count null values in each column
null_counts = data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns])
null_counts.show()

+------------+--------+------------+---------+
|HCHO Reading|Location|Current Date|Next Date|
+------------+--------+------------+---------+
|           0|       0|           0|        0|
+------------+--------+------------+---------+



In [18]:
# Drop rows with any null values
data_cleaned = data.na.drop()

# Show the cleaned DataFrame
data_cleaned.show()


+--------------------+-------------------+------------+----------+
|        HCHO Reading|           Location|Current Date| Next Date|
+--------------------+-------------------+------------+----------+
|2.625522171968594...|   Deniyaya, Matara|  2019-01-01|2019-01-02|
|2.625522171968594...|Nuwara Eliya Proper|  2019-01-01|2019-01-02|
|2.625522171968594...|     Colombo Proper|  2019-01-02|2019-01-03|
|5.803530712000793E-6|   Deniyaya, Matara|  2019-01-02|2019-01-03|
|5.803530712000793E-6|Nuwara Eliya Proper|  2019-01-02|2019-01-03|
|9.852118897938794E-5|     Colombo Proper|  2019-01-03|2019-01-04|
|2.362357772653922...|   Deniyaya, Matara|  2019-01-03|2019-01-04|
|1.908293886956784...|Nuwara Eliya Proper|  2019-01-03|2019-01-04|
|2.099320518114242E-4|     Colombo Proper|  2019-01-04|2019-01-05|
|6.437245753953118E-5|   Deniyaya, Matara|  2019-01-04|2019-01-05|
|5.097625917127737...|Nuwara Eliya Proper|  2019-01-04|2019-01-05|
|1.785337298892930...|     Colombo Proper|  2019-01-05|2019-01

In [19]:
# Count the number of rows in the DataFrame
data_count = data.count()

# Show the length of the DataFrame
print("Length of DataFrame:", data_count)

Length of DataFrame: 5477
