# PySpark Dataframe Complete Guide (with COVID-19 Dataset)

Spark once relied heavily on RDD manipulations, but it now provides a DataFrame API for us to work with. You can get the dataset from [Kaggle](https://www.kaggle.com/datasets/sudalairajkumar/covid19-in-india).

import findspark
findspark.init()

In [9]:
pip show findspark

Name: findspark
Version: 2.0.1
Summary: Find pyspark to make it importable.
Home-page: https://github.com/minrk/findspark
Author: Min RK
Author-email: benjaminrk@gmail.com
License: BSD (3-clause)
Location: c:\users\anerisonani\appdata\local\programs\python\python310\lib\site-packages
Requires: 
Required-by: 
Note: you may need to restart the kernel to use updated packages.


In [10]:
import pandas as pd
import numpy as np
from datetime import date, timedelta, datetime
import time

import pyspark # only run this after findspark.init()
from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

In [11]:
spark = SparkSession.builder.appName('covid-example').getOrCreate()

spark

# Data

We will be working with the COVID-19 in India dataset, which is one of the more detailed COVID-19 datasets available online.

The data can be found at this Kaggle URL: https://www.kaggle.com/datasets/sudalairajkumar/covid19-in-india

## 1. Basic Functions

#### 1) Load the data

Create a DataFrame named `cases`:

In [12]:
cases = spark.read.load("covid_19_india.csv",
                        format="csv", 
                        sep=",", 
                        inferSchema="true", 
                        header="true")

In [14]:
cases.show(10)

+---+-------------------+-------+--------------------+-----------------------+------------------------+-----+------+---------+
|Sno|               Date|   Time|State/UnionTerritory|ConfirmedIndianNational|ConfirmedForeignNational|Cured|Deaths|Confirmed|
+---+-------------------+-------+--------------------+-----------------------+------------------------+-----+------+---------+
|  1|2020-01-30 00:00:00|6:00 PM|              Kerala|                      1|                       0|    0|     0|        1|
|  2|2020-01-31 00:00:00|6:00 PM|              Kerala|                      1|                       0|    0|     0|        1|
|  3|2020-02-01 00:00:00|6:00 PM|              Kerala|                      2|                       0|    0|     0|        2|
|  4|2020-02-02 00:00:00|6:00 PM|              Kerala|                      3|                       0|    0|     0|        3|
|  5|2020-02-03 00:00:00|6:00 PM|              Kerala|                      3|                       0|    0|  

#### 2) Create a temporary view for SQL

In [15]:
# Session-scoped view; createOrReplaceTempView is safe to re-run, unlike createTempView.
cases.createOrReplaceTempView("covid_df")

In [16]:
df2 = spark.sql("SELECT * FROM covid_df where `State/UnionTerritory` = 'Gujarat' ")

In [17]:
df2.show()

+---+-------------------+--------+--------------------+-----------------------+------------------------+-----+------+---------+
|Sno|               Date|    Time|State/UnionTerritory|ConfirmedIndianNational|ConfirmedForeignNational|Cured|Deaths|Confirmed|
+---+-------------------+--------+--------------------+-----------------------+------------------------+-----+------+---------+
|232|2020-03-20 00:00:00| 6:00 PM|             Gujarat|                      5|                       0|    0|     0|        5|
|252|2020-03-21 00:00:00| 6:00 PM|             Gujarat|                      7|                       0|    0|     0|        7|
|275|2020-03-22 00:00:00| 6:00 PM|             Gujarat|                     18|                       0|    0|     1|       18|
|298|2020-03-23 00:00:00| 6:00 PM|             Gujarat|                     29|                       0|    0|     1|       29|
|321|2020-03-24 00:00:00| 6:00 PM|             Gujarat|                     32|                       1|

In [18]:
# pip install pandas==1.5.3

The output looks fine for now, but as the number of columns grows, the `show()` formatting becomes hard to read. The `.toPandas()` function converts a Spark DataFrame into a pandas DataFrame, which is much easier to inspect. Because `.toPandas()` collects every row onto the driver, apply `limit()` first when the data is large.

In [19]:
df2.limit(10).toPandas()


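|   | Sno | Date | Time | State/UnionTerritory | ConfirmedIndianNational | ConfirmedForeignNational | Cured | Deaths | Confirmed |
|---|-----|------|------|----------------------|-------------------------|--------------------------|-------|--------|-----------|
| 0 | 232 | 2020-03-20 | 6:00 PM | Gujarat | 5 | 0 | 0 | 0 | 5 |
| 1 | 252 | 2020-03-21 | 6:00 PM | Gujarat | 7 | 0 | 0 | 0 | 7 |
| 2 | 275 | 2020-03-22 | 6:00 PM | Gujarat | 18 | 0 | 0 | 1 | 18 |
| 3 | 298 | 2020-03-23 | 6:00 PM | Gujarat | 29 | 0 | 0 | 1 | 29 |
| 4 | 321 | 2020-03-24 | 6:00 PM | Gujarat | 32 | 1 | 0 | 1 | 33 |
| 5 | 345 | 2020-03-25 | 6:00 PM | Gujarat | 37 | 1 | 0 | 1 | 38 |
| 6 | 373 | 2020-03-26 | 6:00 PM | Gujarat | 42 | 1 | 0 | 3 | 43 |
| 7 | 400 | 2020-03-27 | 10:00 AM | Gujarat | 42 | 1 | 0 | 3 | 43 |
| 8 | 427 | 2020-03-28 | 6:00 PM | Gujarat | 44 | 1 | 0 | 3 | 45 |
| 9 | 454 | 2020-03-29 | 7:30 PM | Gujarat | - | - | 1 | 5 | 58 |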


In [20]:
cases.printSchema()

root
 |-- Sno: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Time: string (nullable = true)
 |-- State/UnionTerritory: string (nullable = true)
 |-- ConfirmedIndianNational: string (nullable = true)
 |-- ConfirmedForeignNational: string (nullable = true)
 |-- Cured: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Confirmed: integer (nullable = true)



#### 3) Rename a single column

In [21]:
cases = cases.withColumnRenamed("State/UnionTerritory", "state")
df2 = df2.withColumnRenamed("ConfirmedIndianNational", "ConfirmedCases")

#### Select a subset of columns with `select`

In [22]:
df2 = df2.select('Date','ConfirmedCases','Cured','Deaths', 'Confirmed')
df2.show()

+-------------------+--------------+-----+------+---------+
|               Date|ConfirmedCases|Cured|Deaths|Confirmed|
+-------------------+--------------+-----+------+---------+
|2020-03-20 00:00:00|             5|    0|     0|        5|
|2020-03-21 00:00:00|             7|    0|     0|        7|
|2020-03-22 00:00:00|            18|    0|     1|       18|
|2020-03-23 00:00:00|            29|    0|     1|       29|
|2020-03-24 00:00:00|            32|    0|     1|       33|
|2020-03-25 00:00:00|            37|    0|     1|       38|
|2020-03-26 00:00:00|            42|    0|     3|       43|
|2020-03-27 00:00:00|            42|    0|     3|       43|
|2020-03-28 00:00:00|            44|    0|     3|       45|
|2020-03-29 00:00:00|             -|    1|     5|       58|
|2020-03-30 00:00:00|             -|    1|     6|       69|
|2020-03-31 00:00:00|             -|    3|     6|       73|
|2020-04-01 00:00:00|             -|    5|     6|       82|
|2020-04-02 00:00:00|             -|    

#### 4) Sort by Column

In [23]:
# Simple sort
df2.sort("Confirmed").show()

+-------------------+--------------+-----+------+---------+
|               Date|ConfirmedCases|Cured|Deaths|Confirmed|
+-------------------+--------------+-----+------+---------+
|2020-03-20 00:00:00|             5|    0|     0|        5|
|2020-03-21 00:00:00|             7|    0|     0|        7|
|2020-03-22 00:00:00|            18|    0|     1|       18|
|2020-03-23 00:00:00|            29|    0|     1|       29|
|2020-03-24 00:00:00|            32|    0|     1|       33|
|2020-03-25 00:00:00|            37|    0|     1|       38|
|2020-03-26 00:00:00|            42|    0|     3|       43|
|2020-03-27 00:00:00|            42|    0|     3|       43|
|2020-03-28 00:00:00|            44|    0|     3|       45|
|2020-03-29 00:00:00|             -|    1|     5|       58|
|2020-03-30 00:00:00|             -|    1|     6|       69|
|2020-03-31 00:00:00|             -|    3|     6|       73|
|2020-04-01 00:00:00|             -|    5|     6|       82|
|2020-04-02 00:00:00|             -|    

In [24]:
# Descending Sort
from pyspark.sql import functions as F

df2.sort(F.desc("Confirmed")).show()

+-------------------+--------------+------+------+---------+
|               Date|ConfirmedCases| Cured|Deaths|Confirmed|
+-------------------+--------------+------+------+---------+
|2021-08-11 00:00:00|             -|814802| 10077|   825085|
|2021-08-10 00:00:00|             -|814778| 10077|   825064|
|2021-08-09 00:00:00|             -|814761| 10077|   825045|
|2021-08-08 00:00:00|             -|814747| 10077|   825020|
|2021-08-07 00:00:00|             -|814720| 10077|   825001|
|2021-08-06 00:00:00|             -|814696| 10076|   824978|
|2021-08-05 00:00:00|             -|814665| 10076|   824954|
|2021-08-04 00:00:00|             -|814637| 10076|   824939|
|2021-08-03 00:00:00|             -|814595| 10076|   824922|
|2021-08-02 00:00:00|             -|814570| 10076|   824900|
|2021-08-01 00:00:00|             -|814549| 10076|   824877|
|2021-07-31 00:00:00|             -|814514| 10076|   824850|
|2021-07-30 00:00:00|             -|814485| 10076|   824829|
|2021-07-29 00:00:00|   

#### 5) Change Column Type

In [25]:
from pyspark.sql.types import TimestampType

# Date is already a timestamp here (see printSchema above), so this cast only illustrates the syntax.
df2 = df2.withColumn('Date', F.col('Date').cast(TimestampType()))
df2.show()

+-------------------+--------------+-----+------+---------+
|               Date|ConfirmedCases|Cured|Deaths|Confirmed|
+-------------------+--------------+-----+------+---------+
|2020-03-20 00:00:00|             5|    0|     0|        5|
|2020-03-21 00:00:00|             7|    0|     0|        7|
|2020-03-22 00:00:00|            18|    0|     1|       18|
|2020-03-23 00:00:00|            29|    0|     1|       29|
|2020-03-24 00:00:00|            32|    0|     1|       33|
|2020-03-25 00:00:00|            37|    0|     1|       38|
|2020-03-26 00:00:00|            42|    0|     3|       43|
|2020-03-27 00:00:00|            42|    0|     3|       43|
|2020-03-28 00:00:00|            44|    0|     3|       45|
|2020-03-29 00:00:00|             -|    1|     5|       58|
|2020-03-30 00:00:00|             -|    1|     6|       69|
|2020-03-31 00:00:00|             -|    3|     6|       73|
|2020-04-01 00:00:00|             -|    5|     6|       82|
|2020-04-02 00:00:00|             -|    

#### 6) Filter

We can filter a DataFrame on multiple conditions using the AND (`&`), OR (`|`) and NOT (`~`) operators. For example, we may want to find all the records for Sikkim with more than 10 confirmed cases.

In [26]:
cases.filter((cases.Confirmed>10) & (cases.state=='Sikkim')).show()

+----+-------------------+-------+------+-----------------------+------------------------+-----+------+---------+
| Sno|               Date|   Time| state|ConfirmedIndianNational|ConfirmedForeignNational|Cured|Deaths|Confirmed|
+----+-------------------+-------+------+-----------------------+------------------------+-----+------+---------+
|2875|2020-06-10 00:00:00|8:00 AM|Sikkim|                      -|                       -|    0|     0|       13|
|2912|2020-06-11 00:00:00|8:00 AM|Sikkim|                      -|                       -|    0|     0|       13|
|2948|2020-06-12 00:00:00|8:00 AM|Sikkim|                      -|                       -|    2|     0|       14|
|2984|2020-06-13 00:00:00|8:00 AM|Sikkim|                      -|                       -|    2|     0|       63|
|3020|2020-06-14 00:00:00|8:00 AM|Sikkim|                      -|                       -|    4|     0|       63|
|3056|2020-06-15 00:00:00|8:00 AM|Sikkim|                      -|                       

#### 7) GroupBy

In [27]:
group = cases.groupBy(["State", "Confirmed"]).agg(F.sum("Confirmed"))
group.show(35)

+--------------------+---------+--------------+
|               State|Confirmed|sum(Confirmed)|
+--------------------+---------+--------------+
|           Jharkhand|       27|            27|
|             Haryana|      225|           225|
|      Andhra Pradesh|      722|           722|
|          Chandigarh|       30|            60|
|           Jharkhand|       67|            67|
|           Karnataka|      705|           705|
|         West Bengal|     2063|          2063|
|              Punjab|     1980|          1980|
|         Uttarakhand|      317|           317|
|             Tripura|      800|           800|
|             Manipur|      490|           490|
|                 Goa|     1482|          2964|
|           Karnataka|    44077|         44077|
|           Jharkhand|    11686|         11686|
|           Rajasthan|    58692|         58692|
|        Chhattisgarh|    14987|         14987|
|           Telengana|    92255|         92255|
|              Sikkim|     1232|        

#### 8) Use SQL with DataFrames

We first register the `cases` DataFrame as a temporary table, `cases_table`, on which we can run SQL operations. As you can see, the result of the SQL select statement is again a Spark DataFrame.

Complex SQL clauses such as GROUP BY, HAVING, and ORDER BY can all be used in the `spark.sql` function.

# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement.
cases.createOrReplaceTempView('cases_table')
newDF = spark.sql('select * from cases_table where confirmed > 100')
newDF.show()

#### 9) Using Spark UDFs

Sometimes we want to do complicated things to one or more columns. This can be thought of as a map operation applied to a single column or to multiple columns of a PySpark DataFrame. While Spark SQL functions cover many column-creation use cases, I use a Spark UDF whenever I need more mature Python functionality.

To use Spark UDFs, we need the `F.udf` function to convert a regular Python function into a Spark UDF. We also need to specify the return type of the function. In this example the return type is `StringType()`.

In [34]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

def casesHighLow(confirmed):
    # Label a row by its confirmed-case count.
    if confirmed < 50:
        return 'low'
    else:
        return 'high'

# Convert to a UDF by passing in the function and its return type
casesHighLowUDF = F.udf(casesHighLow, StringType())
CasesWithHighLow = cases.withColumn("HighLow", casesHighLowUDF("Confirmed"))
CasesWithHighLow.show()



+---+-------------------+-------+------+-----------------------+------------------------+-----+------+---------+-------+
|Sno|               Date|   Time| state|ConfirmedIndianNational|ConfirmedForeignNational|Cured|Deaths|Confirmed|HighLow|
+---+-------------------+-------+------+-----------------------+------------------------+-----+------+---------+-------+
|  1|2020-01-30 00:00:00|6:00 PM|Kerala|                      1|                       0|    0|     0|        1|    low|
|  2|2020-01-31 00:00:00|6:00 PM|Kerala|                      1|                       0|    0|     0|        1|    low|
|  3|2020-02-01 00:00:00|6:00 PM|Kerala|                      2|                       0|    0|     0|        2|    low|
|  4|2020-02-02 00:00:00|6:00 PM|Kerala|                      3|                       0|    0|     0|        3|    low|
|  5|2020-02-03 00:00:00|6:00 PM|Kerala|                      3|                       0|    0|     0|        3|    low|
|  6|2020-02-04 00:00:00|6:00 PM
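
Because the function handed to `F.udf` is ordinary Python, its logic can be checked on plain values before it is wrapped. Spark passes SQL nulls to a Python UDF as `None`, and a comparison such as `None < 50` raises a `TypeError`, so the sketch below adds a guard for that case. The name `cases_high_low` and the `threshold` parameter are hypothetical, introduced only for this illustration.

```python
from typing import Optional


def cases_high_low(confirmed: Optional[int], threshold: int = 50) -> Optional[str]:
    """Label a confirmed-case count as 'low' or 'high'; pass nulls through."""
    if confirmed is None:
        return None
    return "low" if confirmed < threshold else "high"


# Check the logic on plain Python values, including a missing one.
labels = [cases_high_low(value) for value in (1, 49, 50, 825085, None)]
print(labels)
```

The function can then be wrapped in the same way as in the cell above, with `F.udf(cases_high_low, StringType())`.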