In this project I'll do data cleaning with Spark (PySpark) in a Jupyter notebook on Google Colab. Spark runs here in local mode on a single machine, which shows that a cluster isn't strictly necessary for Spark programming.

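Spark is often much faster than Hadoop MapReduce, especially for iterative and interactive workloads. MapReduce writes intermediate results to disk between every map and reduce stage, whereas Spark can cache RDDs in memory and reuse them across operations. Its DAG scheduler also plans the whole chain of transformations at once, so consecutive steps can be pipelined together instead of being run as separate jobs.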

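RDDs (Resilient Distributed Datasets) are the fundamental building block and core data structure of Apache Spark. They are immutable, distributed collections of objects that are partitioned across a cluster and processed in parallel. They are fault-tolerant: if a node fails, Spark recomputes the lost partitions from the RDD's lineage (the sequence of transformations that produced it). The DataFrame API used in this notebook is built on top of RDDs.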

# Creating a Spark session & setting up the Spark environment

Installing the JDK (Spark runs on the JVM) -

In [7]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

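Downloading the Spark binaries (using the link from spark.apache.org; dlcdn.apache.org only serves current releases, so once a version is superseded it has to be fetched from archive.apache.org instead) -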

In [8]:
!wget https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz

--2023-04-08 23:58:38--  https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 299360284 (285M) [application/x-gzip]
Saving to: ‘spark-3.3.2-bin-hadoop3.tgz.1’


2023-04-08 23:58:40 (195 MB/s) - ‘spark-3.3.2-bin-hadoop3.tgz.1’ saved [299360284/299360284]



In [9]:
!tar -xvf spark-3.3.2-bin-hadoop3.tgz

spark-3.3.2-bin-hadoop3/
spark-3.3.2-bin-hadoop3/LICENSE
spark-3.3.2-bin-hadoop3/NOTICE
spark-3.3.2-bin-hadoop3/R/
spark-3.3.2-bin-hadoop3/R/lib/
spark-3.3.2-bin-hadoop3/R/lib/SparkR/
spark-3.3.2-bin-hadoop3/R/lib/SparkR/DESCRIPTION
spark-3.3.2-bin-hadoop3/R/lib/SparkR/INDEX
spark-3.3.2-bin-hadoop3/R/lib/SparkR/Meta/
spark-3.3.2-bin-hadoop3/R/lib/SparkR/Meta/Rd.rds
spark-3.3.2-bin-hadoop3/R/lib/SparkR/Meta/features.rds
spark-3.3.2-bin-hadoop3/R/lib/SparkR/Meta/hsearch.rds
spark-3.3.2-bin-hadoop3/R/lib/SparkR/Meta/links.rds
spark-3.3.2-bin-hadoop3/R/lib/SparkR/Meta/nsInfo.rds
spark-3.3.2-bin-hadoop3/R/lib/SparkR/Meta/package.rds
spark-3.3.2-bin-hadoop3/R/lib/SparkR/Meta/vignette.rds
spark-3.3.2-bin-hadoop3/R/lib/SparkR/NAMESPACE
spark-3.3.2-bin-hadoop3/R/lib/SparkR/R/
spark-3.3.2-bin-hadoop3/R/lib/SparkR/R/SparkR
spark-3.3.2-bin-hadoop3/R/lib/SparkR/R/SparkR.rdb
spark-3.3.2-bin-hadoop3/R/lib/SparkR/R/SparkR.rdx
spark-3.3.2-bin-hadoop3/R/lib/SparkR/doc/
spark-3.3.2-bin-hadoop3/R/lib/Spar

Setting up the environment variables (JAVA_HOME and SPARK_HOME) -

In [11]:
!ls /usr/lib/jvm

java-1.11.0-openjdk-amd64  java-1.8.0-openjdk-amd64
java-11-openjdk-amd64	   java-8-openjdk-amd64


In [12]:
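import os

# Java 11 is also available on this Colab image (see the listing above) and is used here;
# Spark 3.3.2 also runs on the JDK 8 installed earlier (/usr/lib/jvm/java-8-openjdk-amd64)
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
# Folder created by extracting the Spark archive
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"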

#### Installing findspark

**findspark** is a Python library that makes an existing Spark installation importable from Python. `findspark.init()` locates Spark (by default through the `SPARK_HOME` environment variable set above) and adds its `pyspark` package to `sys.path`. This is handy in Jupyter notebooks and other Python environments where Spark has been installed separately, as it was here, rather than through pip.

In [13]:
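!pip install -q findspark

import findspark
# Make the Spark installation in SPARK_HOME importable (adds pyspark to sys.path)
findspark.init()

from pyspark.sql import SparkSession
# Run Spark locally in this VM, with as many worker threads as there are logical cores
spark = SparkSession.builder.master("local[*]").getOrCreate()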

In [23]:
!wget https://raw.githubusercontent.com/Abhishek-Dxt/PySpark_Scala/main/Consumer_data.csv

--2023-04-09 00:27:31--  https://raw.githubusercontent.com/Abhishek-Dxt/PySpark_Scala/main/Consumer_data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 17552 (17K) [text/plain]
Saving to: ‘Consumer_data.csv’


2023-04-09 00:27:31 (14.7 MB/s) - ‘Consumer_data.csv’ saved [17552/17552]



In [24]:
!ls

Consumer_data.csv  sample_data		    spark-3.3.2-bin-hadoop3.tgz
dataset_nm.csv	   spark-3.3.2-bin-hadoop3  spark-3.3.2-bin-hadoop3.tgz.1


In [25]:
# header=True takes the column names from the first line of the file
consumerDF = spark.read.csv("Consumer_data.csv", header=True)

In [26]:
consumerDF.show()

+----------+-----------+------+------+-------+
|CustomerID|        Age|Salary|Gender|Country|
+----------+-----------+------+------+-------+
|         1|         18| 20000|  Male|Germany|
|         2|         19| 22000|Female|Unknown|
|         3|       null| 24000|Female|England|
|         4|         21|  2600|  Male|England|
|         5|         22| 50000|  Male| France|
|         6|       null| 35000|Female|Unknown|
|         7|         24|  4300|  Male|Germany|
|         8|         25|  null|Female| France|
|         9|         35| 35000|  Male|Germany|
|        10|         27| 37000|Female| France|
|        11|         31| 25000|  Male|Germany|
|        12|32.38181818| 27000|Female|Unknown|
|        13|33.76363636| 29000|Female|England|
|        14|35.14545455|  7600|  Male|England|
|        15|36.52727273|  null|  Male| France|
|        16|37.90909091| 40000|Female|England|
|        17|39.29090909|  9300|  Male|Germany|
|        18|40.67272727| 37000|Female| France|
|        19|4

The output shows that some rows have 'Unknown' as the country and that the Age and Salary columns contain missing (null) values. Let's handle these problems.

### Removing rows with 'Unknown' country

In [27]:
# Keep only the rows whose Country is not 'Unknown'
consumerDF1 = consumerDF.filter(consumerDF['Country'] != "Unknown")
consumerDF1.show()

+----------+-----------+------+------+-------+
|CustomerID|        Age|Salary|Gender|Country|
+----------+-----------+------+------+-------+
|         1|         18| 20000|  Male|Germany|
|         3|       null| 24000|Female|England|
|         4|         21|  2600|  Male|England|
|         5|         22| 50000|  Male| France|
|         7|         24|  4300|  Male|Germany|
|         8|         25|  null|Female| France|
|         9|         35| 35000|  Male|Germany|
|        10|         27| 37000|Female| France|
|        11|         31| 25000|  Male|Germany|
|        13|33.76363636| 29000|Female|England|
|        14|35.14545455|  7600|  Male|England|
|        15|36.52727273|  null|  Male| France|
|        16|37.90909091| 40000|Female|England|
|        17|39.29090909|  9300|  Male|Germany|
|        18|40.67272727| 37000|Female| France|
|        20|43.43636364| 42000|Female| France|
|        21|44.81818182| 30000|Female| France|
|        22|       46.2| 32000|Female| France|
|        23|4

### Replacing null values with the mean of the respective columns


In [28]:
consumerDF1.printSchema()

root
 |-- CustomerID: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)



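The schema shows that every column was read as a string. `na.fill()` with a numeric value only touches numeric columns, so first I'll cast Age to integer and Salary to float.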

In [35]:
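from pyspark.sql.types import IntegerType, FloatType

# Cast the string columns to numeric types (withColumn replaces the existing column of the same name)
consumerDF2 = (consumerDF1
               .withColumn("Age", consumerDF1["Age"].cast(IntegerType()))
               .withColumn("Salary", consumerDF1["Salary"].cast(FloatType())))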


In [36]:
consumerDF2.printSchema()

root
 |-- CustomerID: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Salary: float (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)



In [37]:
from pyspark.sql.functions import mean

mean_age = consumerDF2.select(mean(consumerDF2['Age'])).collect()
mean_age = mean_age[0][0]

mean_age

52.78378378378378

In [38]:
mean_salary = consumerDF2.select(mean(consumerDF2['Salary'])).collect()
mean_salary = mean_salary[0][0]

mean_salary

35094.8347107438

In [40]:
# Data before filling (the null Age and Salary values are visible)
consumerDF2.show()

+----------+----+-------+------+-------+
|CustomerID| Age| Salary|Gender|Country|
+----------+----+-------+------+-------+
|         1|  18|20000.0|  Male|Germany|
|         3|null|24000.0|Female|England|
|         4|  21| 2600.0|  Male|England|
|         5|  22|50000.0|  Male| France|
|         7|  24| 4300.0|  Male|Germany|
|         8|  25|   null|Female| France|
|         9|  35|35000.0|  Male|Germany|
|        10|  27|37000.0|Female| France|
|        11|  31|25000.0|  Male|Germany|
|        13|  33|29000.0|Female|England|
|        14|  35| 7600.0|  Male|England|
|        15|  36|   null|  Male| France|
|        16|  37|40000.0|Female|England|
|        17|  39| 9300.0|  Male|Germany|
|        18|  40|37000.0|Female| France|
|        20|  43|42000.0|Female| France|
|        21|  44|30000.0|Female| France|
|        22|  46|32000.0|Female| France|
|        23|  47|34000.0|Female| France|
|        24|  48|12600.0|Female|Germany|
+----------+----+-------+------+-------+
only showing top 20 rows

In [39]:
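# Fill the nulls with the column means; the float mean_age is cast to the integer Age column (52.78... -> 52)
consumerDF3 = consumerDF2.na.fill(mean_age, ["Age"])
consumerDF3 = consumerDF3.na.fill(mean_salary, ["Salary"])
consumerDF3.show()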

+----------+---+---------+------+-------+
|CustomerID|Age|   Salary|Gender|Country|
+----------+---+---------+------+-------+
|         1| 18|  20000.0|  Male|Germany|
|         3| 52|  24000.0|Female|England|
|         4| 21|   2600.0|  Male|England|
|         5| 22|  50000.0|  Male| France|
|         7| 24|   4300.0|  Male|Germany|
|         8| 25|35094.836|Female| France|
|         9| 35|  35000.0|  Male|Germany|
|        10| 27|  37000.0|Female| France|
|        11| 31|  25000.0|  Male|Germany|
|        13| 33|  29000.0|Female|England|
|        14| 35|   7600.0|  Male|England|
|        15| 36|35094.836|  Male| France|
|        16| 37|  40000.0|Female|England|
|        17| 39|   9300.0|  Male|Germany|
|        18| 40|  37000.0|Female| France|
|        20| 43|  42000.0|Female| France|
|        21| 44|  30000.0|Female| France|
|        22| 46|  32000.0|Female| France|
|        23| 47|  34000.0|Female| France|
|        24| 48|  12600.0|Female|Germany|
+----------+---+---------+------+-------+
only showing top 20 rows

### Saving the cleaned data 

In [41]:
# Write the DataFrame as CSV part files into the folder consumer_data_cleaned
consumerDF3.write.format("csv").save("consumer_data_cleaned")

In [42]:
# a new output folder has been created
!ls

consumer_data_cleaned  spark-3.3.2-bin-hadoop3
Consumer_data.csv      spark-3.3.2-bin-hadoop3.tgz
dataset_nm.csv	       spark-3.3.2-bin-hadoop3.tgz.1
sample_data


In [43]:
# the folder holds the part file plus a _SUCCESS marker
!ls consumer_data_cleaned/

part-00000-a7e94f85-cb32-4fc3-a15e-da09a396129b-c000.csv  _SUCCESS


In [44]:
# the part file is a plain CSV (without a header row) that can be exported as-is
!cat consumer_data_cleaned/part-00000-a7e94f85-cb32-4fc3-a15e-da09a396129b-c000.csv

1,18,20000.0,Male,Germany
3,52,24000.0,Female,England
4,21,2600.0,Male,England
5,22,50000.0,Male,France
7,24,4300.0,Male,Germany
8,25,35094.836,Female,France
9,35,35000.0,Male,Germany
10,27,37000.0,Female,France
11,31,25000.0,Male,Germany
13,33,29000.0,Female,England
14,35,7600.0,Male,England
15,36,35094.836,Male,France
16,37,40000.0,Female,England
17,39,9300.0,Male,Germany
18,40,37000.0,Female,France
20,43,42000.0,Female,France
21,44,30000.0,Female,France
22,46,32000.0,Female,France
23,47,34000.0,Female,France
24,48,12600.0,Female,Germany
25,50,60000.0,Female,Germany
26,51,45000.0,Female,Germany
27,53,14300.0,Male,Germany
28,54,42000.0,Male,Germany
29,52,45000.0,Male,England
30,57,47000.0,Male,England
31,58,35000.0,Male,England
32,60,37000.0,Male,England
33,61,39000.0,Male,England
34,62,17600.0,Male,England
35,64,65000.0,Male,England
36,65,50000.0,Female,England
37,66,19300.0,Female,England
38,18,29000.0,Male,England
39,19,7600.0,Male,England
40,20,35094.836,Female,Germany
41,21,40000