In [2]:
!pip install pyspark



### PySpark CRUD operation

Create (Insert)

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("COVID crud").getOrCreate()

Load and Read Data

In [4]:
df = spark.read.option("header" , True).csv("D:\projects\BL_DE_Digit Insurance\PostgressSQL\data\country_wise_latest.csv")
df.show()

+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...|
|            Albania|     4880|   144|     2745|  1991|      117|         6|           6

Query Data

In [5]:
df.filter("Confirmed > 1000").select(["Country/Region" , "WHO Region" , "Confirmed"]).show()

+--------------------+--------------------+---------+
|      Country/Region|          WHO Region|Confirmed|
+--------------------+--------------------+---------+
|         Afghanistan|Eastern Mediterra...|    36263|
|             Albania|              Europe|     4880|
|             Algeria|              Africa|    27973|
|           Argentina|            Americas|   167416|
|             Armenia|              Europe|    37390|
|           Australia|     Western Pacific|    15303|
|             Austria|              Europe|    20558|
|          Azerbaijan|              Europe|    30446|
|             Bahrain|Eastern Mediterra...|    39482|
|          Bangladesh|     South-East Asia|   226225|
|             Belarus|              Europe|    67251|
|             Belgium|              Europe|    66428|
|               Benin|              Africa|     1770|
|             Bolivia|            Americas|    71181|
|Bosnia and Herzeg...|              Europe|    10498|
|              Brazil|      

Update Data

In [6]:
updated_df = df.withColumn("Deaths" , when(col("Deaths") == 100 , 41).otherwise(col("Deaths")))
updated_df.show()

+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...|
|            Albania|     4880|   144|     2745|  1991|      117|         6|           6

Delete Data (Cannot delete but we can aggregate the unwanted data)

In [7]:
df_filtered = df.filter(col("Country/Region") == "Afghanistan").show()

+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|   Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...|
+--------------+---------+------+---------+------+---------+----------+-------------+------------------+----

Rename Column

In [8]:
df_renamed = df.withColumnRenamed("Country/Region" , "Country_Region").show()

+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|     Country_Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...|
|            Albania|     4880|   144|     2745|  1991|      117|         6|           6

Drop The Column

In [9]:
df_dropped = df.drop("1 week change").show()

+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-----------------+--------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week % increase|          WHO Region|
+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-----------------+--------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|             2.07|Eastern Mediterra...|
|            Albania|     4880|   144|     2745|  1991|      117|         6|           63|              2.95|                56.25|             

Change the Data Type

In [10]:
df_typecase = df.withColumn("Deaths" , col("Deaths").cast("int"))

In [11]:
df_typecase.printSchema()

root
 |-- Country/Region: string (nullable = true)
 |-- Confirmed: string (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: string (nullable = true)
 |-- Active: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)
 |-- Deaths / 100 Cases: string (nullable = true)
 |-- Recovered / 100 Cases: string (nullable = true)
 |-- Deaths / 100 Recovered: string (nullable = true)
 |-- Confirmed last week: string (nullable = true)
 |-- 1 week change: string (nullable = true)
 |-- 1 week % increase: string (nullable = true)
 |-- WHO Region: string (nullable = true)



In [12]:
df.orderBy("Country/Region" , ascending = False).show()

+--------------------+---------+------+---------+-------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|      Country/Region|Confirmed|Deaths|Recovered| Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+--------------------+---------+------+---------+-------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|            Zimbabwe|     2704|    36|      542|   2126|      192|         2|           24|              1.33|                20.04|                  6.64|               1713|          991|            57.85|              Africa|
|              Zambia|     4552|   140|     2815|   1597|       71|         1|  

Groupby

In [13]:
df.groupBy("Deaths").count().show()

+------+-----+
|Deaths|count|
+------+-----+
|  4838|    1|
|     7|    3|
|    51|    2|
|   124|    2|
|   613|    1|
|   711|    1|
|    54|    1|
|    15|    1|
|   483|    1|
|    11|    4|
| 30212|    1|
|   543|    1|
|    69|    2|
|   112|    1|
|    42|    1|
|    87|    1|
|    64|    1|
|     3|    2|
|    34|    1|
|    59|    1|
+------+-----+
only showing top 20 rows



MAX() MIN() SUM() AVG()

In [14]:
df.agg({"Deaths" : "sum"}).show()
df.agg({"Deaths" : "max"}).show()
df.agg({"Deaths" : "min"}).show()
df.agg({"Deaths" : "avg"}).show()

+-----------+
|sum(Deaths)|
+-----------+
|   654036.0|
+-----------+

+-----------+
|max(Deaths)|
+-----------+
|        998|
+-----------+

+-----------+
|min(Deaths)|
+-----------+
|          0|
+-----------+

+----------------+
|     avg(Deaths)|
+----------------+
|3497.51871657754|
+----------------+



In [15]:
df.describe().show()

+-------+--------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+---------------------+----------------------+-------------------+-----------------+------------------+---------------+
|summary|Country/Region|        Confirmed|           Deaths|         Recovered|            Active|        New cases|        New deaths|    New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|    1 week change| 1 week % increase|     WHO Region|
+-------+--------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+---------------------+----------------------+-------------------+-----------------+------------------+---------------+
|  count|           187|              187|              187|               187|               187|              187|           