In [1]:
import pyspark.sql as pyspark_sql
import pyspark.sql.functions as F

In [2]:
import os
import sys

# Point PySpark's worker (PYSPARK_PYTHON) and driver (PYSPARK_DRIVER_PYTHON) interpreter
# settings at the interpreter running this kernel; this must happen before the
# SparkSession is created below.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [3]:
# Create the SparkSession used by every later cell (getOrCreate reuses an
# already-active session in this kernel instead of starting a second one).
spark = (
    pyspark_sql.SparkSession.builder
    .appName("Data_Preprocessing")
    .getOrCreate()
)

# Dividing the Datasets by Location

### 1. col_mat_nuw_output.csv

In [4]:
# Read the header-less CSV and give its four positional columns meaningful names.
# The path is built with os.path.join: the former "..\data\..." literal relied on
# invalid escape sequences (\d, \c), which Python flags with a SyntaxWarning, and
# only pointed at the right file on Windows.
df1 = spark.read.csv(os.path.join("..", "data", "col_mat_nuw_output.csv"), header=False, inferSchema=True)
df1 = df1.withColumnRenamed("_c0", "hcho_reading") \
    .withColumnRenamed("_c1", "location") \
    .withColumnRenamed("_c2", "current_date") \
    .withColumnRenamed("_c3", "next_date")

In [5]:
# Inspect the column types Spark inferred from the CSV (read with inferSchema=True).
df1.printSchema()

root
 |-- hcho_reading: double (nullable = true)
 |-- location: string (nullable = true)
 |-- current_date: date (nullable = true)
 |-- next_date: date (nullable = true)



In [6]:
# Preview the leading rows with full-width (untruncated) cell values.
df1.show(truncate=False)

+---------------------+--------------+------------+----------+
|hcho_reading         |location      |current_date|next_date |
+---------------------+--------------+------------+----------+
|1.9698343957810148E-4|Colombo Proper|2019-01-01  |2019-01-02|
|2.6255221719685945E-4|Colombo Proper|2019-01-02  |2019-01-03|
|9.852118897938794E-5 |Colombo Proper|2019-01-03  |2019-01-04|
|2.099320518114242E-4 |Colombo Proper|2019-01-04  |2019-01-05|
|1.7853372988929305E-4|Colombo Proper|2019-01-05  |2019-01-06|
|1.0822967002356709E-4|Colombo Proper|2019-01-06  |2019-01-07|
|3.9268292804773094E-4|Colombo Proper|2019-01-07  |2019-01-08|
|9.153156350685351E-5 |Colombo Proper|2019-01-08  |2019-01-09|
|1.2059789928530154E-4|Colombo Proper|2019-01-09  |2019-01-10|
|1.2977235629832586E-4|Colombo Proper|2019-01-10  |2019-01-11|
|2.2391881668012785E-4|Colombo Proper|2019-01-11  |2019-01-12|
|1.5694180941787597E-4|Colombo Proper|2019-01-12  |2019-01-13|
|NULL                 |Colombo Proper|2019-01-13  |2019

In [7]:
# Total number of rows loaded from the file
df1_rows = df1.count()
print("No of rows: ", df1_rows)

No of rows:  5478


In [8]:
# Replace negative HCHO readings with null; non-negative and already-null readings
# pass through unchanged.
hcho = F.col("hcho_reading")
df1 = df1.withColumn("hcho_reading", F.when(hcho < 0, F.lit(None)).otherwise(hcho))

In [9]:
# Count missing values per column. The second argument to F.when is a plain string, so
# Spark treats it as a non-null literal and F.count counts exactly the rows where the
# column is null.
null_count_exprs = [
    F.count(F.when(F.isnull(col_name), col_name)).alias(col_name)
    for col_name in df1.columns
]
df1.select(null_count_exprs).show()

+------------+--------+------------+---------+
|hcho_reading|location|current_date|next_date|
+------------+--------+------------+---------+
|        2682|       0|           0|        0|
+------------+--------+------------+---------+



In [10]:
# Fraction (0-1, not multiplied by 100) of null values in each column.
# The row count is computed once up front; calling df1.count() inside the
# comprehension launched a separate Spark job for every column.
total_rows = df1.count()
df1.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df1.columns]) \
    .select([F.col(c) / total_rows for c in df1.columns]).show()

+---------------------+-----------------+---------------------+------------------+
|(hcho_reading / 5478)|(location / 5478)|(current_date / 5478)|(next_date / 5478)|
+---------------------+-----------------+---------------------+------------------+
|   0.4895947426067908|              0.0|                  0.0|               0.0|
+---------------------+-----------------+---------------------+------------------+



In [11]:
# Number of rows recorded for each location
location_counts = df1.groupBy("location").count()
location_counts.show()

+-------------------+-----+
|           location|count|
+-------------------+-----+
|   Deniyaya, Matara| 1826|
|     Colombo Proper| 1826|
|Nuwara Eliya Proper| 1826|
+-------------------+-----+



In [12]:
# Divide the dataset by location and collect each subset into a pandas DataFrame
df_colombo = df1.where(df1["location"] == "Colombo Proper").toPandas()
df_matara = df1.where(df1["location"] == "Deniyaya, Matara").toPandas()
df_nuwara_eliya = df1.where(df1["location"] == "Nuwara Eliya Proper").toPandas()

In [13]:
# Row count of each per-location DataFrame
for label, frame in (
    ("Colombo Proper", df_colombo),
    ("Deniyaya, Matara", df_matara),
    ("Nuwara Eliya Proper", df_nuwara_eliya),
):
    print(f"{label}:", len(frame))

Colombo Proper: 1826
Deniyaya, Matara: 1826
Nuwara Eliya Proper: 1826


In [14]:
# Write each location's rows to its own CSV under ../data/location, dropping the
# location column. The output directory is created if missing, because pandas'
# to_csv does not create parent directories. Paths are built with os.path.join; the
# former backslash literals only resolved on Windows and produced oddly named files in
# the working directory elsewhere.
location_dir = os.path.join("..", "data", "location")
os.makedirs(location_dir, exist_ok=True)
for frame, file_name in (
    (df_colombo, "colombo_proper.csv"),
    (df_matara, "deniyaya_matara.csv"),
    (df_nuwara_eliya, "nuwaraeliya_proper.csv"),
):
    frame.drop(columns=["location"]).to_csv(os.path.join(location_dir, file_name), index=False)

### 2. kan_output.csv

In [15]:
# Read the header-less CSV and give its four positional columns meaningful names.
# The path is built with os.path.join: the former "..\data\..." literal relied on
# invalid escape sequences (\d, \k), which Python flags with a SyntaxWarning, and
# only pointed at the right file on Windows.
df2 = spark.read.csv(os.path.join("..", "data", "kan_output.csv"), header=False, inferSchema=True)
df2 = df2.withColumnRenamed("_c0", "hcho_reading") \
    .withColumnRenamed("_c1", "location") \
    .withColumnRenamed("_c2", "current_date") \
    .withColumnRenamed("_c3", "next_date")

In [16]:
# Inspect the column types Spark inferred from the CSV (read with inferSchema=True).
df2.printSchema()

root
 |-- hcho_reading: double (nullable = true)
 |-- location: string (nullable = true)
 |-- current_date: date (nullable = true)
 |-- next_date: date (nullable = true)



In [17]:
# Preview the leading rows with full-width (untruncated) cell values.
df2.show(truncate=False)

+----------------------+------------+------------+----------+
|hcho_reading          |location    |current_date|next_date |
+----------------------+------------+------------+----------+
|1.7607134598773356E-4 |Kandy Proper|2019-01-01  |2019-01-02|
|9.220391253917748E-5  |Kandy Proper|2019-01-02  |2019-01-03|
|NULL                  |Kandy Proper|2019-01-03  |2019-01-04|
|1.9086819838538396E-4 |Kandy Proper|2019-01-04  |2019-01-05|
|1.2195178402067448E-4 |Kandy Proper|2019-01-05  |2019-01-06|
|-6.514086129388805E-5 |Kandy Proper|2019-01-06  |2019-01-07|
|1.6323820639265E-4    |Kandy Proper|2019-01-07  |2019-01-08|
|-6.735205533914268E-5 |Kandy Proper|2019-01-08  |2019-01-09|
|1.2796936582431357E-4 |Kandy Proper|2019-01-09  |2019-01-10|
|4.546048424126012E-5  |Kandy Proper|2019-01-10  |2019-01-11|
|3.600074175192105E-5  |Kandy Proper|2019-01-11  |2019-01-12|
|1.286629698010177E-4  |Kandy Proper|2019-01-12  |2019-01-13|
|NULL                  |Kandy Proper|2019-01-13  |2019-01-14|
|NULL   

In [18]:
# Total number of rows loaded from the file
df2_rows = df2.count()
print("No of rows: ", df2_rows)

No of rows:  1826


In [19]:
# Replace negative HCHO readings with null; non-negative and already-null readings
# pass through unchanged.
hcho = F.col("hcho_reading")
df2 = df2.withColumn("hcho_reading", F.when(hcho < 0, F.lit(None)).otherwise(hcho))

In [20]:
# Count missing values per column. The second argument to F.when is a plain string, so
# Spark treats it as a non-null literal and F.count counts exactly the rows where the
# column is null.
null_count_exprs = [
    F.count(F.when(F.isnull(col_name), col_name)).alias(col_name)
    for col_name in df2.columns
]
df2.select(null_count_exprs).show()

+------------+--------+------------+---------+
|hcho_reading|location|current_date|next_date|
+------------+--------+------------+---------+
|         901|       0|           0|        0|
+------------+--------+------------+---------+



In [21]:
# Fraction (0-1, not multiplied by 100) of null values in each column.
# The row count is computed once up front; calling df2.count() inside the
# comprehension launched a separate Spark job for every column.
total_rows = df2.count()
df2.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df2.columns]) \
    .select([F.col(c) / total_rows for c in df2.columns]).show()

+---------------------+-----------------+---------------------+------------------+
|(hcho_reading / 1826)|(location / 1826)|(current_date / 1826)|(next_date / 1826)|
+---------------------+-----------------+---------------------+------------------+
|  0.49342825848849947|              0.0|                  0.0|               0.0|
+---------------------+-----------------+---------------------+------------------+



In [22]:
# List the distinct location values present in this file
distinct_locations = df2.select("location").distinct()
distinct_locations.show()

+------------+
|    location|
+------------+
|Kandy Proper|
+------------+



In [23]:
# Save to CSV without the location column. The target directory is created if missing
# (pandas' to_csv does not create parent directories), and the path is built with
# os.path.join instead of a Windows-only backslash literal.
location_dir = os.path.join("..", "data", "location")
os.makedirs(location_dir, exist_ok=True)
df2.toPandas().drop(columns=["location"]).to_csv(os.path.join(location_dir, "kandy_proper.csv"), index=False)

### 3. mon_kur_jaf_output.csv

In [24]:
# Read the header-less CSV and give its four positional columns meaningful names.
# The path is built with os.path.join: the former "..\data\..." literal relied on
# invalid escape sequences (\d, \m), which Python flags with a SyntaxWarning, and
# only pointed at the right file on Windows.
df3 = spark.read.csv(os.path.join("..", "data", "mon_kur_jaf_output.csv"), header=False, inferSchema=True)
df3 = df3.withColumnRenamed("_c0", "hcho_reading") \
    .withColumnRenamed("_c1", "location") \
    .withColumnRenamed("_c2", "current_date") \
    .withColumnRenamed("_c3", "next_date")

In [25]:
# Inspect the column types Spark inferred from the CSV (read with inferSchema=True).
df3.printSchema()

root
 |-- hcho_reading: double (nullable = true)
 |-- location: string (nullable = true)
 |-- current_date: date (nullable = true)
 |-- next_date: date (nullable = true)



In [26]:
# Preview the leading rows with full-width (untruncated) cell values.
df3.show(truncate=False)

+----------------------+------------------+------------+----------+
|hcho_reading          |location          |current_date|next_date |
+----------------------+------------------+------------+----------+
|NULL                  |Bibile, Monaragala|2019-01-01  |2019-01-02|
|1.919914652467399E-5  |Bibile, Monaragala|2019-01-02  |2019-01-03|
|2.8114479359302837E-5 |Bibile, Monaragala|2019-01-03  |2019-01-04|
|3.747998184385943E-5  |Bibile, Monaragala|2019-01-04  |2019-01-05|
|-1.7982608793453114E-5|Bibile, Monaragala|2019-01-05  |2019-01-06|
|1.4578368961799026E-4 |Bibile, Monaragala|2019-01-06  |2019-01-07|
|2.8285908025465342E-5 |Bibile, Monaragala|2019-01-07  |2019-01-08|
|NULL                  |Bibile, Monaragala|2019-01-08  |2019-01-09|
|1.4208501670509577E-4 |Bibile, Monaragala|2019-01-09  |2019-01-10|
|NULL                  |Bibile, Monaragala|2019-01-10  |2019-01-11|
|2.014587947072581E-5  |Bibile, Monaragala|2019-01-11  |2019-01-12|
|1.5827876632101837E-4 |Bibile, Monaragala|2019-

In [27]:
# Total number of rows loaded from the file
df3_rows = df3.count()
print("No of rows: ", df3_rows)

No of rows:  5478


In [28]:
# Replace negative HCHO readings with null; non-negative and already-null readings
# pass through unchanged.
hcho = F.col("hcho_reading")
df3 = df3.withColumn("hcho_reading", F.when(hcho < 0, F.lit(None)).otherwise(hcho))

In [29]:
# Count missing values per column. The second argument to F.when is a plain string, so
# Spark treats it as a non-null literal and F.count counts exactly the rows where the
# column is null.
null_count_exprs = [
    F.count(F.when(F.isnull(col_name), col_name)).alias(col_name)
    for col_name in df3.columns
]
df3.select(null_count_exprs).show()

+------------+--------+------------+---------+
|hcho_reading|location|current_date|next_date|
+------------+--------+------------+---------+
|        1844|       0|           0|        0|
+------------+--------+------------+---------+



In [30]:
# Fraction (0-1, not multiplied by 100) of null values in each column.
# The row count is computed once up front; calling df3.count() inside the
# comprehension launched a separate Spark job for every column.
total_rows = df3.count()
df3.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df3.columns]) \
    .select([F.col(c) / total_rows for c in df3.columns]).show()

+---------------------+-----------------+---------------------+------------------+
|(hcho_reading / 5478)|(location / 5478)|(current_date / 5478)|(next_date / 5478)|
+---------------------+-----------------+---------------------+------------------+
|  0.33661920408908363|              0.0|                  0.0|               0.0|
+---------------------+-----------------+---------------------+------------------+



In [31]:
# Number of rows recorded for each location
location_counts = df3.groupBy("location").count()
location_counts.show()

+------------------+-----+
|          location|count|
+------------------+-----+
| Kurunegala Proper| 1826|
|Bibile, Monaragala| 1826|
|     Jaffna Proper| 1826|
+------------------+-----+



In [32]:
# Divide the dataset by location and collect each subset into a pandas DataFrame
df_kurunegala = df3.where(df3["location"] == "Kurunegala Proper").toPandas()
df_monaragala = df3.where(df3["location"] == "Bibile, Monaragala").toPandas()
df_jaffna = df3.where(df3["location"] == "Jaffna Proper").toPandas()

In [33]:
# Row count of each per-location DataFrame
for label, frame in (
    ("Kurunegala Proper", df_kurunegala),
    ("Bibile, Monaragala", df_monaragala),
    ("Jaffna Proper", df_jaffna),
):
    print(f"{label}:", len(frame))

Kurunegala Proper: 1826
Bibile, Monaragala: 1826
Jaffna Proper: 1826


In [34]:
# Write each location's rows to its own CSV under ../data/location, dropping the
# location column. The output directory is created if missing, because pandas'
# to_csv does not create parent directories. Paths are built with os.path.join; the
# former backslash literals only resolved on Windows and produced oddly named files in
# the working directory elsewhere.
location_dir = os.path.join("..", "data", "location")
os.makedirs(location_dir, exist_ok=True)
for frame, file_name in (
    (df_kurunegala, "kurunegala_proper.csv"),
    (df_monaragala, "bibile_monaragala.csv"),
    (df_jaffna, "jaffna_proper.csv"),
):
    frame.drop(columns=["location"]).to_csv(os.path.join(location_dir, file_name), index=False)

# Merge all dataframes into one

In [35]:
# Stack df1, df2 and df3 into one DataFrame. unionByName matches columns by name
# rather than by position, so the result stays correct even if one source's column
# order ever differs (plain union would silently misalign values).
df = df1.unionByName(df2).unionByName(df3)

In [36]:
# Preview the merged DataFrame's leading rows with full-width (untruncated) cell values.
df.show(truncate=False)

+---------------------+--------------+------------+----------+
|hcho_reading         |location      |current_date|next_date |
+---------------------+--------------+------------+----------+
|1.9698343957810148E-4|Colombo Proper|2019-01-01  |2019-01-02|
|2.6255221719685945E-4|Colombo Proper|2019-01-02  |2019-01-03|
|9.852118897938794E-5 |Colombo Proper|2019-01-03  |2019-01-04|
|2.099320518114242E-4 |Colombo Proper|2019-01-04  |2019-01-05|
|1.7853372988929305E-4|Colombo Proper|2019-01-05  |2019-01-06|
|1.0822967002356709E-4|Colombo Proper|2019-01-06  |2019-01-07|
|3.9268292804773094E-4|Colombo Proper|2019-01-07  |2019-01-08|
|9.153156350685351E-5 |Colombo Proper|2019-01-08  |2019-01-09|
|1.2059789928530154E-4|Colombo Proper|2019-01-09  |2019-01-10|
|1.2977235629832586E-4|Colombo Proper|2019-01-10  |2019-01-11|
|2.2391881668012785E-4|Colombo Proper|2019-01-11  |2019-01-12|
|1.5694180941787597E-4|Colombo Proper|2019-01-12  |2019-01-13|
|NULL                 |Colombo Proper|2019-01-13  |2019

In [37]:
# Total rows across all three source files
total_row_count = df.count()
print("Entire Dataset Count:", total_row_count)

Entire Dataset Count: 12782


In [38]:
# Number of rows recorded for each location in the merged dataset
location_counts = df.groupBy("location").count()
location_counts.show()

+-------------------+-----+
|           location|count|
+-------------------+-----+
|   Deniyaya, Matara| 1826|
|     Colombo Proper| 1826|
|Nuwara Eliya Proper| 1826|
|       Kandy Proper| 1826|
|  Kurunegala Proper| 1826|
| Bibile, Monaragala| 1826|
|      Jaffna Proper| 1826|
+-------------------+-----+



In [39]:
# Save the merged DataFrame (location column kept) to a single CSV. The path is built
# with os.path.join rather than a Windows-only backslash literal.
df.toPandas().to_csv(os.path.join("..", "data", "all_data.csv"), index=False)