In [1]:
import os
# Set both PySpark interpreter variables to the "python" command in one call.
os.environ.update(
    PYSPARK_PYTHON="python",
    PYSPARK_DRIVER_PYTHON="python",
)


In [2]:
# Create (or reuse) a Spark session, load the COVID dataset and preview it.
import pyspark
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

# header=True: the first CSV line holds the column names.
# inferSchema=True: ask Spark to infer each column's type from the data.
df_pyspark = spark.read.csv(
    'covid_data.csv',
    header=True,
    inferSchema=True,
)

# show() with no arguments prints the top 20 rows.
df_pyspark.show()


+--------------------+-------------------+---------+----------+----------+---------+------+---------+------+--------------------+
|      Province/State|     Country/Region|      Lat|      Long|      Date|Confirmed|Deaths|Recovered|Active|          WHO Region|
+--------------------+-------------------+---------+----------+----------+---------+------+---------+------+--------------------+
|                NULL|        Afghanistan| 33.93911| 67.709953|2020-01-22|        0|     0|        0|     0|Eastern Mediterra...|
|                NULL|            Albania|  41.1533|   20.1683|2020-01-22|        0|     0|        0|     0|              Europe|
|                NULL|            Algeria|  28.0339|    1.6596|2020-01-22|        0|     0|        0|     0|              Africa|
|                NULL|            Andorra|  42.5063|    1.5218|2020-01-22|        0|     0|        0|     0|              Europe|
|                NULL|             Angola| -11.2027|   17.8739|2020-01-22|        0|     0

In [3]:
# Print each column's name, data type and nullability as a tree.
df_pyspark.printSchema()

root
 |-- Province/State: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Active: integer (nullable = true)
 |-- WHO Region: string (nullable = true)



In [4]:
# Read the dataset again with the inferSchema=True option, which asks Spark to
# infer each column's data type from the values in the file.
pyspark_df = spark.read.csv('covid_data.csv', header=True, inferSchema=True)
# Print the schema of the frame that was just loaded. This previously printed
# df_pyspark, which left pyspark_df unused and did not demonstrate this read.
pyspark_df.printSchema()

root
 |-- Province/State: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Active: integer (nullable = true)
 |-- WHO Region: string (nullable = true)



In [5]:
# Show the Python type of the df_pyspark object itself (not of its columns).
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [6]:
# Select one column by name and display its first rows.
df_pyspark.select('WHO Region').show()

+--------------------+
|          WHO Region|
+--------------------+
|Eastern Mediterra...|
|              Europe|
|              Africa|
|              Europe|
|              Africa|
|            Americas|
|            Americas|
|              Europe|
|     Western Pacific|
|     Western Pacific|
|     Western Pacific|
|     Western Pacific|
|     Western Pacific|
|     Western Pacific|
|     Western Pacific|
|     Western Pacific|
|              Europe|
|              Europe|
|            Americas|
|Eastern Mediterra...|
+--------------------+
only showing top 20 rows



In [7]:
# List the column names of the DataFrame.
df_pyspark.columns

['Province/State',
 'Country/Region',
 'Lat',
 'Long',
 'Date',
 'Confirmed',
 'Deaths',
 'Recovered',
 'Active',
 'WHO Region']

In [8]:
# Fetch the first 3 rows. Unlike show(), head(n) returns them as a list of
# Row objects instead of printing a table.
df_pyspark.head(3)

[Row(Province/State=None, Country/Region='Afghanistan', Lat=33.93911, Long=67.709953, Date=datetime.date(2020, 1, 22), Confirmed=0, Deaths=0, Recovered=0, Active=0, WHO Region='Eastern Mediterranean'),
 Row(Province/State=None, Country/Region='Albania', Lat=41.1533, Long=20.1683, Date=datetime.date(2020, 1, 22), Confirmed=0, Deaths=0, Recovered=0, Active=0, WHO Region='Europe'),
 Row(Province/State=None, Country/Region='Algeria', Lat=28.0339, Long=1.6596, Date=datetime.date(2020, 1, 22), Confirmed=0, Deaths=0, Recovered=0, Active=0, WHO Region='Africa')]

In [9]:
# List (column name, data type) pairs for every column.
df_pyspark.dtypes

[('Province/State', 'string'),
 ('Country/Region', 'string'),
 ('Lat', 'double'),
 ('Long', 'double'),
 ('Date', 'date'),
 ('Confirmed', 'int'),
 ('Deaths', 'int'),
 ('Recovered', 'int'),
 ('Active', 'int'),
 ('WHO Region', 'string')]

In [10]:
# Display summary statistics (count, mean, stddev, ...) of the dataset.
df_pyspark.describe().show()

+-------+--------------+--------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+---------------+
|summary|Province/State|Country/Region|               Lat|              Long|         Confirmed|           Deaths|        Recovered|           Active|     WHO Region|
+-------+--------------+--------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+---------------+
|  count|         14664|         49068|             49068|             49068|             49068|            49068|            49068|            49068|          49068|
|   mean|          NULL|          NULL|21.433730459769688|23.528236452106245| 16884.90425531915|884.1791595337083|7915.713479253282| 8085.01161653216|           NULL|
| stddev|          NULL|          NULL| 24.95031982606509|  70.4427397445028|127300.20527228026|6313.584410596534|54800.91873054013|76258.90302550694|           NULL

In [11]:
# Add a derived column holding Confirmed + 2 and display the result.
# The returned DataFrame is not assigned, so df_pyspark itself is unchanged.
df_pyspark.withColumn('Confirmed Cases after 2 days',df_pyspark['Confirmed']+2).show()

+--------------------+-------------------+---------+----------+----------+---------+------+---------+------+--------------------+----------------------------+
|      Province/State|     Country/Region|      Lat|      Long|      Date|Confirmed|Deaths|Recovered|Active|          WHO Region|Confirmed Cases after 2 days|
+--------------------+-------------------+---------+----------+----------+---------+------+---------+------+--------------------+----------------------------+
|                NULL|        Afghanistan| 33.93911| 67.709953|2020-01-22|        0|     0|        0|     0|Eastern Mediterra...|                           2|
|                NULL|            Albania|  41.1533|   20.1683|2020-01-22|        0|     0|        0|     0|              Europe|                           2|
|                NULL|            Algeria|  28.0339|    1.6596|2020-01-22|        0|     0|        0|     0|              Africa|                           2|
|                NULL|            Andorra|  42

In [12]:
# Display the data with 'Country/Region' renamed to 'Country'.
# The result is not assigned, so df_pyspark keeps the original column name.
df_pyspark.withColumnRenamed('Country/Region','Country').show()

+--------------------+-------------------+---------+----------+----------+---------+------+---------+------+--------------------+
|      Province/State|            Country|      Lat|      Long|      Date|Confirmed|Deaths|Recovered|Active|          WHO Region|
+--------------------+-------------------+---------+----------+----------+---------+------+---------+------+--------------------+
|                NULL|        Afghanistan| 33.93911| 67.709953|2020-01-22|        0|     0|        0|     0|Eastern Mediterra...|
|                NULL|            Albania|  41.1533|   20.1683|2020-01-22|        0|     0|        0|     0|              Europe|
|                NULL|            Algeria|  28.0339|    1.6596|2020-01-22|        0|     0|        0|     0|              Africa|
|                NULL|            Andorra|  42.5063|    1.5218|2020-01-22|        0|     0|        0|     0|              Europe|
|                NULL|             Angola| -11.2027|   17.8739|2020-01-22|        0|     0

In [13]:
# Keep only rows that have at least 10 non-null values. The frame has 10
# columns, so this drops every row that contains any null.
# `how` is deliberately omitted: when `thresh` is supplied it takes precedence
# and `how` is ignored, so passing both was redundant and misleading.
df = df_pyspark.na.drop(thresh=10)
df.show()

+--------------------+--------------+------------------+---------+----------+---------+------+---------+------+---------------+
|      Province/State|Country/Region|               Lat|     Long|      Date|Confirmed|Deaths|Recovered|Active|     WHO Region|
+--------------------+--------------+------------------+---------+----------+---------+------+---------+------+---------------+
|Australian Capita...|     Australia|          -35.4735| 149.0124|2020-01-22|        0|     0|        0|     0|Western Pacific|
|     New South Wales|     Australia|          -33.8688| 151.2093|2020-01-22|        0|     0|        0|     0|Western Pacific|
|  Northern Territory|     Australia|          -12.4634| 130.8456|2020-01-22|        0|     0|        0|     0|Western Pacific|
|          Queensland|     Australia|          -27.4698| 153.0251|2020-01-22|        0|     0|        0|     0|Western Pacific|
|     South Australia|     Australia|          -34.9285| 138.6007|2020-01-22|        0|     0|        0|

In [14]:
# Drop every row whose 'Province/State' value is null. Because of `subset`,
# nulls in the other columns are not considered.
df_pyspark.na.drop(how='any',subset=['Province/State']).show()

+--------------------+--------------+------------------+---------+----------+---------+------+---------+------+---------------+
|      Province/State|Country/Region|               Lat|     Long|      Date|Confirmed|Deaths|Recovered|Active|     WHO Region|
+--------------------+--------------+------------------+---------+----------+---------+------+---------+------+---------------+
|Australian Capita...|     Australia|          -35.4735| 149.0124|2020-01-22|        0|     0|        0|     0|Western Pacific|
|     New South Wales|     Australia|          -33.8688| 151.2093|2020-01-22|        0|     0|        0|     0|Western Pacific|
|  Northern Territory|     Australia|          -12.4634| 130.8456|2020-01-22|        0|     0|        0|     0|Western Pacific|
|          Queensland|     Australia|          -27.4698| 153.0251|2020-01-22|        0|     0|        0|     0|Western Pacific|
|     South Australia|     Australia|          -34.9285| 138.6007|2020-01-22|        0|     0|        0|

In [15]:
# Filling the missing values:
# nulls in the 'Province/State' column are replaced with the string '0'.
df_pyspark.na.fill('0','Province/State').show()

+--------------------+-------------------+---------+----------+----------+---------+------+---------+------+--------------------+
|      Province/State|     Country/Region|      Lat|      Long|      Date|Confirmed|Deaths|Recovered|Active|          WHO Region|
+--------------------+-------------------+---------+----------+----------+---------+------+---------+------+--------------------+
|                   0|        Afghanistan| 33.93911| 67.709953|2020-01-22|        0|     0|        0|     0|Eastern Mediterra...|
|                   0|            Albania|  41.1533|   20.1683|2020-01-22|        0|     0|        0|     0|              Europe|
|                   0|            Algeria|  28.0339|    1.6596|2020-01-22|        0|     0|        0|     0|              Africa|
|                   0|            Andorra|  42.5063|    1.5218|2020-01-22|        0|     0|        0|     0|              Europe|
|                   0|             Angola| -11.2027|   17.8739|2020-01-22|        0|     0

In [16]:
# Re-check the schema. The na.drop / na.fill results above were only displayed
# or stored in `df`, so df_pyspark has not been reassigned since it was loaded.
df_pyspark.printSchema()

root
 |-- Province/State: string (nullable = true)
 |-- Country/Region: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Active: integer (nullable = true)
 |-- WHO Region: string (nullable = true)



In [17]:
# Cast the three count columns to double and rebind df_pyspark to the result.
from pyspark.sql.functions import col

df_pyspark = (
    df_pyspark
    .withColumn("confirmed", col("confirmed").cast("double"))
    .withColumn("deaths", col("deaths").cast("double"))
    .withColumn("recovered", col("recovered").cast("double"))
)


In [18]:
from pyspark.ml.feature import Imputer

# Build a mean-strategy imputer for the three count columns. Each input column
# gets a companion output column named "<column> imputed".
imputer = Imputer(
    strategy='mean',
    inputCols=['confirmed', 'deaths', 'recovered'],
    outputCols=[
        "{} imputed".format(name)
        for name in ['confirmed', 'deaths', 'recovered']
    ],
)

# Fit on the data, apply the fitted model to the same data, and display it.
imputer.fit(df_pyspark).transform(df_pyspark).show()


+--------------------+-------------------+---------+----------+----------+---------+------+---------+------+--------------------+-----------------+--------------+-----------------+
|      Province/State|     Country/Region|      Lat|      Long|      Date|confirmed|deaths|recovered|Active|          WHO Region|confirmed imputed|deaths imputed|recovered imputed|
+--------------------+-------------------+---------+----------+----------+---------+------+---------+------+--------------------+-----------------+--------------+-----------------+
|                NULL|        Afghanistan| 33.93911| 67.709953|2020-01-22|      0.0|   0.0|      0.0|     0|Eastern Mediterra...|              0.0|           0.0|              0.0|
|                NULL|            Albania|  41.1533|   20.1683|2020-01-22|      0.0|   0.0|      0.0|     0|              Europe|              0.0|           0.0|              0.0|
|                NULL|            Algeria|  28.0339|    1.6596|2020-01-22|      0.0|   0.0|    

In [19]:
from pyspark.sql import SparkSession

# Obtain a session via getOrCreate() and load the country-level summary file.
spark = (
    SparkSession.builder
    .appName('dataframe')
    .getOrCreate()
)
df = spark.read.csv(
    'country_wise.csv',
    header=True,
    inferSchema=True,
)
df.show()

+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...|
|            Albania|     4880|   144|     2745|  1991|      117|         6|           6

In [20]:
# Filter operations
# Keep countries with 1,000,000 or more confirmed cases. The condition is
# passed as a SQL expression string.
df.filter("Confirmed>=1000000").show()

+--------------+---------+------+---------+-------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------+
|Country/Region|Confirmed|Deaths|Recovered| Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|     WHO Region|
+--------------+---------+------+---------+-------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------+
|        Brazil|  2442375| 87618|  1846641| 508116|    23284|       614|        33728|              3.59|                75.61|                  4.74|            2118646|       323729|            15.28|       Americas|
|         India|  1480073| 33408|   951166| 495499|    44457|       637|        33598|              2.26|                64.

In [21]:
# After filtering the rows, keep only specific columns: country, deaths,
# confirmed and new recovered.
# NOTE(review): 'New Recovered' differs in case from the schema's 'New recovered';
# it resolves here, presumably because column resolution is case-insensitive by default.
df.filter("Confirmed>=1000000").select(['Country/Region','Deaths','Confirmed','New Recovered']).show()

+--------------+------+---------+-------------+
|Country/Region|Deaths|Confirmed|New Recovered|
+--------------+------+---------+-------------+
|        Brazil| 87618|  2442375|        33728|
|         India| 33408|  1480073|        33598|
|            US|148011|  4290259|        27941|
+--------------+------+---------+-------------+



In [22]:
# Combine two column conditions with & (logical AND) to keep rows where
# 30000 <= New recovered <= 50000. Each comparison is parenthesised because
# & binds more tightly than >= and <= in Python.
df.filter((df["New recovered"]>=30000) & (df["New recovered"]<=50000)).show()

+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------+
|Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|     WHO Region|
+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------+
|        Brazil|  2442375| 87618|  1846641|508116|    23284|       614|        33728|              3.59|                75.61|                  4.74|            2118646|       323729|            15.28|       Americas|
|         India|  1480073| 33408|   951166|495499|    44457|       637|        33598|              2.26|                64.26|  

In [23]:
# The ~ operator negates a column condition: this keeps the rows for which
# Active <= 90000 is false.
df.filter(~(df['Active']<=90000)).show()

+--------------+---------+------+---------+-------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------+
|Country/Region|Confirmed|Deaths|Recovered| Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|     WHO Region|
+--------------+---------+------+---------+-------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------+
|     Argentina|   167416|  3059|    72575|  91782|     4890|       120|         2057|              1.83|                43.35|                  4.21|             130774|        36642|            28.02|       Americas|
|    Bangladesh|   226225|  2965|   125683|  97577|     2772|        37|         1801|              1.31|                55.

In [24]:
# Aggregate functions: start from a fresh read of the country-level file.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('Agg')
    .getOrCreate()
)
df = spark.read.csv(
    'country_wise.csv',
    header=True,
    inferSchema=True,
)
df.show()


+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...|
|            Albania|     4880|   144|     2745|  1991|      117|         6|           6

In [25]:
# Print the inferred schema of the country-level data.
# NOTE(review): 'Deaths / 100 Recovered' is reported as string rather than double in
# the output below — presumably the column holds a non-numeric value; verify the raw file.
df.printSchema()

root
 |-- Country/Region: string (nullable = true)
 |-- Confirmed: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Active: integer (nullable = true)
 |-- New cases: integer (nullable = true)
 |-- New deaths: integer (nullable = true)
 |-- New recovered: integer (nullable = true)
 |-- Deaths / 100 Cases: double (nullable = true)
 |-- Recovered / 100 Cases: double (nullable = true)
 |-- Deaths / 100 Recovered: string (nullable = true)
 |-- Confirmed last week: integer (nullable = true)
 |-- 1 week change: integer (nullable = true)
 |-- 1 week % increase: double (nullable = true)
 |-- WHO Region: string (nullable = true)



In [26]:
# Total of the 'Active' column for each WHO region.
df.groupBy('WHO Region').sum('Active').show()

+--------------------+-----------+
|          WHO Region|sum(Active)|
+--------------------+-----------+
|              Europe|    1094656|
|     Western Pacific|      77409|
|              Africa|     270339|
|Eastern Mediterra...|     251005|
|            Americas|    4027938|
|     South-East Asia|     637015|
+--------------------+-----------+



In [27]:
# Largest 'Active' value over the whole DataFrame. This is a global aggregate;
# no groupBy is involved.
# The functions module is imported under an alias instead of using
# `from pyspark.sql.functions import max`, which would shadow Python's
# built-in max() for every later cell in the notebook.
from pyspark.sql import functions as F

df.agg(F.max("Active")).show()


+-----------+
|max(Active)|
+-----------+
|    2816444|
+-----------+



In [28]:
# Sum of the 'Active' column over all rows, using the
# {column name: aggregate name} dictionary form of agg().
df.agg({'Active':'sum'}).show()

+-----------+
|sum(Active)|
+-----------+
|    6358362|
+-----------+



In [29]:
# Smallest 'Active' value within each WHO region.
df.groupBy('WHO Region').min('Active').show()

+--------------------+-----------+
|          WHO Region|min(Active)|
+--------------------+-----------+
|              Europe|          0|
|     Western Pacific|          0|
|              Africa|          1|
|Eastern Mediterra...|         24|
|            Americas|          0|
|     South-East Asia|         13|
+--------------------+-----------+



In [30]:
# Print the first 5 rows without truncating long cell values.
df.show(n=5, truncate=False)

+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------------+
|Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|WHO Region           |
+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------------+
|Afghanistan   |36263    |1269  |25198    |9796  |106      |10        |18           |3.5               |69.49                |5.04                  |35526              |737          |2.07             |Eastern Mediterranean|
|Albania       |4880     |144   |2745     |1991  |117      |6         |63           |2.95              |

In [31]:
from pyspark.sql.functions import col
# Overwrite '1 week change' with its value plus 1000 and rebind df to the result.
# NOTE(review): df is reassigned from itself, so re-running this cell adds
# another 1000 each time.
df = df.withColumn("1 week change",col("1 week change")+1000)
# Show the first 5 rows to check the updated column.
df.show(5,0)

+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------------+
|Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|WHO Region           |
+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------------+
|Afghanistan   |36263    |1269  |25198    |9796  |106      |10        |18           |3.5               |69.49                |5.04                  |35526              |1737         |2.07             |Eastern Mediterranean|
|Albania       |4880     |144   |2745     |1991  |117      |6         |63           |2.95              |

In [32]:
# Countries whose '1 week % increase' is greater than 50.
# show() only prints the table and returns None, so its result must not be
# assigned back to df; doing so replaced the DataFrame with None. The filtered
# frame gets its own name and df is left intact.
high_growth_df = df.filter(col("1 week % increase") > 50)
high_growth_df.show()

+----------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------+
|  Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|     WHO Region|
+----------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------+
|         Bahamas|      382|    11|       91|   280|       40|         0|            0|              2.88|                23.82|                 12.09|                174|         1208|           119.54|       Americas|
|          Gambia|      326|     8|       66|   252|       49|         2|            6|              2.45|              

In [33]:
from pyspark.sql import SparkSession

# Reload the country-level file so the analysis starts from unmodified data.
spark = (
    SparkSession.builder
    .appName('Agg')
    .getOrCreate()
)
df = spark.read.csv(
    "country_wise.csv",
    header=True,
    inferSchema=True,
)
df.show(5)

# Per WHO region: total confirmed cases and the largest active-case count.
df.groupBy("WHO Region").agg(
    {
        "Confirmed": "sum",
        "Active": "max",
    }
).show()

+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|   Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...|
|       Albania|     4880|   144|     2745|  1991|      117|         6|           63|              2.95|    

In [34]:
# Filter operations: keep only the rows whose 'Country/Region' equals "Albania".
df.filter(df["Country/Region"] == "Albania").show()

+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+----------+
|Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|WHO Region|
+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+----------+
|       Albania|     4880|   144|     2745|  1991|      117|         6|           63|              2.95|                56.25|                  5.25|               4171|          709|             17.0|    Europe|
+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------

In [35]:
from pyspark.sql.functions import when, col

# Conditional update: add 1000 to 'New cases' for Afghanistan only; every other
# row keeps its current value.
df = df.withColumn(
    "New cases",
    when(
        col("Country/Region") == "Afghanistan",
        col("New cases") + 1000,
    ).otherwise(col("New cases")),
)
df.show(5, 0)

+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------------+
|Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|WHO Region           |
+--------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------------+
|Afghanistan   |36263    |1269  |25198    |9796  |1106     |10        |18           |3.5               |69.49                |5.04                  |35526              |737          |2.07             |Eastern Mediterranean|
|Albania       |4880     |144   |2745     |1991  |117      |6         |63           |2.95              |

In [36]:
# Inserting a new row
from pyspark.sql import Row
# Build a one-row DataFrame from positional values. The Row carries no field
# names, so the values are listed in the same order as df's columns.
row = spark.createDataFrame([Row("India",1106,1269,25198,9796,11,1,18,3.5,69.49,5.04,35526,737,2.07,"Eastern Mediterranean")])
# Append the row to df. The result keeps df's column names (see the India rows
# displayed further down).
# NOTE(review): the schema printed after this union reports the former integer
# columns as long — presumably because the Python ints in `row` are inferred as
# long; confirm whether the original integer types matter downstream.
df = df.union(row)

In [37]:
# Inspect the schema of df after the union.
df.printSchema()

root
 |-- Country/Region: string (nullable = true)
 |-- Confirmed: long (nullable = true)
 |-- Deaths: long (nullable = true)
 |-- Recovered: long (nullable = true)
 |-- Active: long (nullable = true)
 |-- New cases: long (nullable = true)
 |-- New deaths: long (nullable = true)
 |-- New recovered: long (nullable = true)
 |-- Deaths / 100 Cases: double (nullable = true)
 |-- Recovered / 100 Cases: double (nullable = true)
 |-- Deaths / 100 Recovered: string (nullable = true)
 |-- Confirmed last week: long (nullable = true)
 |-- 1 week change: long (nullable = true)
 |-- 1 week % increase: double (nullable = true)
 |-- WHO Region: string (nullable = true)



In [38]:
from pyspark.sql.functions import col

# Rename 'Country/Region' to 'Country', then mark the renamed frame for caching.
df = df.withColumnRenamed("Country/Region", "Country")
df = df.cache()

# Display the distinct country names.
df.select("Country").distinct().show()


+--------------+
|       Country|
+--------------+
|          Chad|
|      Paraguay|
|        Russia|
|         Yemen|
|       Senegal|
|    Cabo Verde|
|        Sweden|
|        Guyana|
|         Burma|
|       Eritrea|
|   Philippines|
|      Djibouti|
|      Malaysia|
|     Singapore|
|          Fiji|
|        Turkey|
|        Malawi|
|Western Sahara|
|          Iraq|
|       Germany|
+--------------+
only showing top 20 rows



In [39]:
# Show every row whose Country is "India": the original entry plus the row
# appended by the earlier union.
df.filter(df["Country"] == "India").show()

+-------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|Country|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+-------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|  India|  1480073| 33408|   951166|495499|    44457|       637|        33598|              2.26|                64.26|                  3.51|            1155338|       324735|            28.11|     South-East Asia|
|  India|     1106|  1269|    25198|  9796|       11|         1|           18|               3.5|                69.49|                 

In [42]:
# Delete the India row that was inserted earlier.
# Match on both the country and the Confirmed value. Filtering on
# Confirmed != 1106 alone would also remove any other country that happens to
# have exactly 1106 confirmed cases.
df = df.filter(~((df["Country"] == "India") & (df["Confirmed"] == 1106)))
# Confirm that only the original India row remains.
df.filter(df["Country"] == 'India').show()

+-------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------+
|Country|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|     WHO Region|
+-------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------+
|  India|  1480073| 33408|   951166|495499|    44457|       637|        33598|              2.26|                64.26|                  3.51|            1155338|       324735|            28.11|South-East Asia|
+-------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+--------------

In [44]:
# Group by WHO region and compute the average of 'Confirmed', exposing the
# result under the column name "Confirmed_avg_cases".
from pyspark.sql.functions import avg

df.groupBy("WHO Region").agg(
    avg("Confirmed").alias("Confirmed_avg_cases")
).show()

+--------------------+-------------------+
|          WHO Region|Confirmed_avg_cases|
+--------------------+-------------------+
|              Europe|  58920.05357142857|
|     Western Pacific|           18276.75|
|              Africa|         15066.8125|
|Eastern Mediterra...|  67761.09090909091|
|            Americas|  252551.0285714286|
|     South-East Asia|           183529.7|
+--------------------+-------------------+

