In [1]:
import pyspark
from delta import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count

In [2]:
builder = SparkSession.builder.master("local").appName("Covid-19").config("spark.jars", "/usr/local/postgresql-42.6.0.jar")\
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:
spark

In [4]:
df = spark.read.option("header", True).option("inferschema", True).csv("hdfs://localhost:9000/covid-19/time-series-19-covid-combined.csv")

In [5]:
df.printSchema() # Here it will the print the schema of the datset and we could see the data types, colmns, etc

root
 |-- Date: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- Province/State: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Deaths: integer (nullable = true)



In [6]:
df.describe().show() #shows basic summary with aggregation

+-------+----------+--------------+--------------+-----------------+------------------+-----------------+
|summary|      Date|Country/Region|Province/State|        Confirmed|         Recovered|           Deaths|
+-------+----------+--------------+--------------+-----------------+------------------+-----------------+
|  count|    231744|        231744|         72624|           231744|            218688|           231744|
|   mean|      null|          null|          null|513236.1723021955|106211.62376993708| 9760.17023094449|
| stddev|      null|          null|          null|3002239.150937862| 839741.3079382312|49749.87151319254|
|    min|01-01-2021|   Afghanistan|       Alberta|                0|                 0|                0|
|    max|31-12-2021|      Zimbabwe|      Zhejiang|         80625120|          30974748|           988609|
+-------+----------+--------------+--------------+-----------------+------------------+-----------------+



In [7]:
df.show(5) #It will show the top 5 rows.
print("The Total Number Of Values are:", df.count())

+----------+--------------+--------------+---------+---------+------+
|      Date|Country/Region|Province/State|Confirmed|Recovered|Deaths|
+----------+--------------+--------------+---------+---------+------+
|22-01-2020|   Afghanistan|          null|        0|        0|     0|
|23-01-2020|   Afghanistan|          null|        0|        0|     0|
|24-01-2020|   Afghanistan|          null|        0|        0|     0|
|25-01-2020|   Afghanistan|          null|        0|        0|     0|
|26-01-2020|   Afghanistan|          null|        0|        0|     0|
+----------+--------------+--------------+---------+---------+------+
only showing top 5 rows

The Total Number Of Values are: 231744


In [8]:
# Ingesting only required columns
df.printSchema() # Most of the columns seems important so we won't be dropping. 

root
 |-- Date: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- Province/State: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Deaths: integer (nullable = true)



In [9]:
# Dropping duplicate values
df_unique = df.dropDuplicates() # Dropped the duplicate values if there are any! 
df_unique.show(10)

+----------+--------------+--------------+---------+---------+------+
|      Date|Country/Region|Province/State|Confirmed|Recovered|Deaths|
+----------+--------------+--------------+---------+---------+------+
|31-05-2020|   Afghanistan|          null|    15180|     1328|   254|
|24-08-2021|   Afghanistan|          null|   152660|        0|  7083|
|21-01-2022|   Afghanistan|          null|   159516|        0|  7390|
|17-08-2020|       Albania|          null|     7499|     3816|   230|
|09-06-2020|       Algeria|          null|    10382|     6951|   724|
|25-03-2021|       Algeria|          null|   116543|    81065|  3071|
|13-04-2021|       Algeria|          null|   118799|    82813|  3137|
|03-02-2020|       Andorra|          null|        0|        0|     0|
|12-10-2020|       Andorra|          null|     2995|     1928|    57|
|23-05-2021|       Andorra|          null|    13569|    13234|   127|
+----------+--------------+--------------+---------+---------+------+
only showing top 10 

In [10]:
# Null Handling
# df_unique.printSchema()
# We are checking in each columns how many "Blanks" are present.
# Since 3rd and 5th columns has null; we will replace according to the datatype of that column
# isnan() is a SQL function that is used to check for NAN values and isNull() is a Column class function that is used to check for Null values.

df_colms = ["Date", "Country/Region", "Province/State", "Confirmed", "Recovered", "Deaths"]
df_replace = df_unique.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_colms]).show()

+----+--------------+--------------+---------+---------+------+
|Date|Country/Region|Province/State|Confirmed|Recovered|Deaths|
+----+--------------+--------------+---------+---------+------+
|   0|             0|        159120|        0|    13056|     0|
+----+--------------+--------------+---------+---------+------+



In [11]:
# 3rd Column: String: NA
df_na = df_unique.na.fill("NA", subset =["Province/State"]) # Replaced Null in the string column
df_na.show(10)

+----------+--------------+--------------+---------+---------+------+
|      Date|Country/Region|Province/State|Confirmed|Recovered|Deaths|
+----------+--------------+--------------+---------+---------+------+
|31-05-2020|   Afghanistan|            NA|    15180|     1328|   254|
|24-08-2021|   Afghanistan|            NA|   152660|        0|  7083|
|21-01-2022|   Afghanistan|            NA|   159516|        0|  7390|
|17-08-2020|       Albania|            NA|     7499|     3816|   230|
|09-06-2020|       Algeria|            NA|    10382|     6951|   724|
|25-03-2021|       Algeria|            NA|   116543|    81065|  3071|
|13-04-2021|       Algeria|            NA|   118799|    82813|  3137|
|03-02-2020|       Andorra|            NA|        0|        0|     0|
|12-10-2020|       Andorra|            NA|     2995|     1928|    57|
|23-05-2021|       Andorra|            NA|    13569|    13234|   127|
+----------+--------------+--------------+---------+---------+------+
only showing top 10 

In [12]:
# 5th Column: Integer: 0
df_int = df_na.na.fill(value = 0, subset =["Recovered"]) # Replaced Null in the integer column
df_int.show(10)

+----------+--------------+--------------+---------+---------+------+
|      Date|Country/Region|Province/State|Confirmed|Recovered|Deaths|
+----------+--------------+--------------+---------+---------+------+
|31-05-2020|   Afghanistan|            NA|    15180|     1328|   254|
|24-08-2021|   Afghanistan|            NA|   152660|        0|  7083|
|21-01-2022|   Afghanistan|            NA|   159516|        0|  7390|
|17-08-2020|       Albania|            NA|     7499|     3816|   230|
|09-06-2020|       Algeria|            NA|    10382|     6951|   724|
|25-03-2021|       Algeria|            NA|   116543|    81065|  3071|
|13-04-2021|       Algeria|            NA|   118799|    82813|  3137|
|03-02-2020|       Andorra|            NA|        0|        0|     0|
|12-10-2020|       Andorra|            NA|     2995|     1928|    57|
|23-05-2021|       Andorra|            NA|    13569|    13234|   127|
+----------+--------------+--------------+---------+---------+------+
only showing top 10 

In [13]:
df_colms = ["Date", "Country/Region", "Province/State", "Confirmed", "Recovered", "Deaths"]
df_replace = df_int.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_colms]).show()
# Here we can confirm that all Null/Blanks have been replaced with according to the datatype of the columns

+----+--------------+--------------+---------+---------+------+
|Date|Country/Region|Province/State|Confirmed|Recovered|Deaths|
+----+--------------+--------------+---------+---------+------+
|   0|             0|             0|        0|        0|     0|
+----+--------------+--------------+---------+---------+------+



In [14]:
# Column Renaming 
df_int.printSchema()
# 2nd and 3rd column need to be renamed as Province/State = State and Country/Region = Country

root
 |-- Date: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- Province/State: string (nullable = false)
 |-- Confirmed: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Deaths: integer (nullable = true)



In [15]:
df_rename = df_int.withColumnRenamed("Country/Region", "Country").withColumnRenamed("Province/State", "State")

df_rename.printSchema() # Required columns are renamed

df_rename.show(5)

root
 |-- Date: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- State: string (nullable = false)
 |-- Confirmed: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Deaths: integer (nullable = true)

+----------+-----------+-----+---------+---------+------+
|      Date|    Country|State|Confirmed|Recovered|Deaths|
+----------+-----------+-----+---------+---------+------+
|31-05-2020|Afghanistan|   NA|    15180|     1328|   254|
|24-08-2021|Afghanistan|   NA|   152660|        0|  7083|
|21-01-2022|Afghanistan|   NA|   159516|        0|  7390|
|17-08-2020|    Albania|   NA|     7499|     3816|   230|
|09-06-2020|    Algeria|   NA|    10382|     6951|   724|
+----------+-----------+-----+---------+---------+------+
only showing top 5 rows



In [16]:
# We don't need to do the Typecasting of any column

In [17]:
# Aggregate the data
# 1.	How many total cases so far?
# 2.	How many recovered cases so far?
# 3.	How many deaths so far?
# 4.	How many total cases so far countrywise?
# 5.	How many recovered cases so far countrywise?
# 6.	How many deaths so far countrywise?

In [18]:
# 1.	How many total cases so far?
from pyspark.sql.functions import sum, avg, aggregate

df_total = df_rename.select(sum("Confirmed").alias("Confirmed_Cases")).show()

+---------------+
|Confirmed_Cases|
+---------------+
|   118939403514|
+---------------+



In [19]:
# 2.	How many recovered cases so far?
df_recovered = df_rename.select(sum("Recovered").alias("Recovered_Cases")).show()

+---------------+
|Recovered_Cases|
+---------------+
|    23227207579|
+---------------+



In [20]:
# 3.	How many deaths so far?
df_death = df_rename.select(sum("Deaths").alias("Total_Death")).show()

+-----------+
|Total_Death|
+-----------+
| 2261860890|
+-----------+



In [21]:
# 4.	How many total cases so far countrywise?
df_total_country = df_rename.groupBy("Country").agg(sum("Confirmed").alias("Total_Confirmed")).show(5)

+--------+---------------+
| Country|Total_Confirmed|
+--------+---------------+
|    Chad|        2737971|
|Paraguay|      198415316|
|  Russia|     4055403879|
|   Yemen|        3987738|
| Senegal|       30940101|
+--------+---------------+
only showing top 5 rows



In [22]:
# 5.	How many recovered cases so far countrywise?
df_recovered_country = df_rename.groupBy("Country").agg(sum("Recovered").alias("Totta_Recovered")).show(5)

+--------+---------------+
| Country|Totta_Recovered|
+--------+---------------+
|    Chad|        1091049|
|Paraguay|       54844740|
|  Russia|     1128064202|
|   Yemen|         772497|
| Senegal|        9755165|
+--------+---------------+
only showing top 5 rows



In [26]:
# 6.	How many deaths so far countrywise?
df_death_country = df_rename.groupBy("Country").agg(sum("Deaths").alias("Total_Death")).show(5)

+--------+-----------+
| Country|Total_Death|
+--------+-----------+
|    Chad|      99316|
|Paraguay|    5974097|
|  Russia|   97041399|
|   Yemen|     810844|
| Senegal|     746432|
+--------+-----------+
only showing top 5 rows



In [28]:
# df_rename.write.format("delta").mode("overwrite").save("/tmp/delta-table")

In [38]:
# Writing the data to PostgresSQL 
df_rename.write \
  .format("jdbc") \
  .format("delta") \
  .option("url", "jdbc:postgresql://localhost:5432/covid") \
  .option("driver", "org.postgresql.Driver") \
  .option("dbtable", "coviddata") \
  .option("user", "postgres") \
  .option("password", "root@1234")\
  .mode('overwrite')\
  .save("/tmp/final-table-delta")