## Data Preprocessing using PySpark

In [1]:
#Kaggle link for the dataset used. This dataset has more than 49,000 records.

#https://www.kaggle.com/imdevskp/corona-virus-report?select=covid_19_clean_complete.csv

In [2]:
import pandas as pd
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Start').getOrCreate()
spark

In [3]:
covid= spark.read.csv(r"C:\Users\mahaj\Downloads\covid_dataset\covid_19.csv", header= True, inferSchema= True)  #read 
#OR
#covid= spark.read.option('header','true').csv(r"C:\Users\mahaj\Downloads\covid_dataset\covid_19.csv",inferSchema=True) 
#header=conside first row as header
#inferschema=convert default string values to proper integers in printSchema

covid.toPandas()
#OR
# covid.show(covid4.count(), False)


Unnamed: 0,Province/State,Country/Region,Lat,Long,Date,Confirmed,Deaths,Recovered,Active,WHO Region
0,,Afghanistan,33.939110,67.709953,2020-01-22,0,0,0,0,Eastern Mediterranean
1,,Albania,41.153300,20.168300,2020-01-22,0,0,0,0,Europe
2,,Algeria,28.033900,1.659600,2020-01-22,0,0,0,0,Africa
3,,Andorra,42.506300,1.521800,2020-01-22,0,0,0,0,Europe
4,,Angola,-11.202700,17.873900,2020-01-22,0,0,0,0,Africa
...,...,...,...,...,...,...,...,...,...,...
49063,,Sao Tome and Principe,0.186400,6.613100,2020-07-27,865,14,734,117,Africa
49064,,Yemen,15.552727,48.516388,2020-07-27,1691,483,833,375,Eastern Mediterranean
49065,,Comoros,-11.645500,43.333300,2020-07-27,354,7,328,19,Africa
49066,,Tajikistan,38.861000,71.276100,2020-07-27,7235,60,6028,1147,Europe


In [4]:
covid.printSchema() #dataset information
type(covid) 

root
 |-- Province/State: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Date: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Active: integer (nullable = true)
 |-- WHO Region: string (nullable = true)



pyspark.sql.dataframe.DataFrame

In [5]:
covid.head(1) #select specific rows, number in bracket=number of rows selected

[Row(Province/State=None, Country/Region='Afghanistan', Lat=33.93911, Long=67.709953, Date='2020-01-22', Confirmed=0, Deaths=0, Recovered=0, Active=0, WHO Region='Eastern Mediterranean')]

In [6]:
covid1=covid.select(['Country/Region','Confirmed','WHO Region'])
covid1.show() #select specific features
# display(covid1) #shows type of dataframe

+-------------------+---------+--------------------+
|     Country/Region|Confirmed|          WHO Region|
+-------------------+---------+--------------------+
|        Afghanistan|        0|Eastern Mediterra...|
|            Albania|        0|              Europe|
|            Algeria|        0|              Africa|
|            Andorra|        0|              Europe|
|             Angola|        0|              Africa|
|Antigua and Barbuda|        0|            Americas|
|          Argentina|        0|            Americas|
|            Armenia|        0|              Europe|
|          Australia|        0|     Western Pacific|
|          Australia|        0|     Western Pacific|
|          Australia|        0|     Western Pacific|
|          Australia|        0|     Western Pacific|
|          Australia|        0|     Western Pacific|
|          Australia|        0|     Western Pacific|
|          Australia|        0|     Western Pacific|
|          Australia|        0|     Western Pa

In [7]:
covid2= covid1.withColumn('Confirmed cases new', covid.Confirmed +500)
covid2.show() #add new feature

+-------------------+---------+--------------------+-------------------+
|     Country/Region|Confirmed|          WHO Region|Confirmed cases new|
+-------------------+---------+--------------------+-------------------+
|        Afghanistan|        0|Eastern Mediterra...|                500|
|            Albania|        0|              Europe|                500|
|            Algeria|        0|              Africa|                500|
|            Andorra|        0|              Europe|                500|
|             Angola|        0|              Africa|                500|
|Antigua and Barbuda|        0|            Americas|                500|
|          Argentina|        0|            Americas|                500|
|            Armenia|        0|              Europe|                500|
|          Australia|        0|     Western Pacific|                500|
|          Australia|        0|     Western Pacific|                500|
|          Australia|        0|     Western Pacific

In [8]:
covid3= covid2.drop('Confirmed cases new') #delete any feature
covid3.show(n=500) #n=number of lines

+--------------------+---------+--------------------+
|      Country/Region|Confirmed|          WHO Region|
+--------------------+---------+--------------------+
|         Afghanistan|        0|Eastern Mediterra...|
|             Albania|        0|              Europe|
|             Algeria|        0|              Africa|
|             Andorra|        0|              Europe|
|              Angola|        0|              Africa|
| Antigua and Barbuda|        0|            Americas|
|           Argentina|        0|            Americas|
|             Armenia|        0|              Europe|
|           Australia|        0|     Western Pacific|
|           Australia|        0|     Western Pacific|
|           Australia|        0|     Western Pacific|
|           Australia|        0|     Western Pacific|
|           Australia|        0|     Western Pacific|
|           Australia|        0|     Western Pacific|
|           Australia|        0|     Western Pacific|
|           Australia|      

In [9]:
# covid4=covid.withColumnRenamed('Country/Region','Nation')  #replace one column name with other name #works for one feature
# covid4=covid.withColumnRenamed('Province/State','State') 

#for multiple features
covid4=(covid.withColumnRenamed('Country/Region','Nation')
        .withColumnRenamed('Province/State','State')
        .withColumnRenamed('Deaths','Deceased'))        
covid4.show()


+--------------------+-------------------+---------+----------+----------+---------+--------+---------+------+--------------------+
|               State|             Nation|      Lat|      Long|      Date|Confirmed|Deceased|Recovered|Active|          WHO Region|
+--------------------+-------------------+---------+----------+----------+---------+--------+---------+------+--------------------+
|                null|        Afghanistan| 33.93911| 67.709953|2020-01-22|        0|       0|        0|     0|Eastern Mediterra...|
|                null|            Albania|  41.1533|   20.1683|2020-01-22|        0|       0|        0|     0|              Europe|
|                null|            Algeria|  28.0339|    1.6596|2020-01-22|        0|       0|        0|     0|              Africa|
|                null|            Andorra|  42.5063|    1.5218|2020-01-22|        0|       0|        0|     0|              Europe|
|                null|             Angola| -11.2027|   17.8739|2020-01-22|  

In [10]:
covid4= covid4.na.drop('any', thresh= 3)
covid4= covid4.na.drop('any', subset=['State'])
covid4.show()

#ANY=deletes records with even 1 null present in it
#ALL= deletes records with all nulls
#thresh= deletes records with null values below 3
#subset= deletes records of particular feature if that feature contains nulls

+--------------------+---------+------------------+---------+----------+---------+--------+---------+------+---------------+
|               State|   Nation|               Lat|     Long|      Date|Confirmed|Deceased|Recovered|Active|     WHO Region|
+--------------------+---------+------------------+---------+----------+---------+--------+---------+------+---------------+
|Australian Capita...|Australia|          -35.4735| 149.0124|2020-01-22|        0|       0|        0|     0|Western Pacific|
|     New South Wales|Australia|          -33.8688| 151.2093|2020-01-22|        0|       0|        0|     0|Western Pacific|
|  Northern Territory|Australia|          -12.4634| 130.8456|2020-01-22|        0|       0|        0|     0|Western Pacific|
|          Queensland|Australia|          -27.4698| 153.0251|2020-01-22|        0|       0|        0|     0|Western Pacific|
|     South Australia|Australia|          -34.9285| 138.6007|2020-01-22|        0|       0|        0|     0|Western Pacific|


In [11]:
covid4=covid4.na.fill('Abs',['Confirmed', 'Deceased'])  #replaces null values in 'Confirmed', 'Deceased' with Abs
covid4.show()


+--------------------+---------+------------------+---------+----------+---------+--------+---------+------+---------------+
|               State|   Nation|               Lat|     Long|      Date|Confirmed|Deceased|Recovered|Active|     WHO Region|
+--------------------+---------+------------------+---------+----------+---------+--------+---------+------+---------------+
|Australian Capita...|Australia|          -35.4735| 149.0124|2020-01-22|        0|       0|        0|     0|Western Pacific|
|     New South Wales|Australia|          -33.8688| 151.2093|2020-01-22|        0|       0|        0|     0|Western Pacific|
|  Northern Territory|Australia|          -12.4634| 130.8456|2020-01-22|        0|       0|        0|     0|Western Pacific|
|          Queensland|Australia|          -27.4698| 153.0251|2020-01-22|        0|       0|        0|     0|Western Pacific|
|     South Australia|Australia|          -34.9285| 138.6007|2020-01-22|        0|       0|        0|     0|Western Pacific|


In [12]:
#Imputer function to replace nulls with mean/mediam/mode:

from pyspark.ml.feature import Imputer
imputer= Imputer(
    inputCols=['Confirmed', 'Deceased','Recovered','Active'],
    outputCols=['{}_imputed'.format(c) for c in ['Confirmed', 'Deceased','Recovered','Active']]
).setStrategy('mean')

In [13]:
covid5=imputer.fit(covid4).transform(covid4).show(truncate= False)                    


+----------------------------+---------+------------------+---------+----------+---------+--------+---------+------+---------------+-----------------+----------------+-----------------+--------------+
|State                       |Nation   |Lat               |Long     |Date      |Confirmed|Deceased|Recovered|Active|WHO Region     |Confirmed_imputed|Deceased_imputed|Recovered_imputed|Active_imputed|
+----------------------------+---------+------------------+---------+----------+---------+--------+---------+------+---------------+-----------------+----------------+-----------------+--------------+
|Australian Capital Territory|Australia|-35.4735          |149.0124 |2020-01-22|0        |0       |0        |0     |Western Pacific|0                |0               |0                |0             |
|New South Wales             |Australia|-33.8688          |151.2093 |2020-01-22|0        |0       |0        |0     |Western Pacific|0                |0               |0                |0          

In [14]:
#data retieval with filter operations:
covid4.filter('Active>0').show() #states with atleast 1 active case

+---------+------+------------------+--------+----------+---------+--------+---------+------+---------------+
|    State|Nation|               Lat|    Long|      Date|Confirmed|Deceased|Recovered|Active|     WHO Region|
+---------+------+------------------+--------+----------+---------+--------+---------+------+---------------+
|    Anhui| China|           31.8257|117.2264|2020-01-22|        1|       0|        0|     1|Western Pacific|
|  Beijing| China|           40.1824|116.4142|2020-01-22|       14|       0|        0|    14|Western Pacific|
|Chongqing| China|           30.0572| 107.874|2020-01-22|        6|       0|        0|     6|Western Pacific|
|   Fujian| China|           26.0789|117.9874|2020-01-22|        1|       0|        0|     1|Western Pacific|
|Guangdong| China|           23.3417|113.4244|2020-01-22|       26|       0|        0|    26|Western Pacific|
|  Guangxi| China|           23.8298|108.7881|2020-01-22|        2|       0|        0|     2|Western Pacific|
|  Guizhou

In [15]:
covid4.filter('Active>0').select(['Confirmed','Deceased','Recovered']).show() #states with atleast 1 Confirmed,Deceased,Recovered cases

+---------+--------+---------+
|Confirmed|Deceased|Recovered|
+---------+--------+---------+
|        1|       0|        0|
|       14|       0|        0|
|        6|       0|        0|
|        1|       0|        0|
|       26|       0|        0|
|        2|       0|        0|
|        1|       0|        0|
|        4|       0|        0|
|        1|       0|        0|
|        5|       0|        0|
|      444|      17|       28|
|        4|       0|        0|
|        1|       0|        0|
|        2|       0|        0|
|        2|       0|        0|
|        1|       0|        0|
|        1|       0|        0|
|        2|       0|        0|
|        9|       0|        0|
|        1|       0|        0|
+---------+--------+---------+
only showing top 20 rows



In [16]:
covid4.filter((covid4['Confirmed']>1000) &    #states with confirmed cases more than 1k, dead more than 500 
              (covid4['Deceased']>500) &      #and recoveries more than 2k
              (covid4['Recovered']>2000)).show()

+-----+------+-------+--------+----------+---------+--------+---------+------+---------------+
|State|Nation|    Lat|    Long|      Date|Confirmed|Deceased|Recovered|Active|     WHO Region|
+-----+------+-------+--------+----------+---------+--------+---------+------+---------------+
|Hubei| China|30.9756|112.2707|2020-02-10|    31728|     974|     2222| 28532|Western Pacific|
|Hubei| China|30.9756|112.2707|2020-02-11|    33366|    1068|     2639| 29659|Western Pacific|
|Hubei| China|30.9756|112.2707|2020-02-12|    34874|    1068|     2686| 31120|Western Pacific|
|Hubei| China|30.9756|112.2707|2020-02-13|    48206|    1310|     3459| 43437|Western Pacific|
|Hubei| China|30.9756|112.2707|2020-02-14|    54406|    1457|     4774| 48175|Western Pacific|
|Hubei| China|30.9756|112.2707|2020-02-15|    56249|    1596|     5623| 49030|Western Pacific|
|Hubei| China|30.9756|112.2707|2020-02-16|    58182|    1696|     6639| 49847|Western Pacific|
|Hubei| China|30.9756|112.2707|2020-02-17|    5998

In [17]:
# covid4.toPandas()

In [19]:
# avrg=covid4.groupBy('Nation').avg() #average statistics of individual nations
# avrg.toPandas()

# cnt=covid4.groupBy('Nation').count() #number of appearance of nations in dataset
# cnt.toPandas()

mx=covid4.groupBy('Nation').max()   #nations with max statistics of covid19
mx.toPandas()

#similarly, min, max, sum, mean, agg, pivot can be used

Unnamed: 0,Nation,max(Lat),max(Long),max(Confirmed),max(Deceased),max(Recovered),max(Active)
0,France,46.8852,165.618042,7514,42,6047,3088
1,China,47.862,127.7615,68135,4512,64435,50633
2,Denmark,61.8926,-6.9118,214,0,188,118
3,Canada,64.8255,-57.6604,58728,5667,0,53061
4,Greenland,71.7069,-42.6043,14,0,13,8
5,Australia,-12.4634,153.0251,9049,83,3817,5149
6,United Kingdom,54.2361,-2.3644,584,48,533,394
7,Netherlands,18.0425,-63.0548,119,15,102,69


In [None]:
#There are numerous operations that can be done by PySpark on a dataframe. Here, few of them have been used to demonstrate the capability and efficiency of Spark while handling Big Data.