#### Data Wrangling Using Spark for Big Data

Pandas is the usual tool for data wrangling, but it becomes increasingly difficult to use as a dataset grows beyond what a single machine's memory can handle. Spark's distributed compute makes big data processing possible. Here, the Electricity Cost Dataset from [Kaggle](https://www.kaggle.com/datasets/shalmamuji/electricity-cost-prediction-dataset) is used to demonstrate data wrangling with PySpark.

Building a Spark session

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("dataframes").getOrCreate()


Importing the required libraries and functions

In [0]:
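import datetime
from math import fabs
# Note: importing `abs` and `sum` from pyspark.sql.functions shadows Python's built-ins of the same name
from pyspark.sql.functions import (when, array_join, collect_list, collect_set, col, count,
                                   countDistinct, abs, row_number, desc, asc, mean, avg, sum, corr)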
from pyspark.sql.window import Window

Loading the dataset with a `spark.sql` query on the table

In [0]:
df = spark.sql("select * from workspace.default.electricity_cost_dataset")
df.show(5)

+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
|site area|structure type|water consumption|recycling rate|utilisation rate|air qality index|issue reolution time|resident count|electricity cost|
+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
|     1360|     Mixed-use|           2519.0|            69|              52|             188|                   1|            72|          1420.0|
|     4272|     Mixed-use|           2324.0|            50|              76|             165|                  65|           261|          3298.0|
|     3592|     Mixed-use|           2701.0|            20|              94|             198|                  39|           117|          3115.0|
|      966|   Residential|           1000.0|            13|              60|              74|                   3|    

Reading the same table with `spark.read.table`

In [0]:
df = spark.read.table("workspace.default.electricity_cost_dataset")
df.show(5)

+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
|site area|structure type|water consumption|recycling rate|utilisation rate|air qality index|issue reolution time|resident count|electricity cost|
+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
|     1360|     Mixed-use|           2519.0|            69|              52|             188|                   1|            72|          1420.0|
|     4272|     Mixed-use|           2324.0|            50|              76|             165|                  65|           261|          3298.0|
|     3592|     Mixed-use|           2701.0|            20|              94|             198|                  39|           117|          3115.0|
|      966|   Residential|           1000.0|            13|              60|              74|                   3|    

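Inspecting the data: `describe()` computes summary statistics (count, mean, stddev, min, max), `printSchema()` prints the column types, and `show()` displays the first 20 rows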

In [0]:
df.describe().show()
df.printSchema()
df.show()

+-------+-----------------+--------------+-----------------+-----------------+------------------+-----------------+--------------------+------------------+-----------------+
|summary|        site area|structure type|water consumption|   recycling rate|  utilisation rate| air qality index|issue reolution time|    resident count| electricity cost|
+-------+-----------------+--------------+-----------------+-----------------+------------------+-----------------+--------------------+------------------+-----------------+
|  count|            10000|         10000|            10000|            10000|             10000|            10000|               10000|             10000|            10000|
|   mean|        2757.7751|          NULL|        3494.0571|           49.598|           64.8422|          99.4686|             36.4026|           85.5731|         2837.845|
| stddev|1293.059958673906|          NULL|2076.181117350515|23.43014069845324|20.432964781572363|58.01452561693715|  20.6239047657

Choosing columns using select


In [0]:
df.select(df['air qality index']).show(3)

+----------------+
|air qality index|
+----------------+
|             188|
|             165|
|             198|
+----------------+
only showing top 3 rows


Selecting an aggregate of a column

In [0]:

df.select(mean('air qality index')).show()


+---------------------+
|avg(air qality index)|
+---------------------+
|              99.4686|
+---------------------+



Filtering the DataFrame using `filter`

In [0]:
df.filter(df['air qality index'] > 50).show(3)


+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
|site area|structure type|water consumption|recycling rate|utilisation rate|air qality index|issue reolution time|resident count|electricity cost|
+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
|     1360|     Mixed-use|           2519.0|            69|              52|             188|                   1|            72|          1420.0|
|     4272|     Mixed-use|           2324.0|            50|              76|             165|                  65|           261|          3298.0|
|     3592|     Mixed-use|           2701.0|            20|              94|             198|                  39|           117|          3115.0|
+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
only showing top 3 rows

Combining multiple filter conditions with `&`

In [0]:
df.filter((df['air qality index'] > 50) & (df['electricity cost'] > df['recycling rate'])).select(['air qality index', 'electricity cost']).show(3)

+----------------+----------------+
|air qality index|electricity cost|
+----------------+----------------+
|             188|          1420.0|
|             165|          3298.0|
|             198|          3115.0|
+----------------+----------------+
only showing top 3 rows


Negating a condition with `~`


In [0]:
df.filter((df['air qality index'] > 50) & ~(df['electricity cost'] > (df['water consumption'] * 1.5))).select(['air qality index', 'electricity cost']).show(3)

+----------------+----------------+
|air qality index|electricity cost|
+----------------+----------------+
|             188|          1420.0|
|             165|          3298.0|
|             198|          3115.0|
+----------------+----------------+
only showing top 3 rows


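A PySpark DataFrame can be collected into a list of `Row` objects with `collect()`. This brings every matching row back to the driver, a single machine, so it suits small results such as a filtered subset rather than the full distributed dataset.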

In [0]:
# collect() returns the matching rows to the driver as a list of Row objects
rows = df.filter(df['electricity cost'] == 1420).collect()
first_row = rows[0]
first_row

Row(site area=1360, structure type='Mixed-use', water consumption=2519.0, recycling rate=69, utilisation rate=52, air qality index=188, issue reolution time=1, resident count=72, electricity cost=1420.0)

Each `Row` carries the column names along with the values. `asDict()` converts it into a dictionary of column name/value pairs.

In [0]:
first_row.asDict()

{'site area': 1360,
 'structure type': 'Mixed-use',
 'water consumption': 2519.0,
 'recycling rate': 69,
 'utilisation rate': 52,
 'air qality index': 188,
 'issue reolution time': 1,
 'resident count': 72,
 'electricity cost': 1420.0}

Once converted, the row can be indexed like any Python dictionary.

In [0]:
first_row.asDict()['site area']

1360

Transformations such as `filter` and `select` can be chained one after another. Spark evaluates them lazily: it only builds up a plan, which runs when an action such as `collect()`, `take()` or `show()` is called.

In [0]:
df.filter(df['air qality index'] > 50).select(["air qality index", 'electricity cost']).take(5)  # take(5) fetches only 5 rows instead of collecting everything and slicing

[Row(air qality index=188, electricity cost=1420.0),
 Row(air qality index=165, electricity cost=3298.0),
 Row(air qality index=198, electricity cost=3115.0),
 Row(air qality index=74, electricity cost=1575.0),
 Row(air qality index=194, electricity cost=3800.0)]

Aggregations

In [0]:
df.groupby(['structure type']).count().show()

+--------------+-----+
|structure type|count|
+--------------+-----+
|     Mixed-use| 2052|
|   Residential| 3939|
|    Commercial| 3005|
|    Industrial| 1004|
+--------------+-----+



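Aggregations can be specified by passing `agg()` a dictionary that maps a column name to the name of an aggregate function, or by calling an aggregation method such as `count()` directly on the grouped data.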

In [0]:
df.groupby(['structure type']).agg({'electricity cost':'count'} ).show()

+--------------+-----------------------+
|structure type|count(electricity cost)|
+--------------+-----------------------+
|     Mixed-use|                   2052|
|   Residential|                   3939|
|    Commercial|                   3005|
|    Industrial|                   1004|
+--------------+-----------------------+



In [0]:
df.groupby(['structure type']).count().show()

+--------------+-----+
|structure type|count|
+--------------+-----+
|     Mixed-use| 2052|
|   Residential| 3939|
|    Commercial| 3005|
|    Industrial| 1004|
+--------------+-----+



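Unlike pandas `groupby`, which needs at least one key, PySpark's `groupby()` can be called with no arguments. The whole DataFrame then forms a single group, which mirrors a SQL aggregate query without a `GROUP BY` clause.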

In [0]:
df.groupby().agg({'electricity cost':'mean'} ).show()

+----------------------+
|mean(electricity cost)|
+----------------------+
|              2837.845|
+----------------------+



Renaming a column

In [0]:
df.groupby(['structure type']).sum('electricity cost').withColumnRenamed("structure type", "Building type").show()

+-------------+---------------------+
|Building type|sum(electricity cost)|
+-------------+---------------------+
|    Mixed-use|            4729749.0|
|  Residential|          1.1787321E7|
|   Commercial|            8450682.0|
|   Industrial|            3410698.0|
+-------------+---------------------+



In [0]:
df.groupby(['structure type']).agg(
    sum("electricity cost").alias("total_cost"),
    avg("electricity cost").alias("avg_cost"),
    avg("water consumption").alias("avg_consumption"),
    ).show()

+--------------+-----------+------------------+------------------+
|structure type| total_cost|          avg_cost|   avg_consumption|
+--------------+-----------+------------------+------------------+
|     Mixed-use|  4729749.0|2304.9459064327484| 3505.103313840156|
|   Residential|1.1787321E7|2992.4653465346537|3510.5592790048236|
|    Commercial|  8450682.0|2812.2069883527456| 3493.320133111481|
|    Industrial|  3410698.0| 3397.109561752988|3408.9432270916336|
+--------------+-----------+------------------+------------------+



In [0]:
df.select(['recycling rate']).distinct().count()

81

Merging two dataframes together using join

In [0]:
a = df.groupby(['structure type']).sum('electricity cost')
b = df.groupby(['structure type']).count().withColumnRenamed("structure type", "Building type")

a_b = a.join(b, a['structure type'] == b['Building type'], how="inner")

a_b.show()

+--------------+---------------------+-------------+-----+
|structure type|sum(electricity cost)|Building type|count|
+--------------+---------------------+-------------+-----+
|    Commercial|            8450682.0|   Commercial| 3005|
|     Mixed-use|            4729749.0|    Mixed-use| 2052|
|   Residential|          1.1787321E7|  Residential| 3939|
|    Industrial|            3410698.0|   Industrial| 1004|
+--------------+---------------------+-------------+-----+



Column operations

In [0]:
df.withColumn("double_electricity_cost", col("electricity cost") * 2).select(['electricity cost', 'double_electricity_cost']).show(5)

+----------------+-----------------------+
|electricity cost|double_electricity_cost|
+----------------+-----------------------+
|          1420.0|                 2840.0|
|          3298.0|                 6596.0|
|          3115.0|                 6230.0|
|          1575.0|                 3150.0|
|          4301.0|                 8602.0|
+----------------+-----------------------+
only showing top 5 rows


In [0]:
df.withColumn("electricity_cost_group", when(col('electricity cost') < 2000, "Low").otherwise("High")).select(['electricity cost', 'electricity_cost_group']).show(10)

+----------------+----------------------+
|electricity cost|electricity_cost_group|
+----------------+----------------------+
|          1420.0|                   Low|
|          3298.0|                  High|
|          3115.0|                  High|
|          1575.0|                   Low|
|          4301.0|                  High|
|          3800.0|                  High|
|          3661.0|                  High|
|          2538.0|                  High|
|          1390.0|                   Low|
|          2599.0|                  High|
+----------------+----------------------+
only showing top 10 rows


In [0]:
df.withColumn(
    "electricity_cost_group",
    when(col('electricity cost').cast("float") < 5000, "Low").otherwise("High"),
).groupby(['structure type']).agg(
    array_join(collect_set(col('electricity_cost_group')), ",").alias('Group'),
    countDistinct(col('water consumption')).alias('water_cons'),
).show()

+--------------+--------+----------+
|structure type|   Group|water_cons|
+--------------+--------+----------+
|     Mixed-use|     Low|      1621|
|   Residential|Low,High|      2704|
|    Commercial|Low,High|      2190|
|    Industrial|Low,High|       847|
+--------------+--------+----------+



Column operations using the `col('columnName')` notation, which refers to a column by name instead of through a particular DataFrame variable

In [0]:
df.withColumn("ratio",col('electricity cost')/col('water consumption')).select([col('ratio').cast("int"),col('electricity cost')]).show(5) 

+-----+----------------+
|ratio|electricity cost|
+-----+----------------+
|    0|          1420.0|
|    1|          3298.0|
|    1|          3115.0|
|    1|          1575.0|
|    0|          4301.0|
+-----+----------------+
only showing top 5 rows


Column operations using the pandas-style `df['column']` notation

In [0]:
df.withColumn("ratio",df['electricity cost']/df['water consumption']).select([col('ratio').cast("int"),df['electricity cost']]).show(5)  

+-----+----------------+
|ratio|electricity cost|
+-----+----------------+
|    0|          1420.0|
|    1|          3298.0|
|    1|          3115.0|
|    1|          1575.0|
|    0|          4301.0|
+-----+----------------+
only showing top 5 rows


Passing columns to functions: `corr` computes the Pearson correlation coefficient of two columns

In [0]:
df.select(corr('electricity cost','water consumption')).show() # correlation

+-----------------------------------------+
|corr(electricity cost, water consumption)|
+-----------------------------------------+
|                       0.6987747451406682|
+-----------------------------------------+



Ordering data

In [0]:
df.orderBy(col('electricity cost').desc()).show(5)

+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
|site area|structure type|water consumption|recycling rate|utilisation rate|air qality index|issue reolution time|resident count|electricity cost|
+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
|     4787|   Residential|           7539.0|            56|              95|             105|                  11|           464|          6446.0|
|     4805|   Residential|           9818.0|            20|              86|              84|                  11|           446|          6416.0|
|     4852|   Residential|           3573.0|            60|              95|             115|                  55|           454|          6158.0|
|     4736|   Residential|           8932.0|            40|              48|              48|                  72|    

SQL-like window operations

In [0]:
windowPartition = Window.partitionBy("structure type").orderBy(col('electricity cost').desc())  # window spec: one partition per structure type, highest cost first
df.withColumn("rank", row_number().over(windowPartition)).select(['structure type', 'electricity cost', 'rank']).show(3)

# To keep only the top row per structure type, filter on the rank before selecting:
# .filter(col('rank') == 1).select(['structure type', 'electricity cost']).show()

+--------------+----------------+----+
|structure type|electricity cost|rank|
+--------------+----------------+----+
|    Commercial|          5188.0|   1|
|    Commercial|          5180.0|   2|
|    Commercial|          5119.0|   3|
+--------------+----------------+----+
only showing top 3 rows


Handling missing values (NA)

In [0]:
df.na.drop().show(3)

+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
|site area|structure type|water consumption|recycling rate|utilisation rate|air qality index|issue reolution time|resident count|electricity cost|
+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
|     1360|     Mixed-use|           2519.0|            69|              52|             188|                   1|            72|          1420.0|
|     4272|     Mixed-use|           2324.0|            50|              76|             165|                  65|           261|          3298.0|
|     3592|     Mixed-use|           2701.0|            20|              94|             198|                  39|           117|          3115.0|
+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
only showing top 3 rows

In [0]:
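# Each call returns a new DataFrame (DataFrames are immutable); assign the result to keep it
df.na.drop(thresh=2)                # keep rows with at least 2 non-null values
df.na.drop(how='any')               # drop rows containing any null (the default)
df.na.drop(how='all')               # drop rows only if every value is null
df.na.drop(subset=['site area'])    # only look for nulls in 'site area'
df.na.fill(0)                       # replace nulls with 0 in numeric columns
df.na.fill(0, subset=['resident count']).show(3)   # fill only specific columns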


+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
|site area|structure type|water consumption|recycling rate|utilisation rate|air qality index|issue reolution time|resident count|electricity cost|
+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
|     1360|     Mixed-use|           2519.0|            69|              52|             188|                   1|            72|          1420.0|
|     4272|     Mixed-use|           2324.0|            50|              76|             165|                  65|           261|          3298.0|
|     3592|     Mixed-use|           2701.0|            20|              94|             198|                  39|           117|          3115.0|
+---------+--------------+-----------------+--------------+----------------+----------------+--------------------+--------------+----------------+
only showing top 3 rows