In [1]:
import findspark 
findspark.init() 
from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType
import pandas as pd  
spark = SparkSession \
    .builder \
    .appName("COVID-19 Data Analysis") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

if 'spark' in locals() and isinstance(spark, SparkSession):
    print("SparkSession is active and ready to use.")
else:
    print("SparkSession is not active. Please create a SparkSession.")

25/01/17 21:01:50 WARN Utils: Your hostname, sridhar resolves to a loopback address: 127.0.1.1; using 172.18.174.89 instead (on interface wlp1s0)
25/01/17 21:01:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/17 21:01:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


SparkSession is active and ready to use.


In [2]:
vaccination_data = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/KpHDlIzdtR63BdTofl1mOg/owid-covid-latest.csv')

In [3]:
print("Displaying the first 5 records of the vaccination data:")
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']
print(vaccination_data[columns_to_display].head())

Displaying the first 5 records of the vaccination data:
  continent  total_cases  total_deaths  total_vaccinations    population
0      Asia     235214.0        7998.0                 NaN  4.112877e+07
1       NaN   13145380.0      259117.0                 NaN  1.426737e+09
2    Europe     335047.0        3605.0                 NaN  2.842318e+06
3    Africa     272139.0        6881.0                 NaN  4.490323e+07
4   Oceania       8359.0          34.0                 NaN  4.429500e+04


In [4]:
schema = StructType([
    StructField("continent", StringType(), True),
    StructField("total_cases", LongType(), True),
    StructField("total_deaths", LongType(), True),
    StructField("total_vaccinations", LongType(), True),
    StructField("population", LongType(), True)
])

vaccination_data['continent'] = vaccination_data['continent'].astype(str)  # Ensures continent is a string
vaccination_data['total_cases'] = vaccination_data['total_cases'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['total_deaths'] = vaccination_data['total_deaths'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['total_vaccinations'] = vaccination_data['total_vaccinations'].fillna(0).astype('int64')  # Fill NaNs and convert to int
vaccination_data['population'] = vaccination_data['population'].fillna(0).astype('int64')  # Fill NaNs and convert to int

spark_df = spark.createDataFrame(vaccination_data[schema.fieldNames()])  # Use only the specified fields
spark_df.show()

+-------------+-----------+------------+------------------+----------+
|    continent|total_cases|total_deaths|total_vaccinations|population|
+-------------+-----------+------------+------------------+----------+
|         Asia|     235214|        7998|                 0|  41128772|
|          nan|   13145380|      259117|                 0|1426736614|
|       Europe|     335047|        3605|                 0|   2842318|
|       Africa|     272139|        6881|                 0|  44903228|
|      Oceania|       8359|          34|                 0|     44295|
|       Europe|      48015|         159|                 0|     79843|
|       Africa|     107481|        1937|                 0|  35588996|
|North America|       3904|          12|                 0|     15877|
|North America|       9106|         146|                 0|     93772|
|South America|   10101218|      130663|                 0|  45510324|
|         Asia|     452273|        8777|                 0|   2780472|
|North

In [5]:
print("Schema of the Spark DataFrame:")
spark_df.printSchema()

Schema of the Spark DataFrame:
root
 |-- continent: string (nullable = true)
 |-- total_cases: long (nullable = true)
 |-- total_deaths: long (nullable = true)
 |-- total_vaccinations: long (nullable = true)
 |-- population: long (nullable = true)



In [6]:
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']
spark_df.select(columns_to_display).show(5)

+---------+-----------+------------+------------------+----------+
|continent|total_cases|total_deaths|total_vaccinations|population|
+---------+-----------+------------+------------------+----------+
|     Asia|     235214|        7998|                 0|  41128772|
|      nan|   13145380|      259117|                 0|1426736614|
|   Europe|     335047|        3605|                 0|   2842318|
|   Africa|     272139|        6881|                 0|  44903228|
|  Oceania|       8359|          34|                 0|     44295|
+---------+-----------+------------+------------------+----------+
only showing top 5 rows



In [7]:
print("Displaying the 'continent' and 'total_cases' columns:")
spark_df.select('continent', 'total_cases').show(5)

Displaying the 'continent' and 'total_cases' columns:
+---------+-----------+
|continent|total_cases|
+---------+-----------+
|     Asia|     235214|
|      nan|   13145380|
|   Europe|     335047|
|   Africa|     272139|
|  Oceania|       8359|
+---------+-----------+
only showing top 5 rows



In [8]:
print("Filtering records where 'total_cases' is greater than 1,000,000:")
spark_df.filter(spark_df['total_cases'] > 1000000).show(5) 

Filtering records where 'total_cases' is greater than 1,000,000:
+-------------+-----------+------------+------------------+----------+
|    continent|total_cases|total_deaths|total_vaccinations|population|
+-------------+-----------+------------+------------------+----------+
|          nan|   13145380|      259117|                 0|1426736614|
|South America|   10101218|      130663|                 0|  45510324|
|          nan|  301499099|     1637249|        9104304615|4721383370|
|      Oceania|   11861161|       25236|                 0|  26177410|
|       Europe|    6082444|       22534|                 0|   8939617|
+-------------+-----------+------------+------------------+----------+
only showing top 5 rows



In [9]:
from pyspark.sql import functions as F

spark_df_with_percentage = spark_df.withColumn(
    'death_percentage', 
    (spark_df['total_deaths'] / spark_df['population']) * 100
)
spark_df_with_percentage = spark_df_with_percentage.withColumn(
    'death_percentage',
    F.concat(
        F.format_number(spark_df_with_percentage['death_percentage'], 2), 
        F.lit('%')  
    )
)
columns_to_display = ['total_deaths', 'population', 'death_percentage', 'continent', 'total_vaccinations', 'total_cases']
spark_df_with_percentage.select(columns_to_display).show(5)

+------------+----------+----------------+---------+------------------+-----------+
|total_deaths|population|death_percentage|continent|total_vaccinations|total_cases|
+------------+----------+----------------+---------+------------------+-----------+
|        7998|  41128772|           0.02%|     Asia|                 0|     235214|
|      259117|1426736614|           0.02%|      nan|                 0|   13145380|
|        3605|   2842318|           0.13%|   Europe|                 0|     335047|
|        6881|  44903228|           0.02%|   Africa|                 0|     272139|
|          34|     44295|           0.08%|  Oceania|                 0|       8359|
+------------+----------+----------------+---------+------------------+-----------+
only showing top 5 rows



In [10]:
print("Calculating the total deaths per continent:")
spark_df.groupby(['continent']).agg({"total_deaths": "SUM"}).show()  

Calculating the total deaths per continent:


[Stage 0:>                                                        (0 + 16) / 16]

+-------------+-----------------+
|    continent|sum(total_deaths)|
+-------------+-----------------+
|       Europe|          2102483|
|       Africa|           259117|
|          nan|         22430618|
|North America|          1671178|
|South America|          1354187|
|      Oceania|            32918|
|         Asia|          1637249|
+-------------+-----------------+



                                                                                

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

def convert_total_deaths(total_deaths):
    return total_deaths * 2

spark.udf.register("convert_total_deaths", convert_total_deaths, IntegerType())

<function __main__.convert_total_deaths(total_deaths)>

In [12]:
spark.sql("DROP VIEW IF EXISTS data_v")
spark_df.createTempView('data_v')
spark.sql('SELECT continent, total_deaths, convert_total_deaths(total_deaths) as converted_total_deaths FROM data_v').show()

+-------------+------------+----------------------+
|    continent|total_deaths|converted_total_deaths|
+-------------+------------+----------------------+
|         Asia|        7998|                 15996|
|          nan|      259117|                518234|
|       Europe|        3605|                  7210|
|       Africa|        6881|                 13762|
|      Oceania|          34|                    68|
|       Europe|         159|                   318|
|       Africa|        1937|                  3874|
|North America|          12|                    24|
|North America|         146|                   292|
|South America|      130663|                261326|
|         Asia|        8777|                 17554|
|North America|         292|                   584|
|          nan|     1637249|               3274498|
|      Oceania|       25236|                 50472|
|       Europe|       22534|                 45068|
|         Asia|       10353|                 20706|
|North Ameri

In [13]:
spark.sql('SELECT * FROM data_v').show()

+-------------+-----------+------------+------------------+----------+
|    continent|total_cases|total_deaths|total_vaccinations|population|
+-------------+-----------+------------+------------------+----------+
|         Asia|     235214|        7998|                 0|  41128772|
|          nan|   13145380|      259117|                 0|1426736614|
|       Europe|     335047|        3605|                 0|   2842318|
|       Africa|     272139|        6881|                 0|  44903228|
|      Oceania|       8359|          34|                 0|     44295|
|       Europe|      48015|         159|                 0|     79843|
|       Africa|     107481|        1937|                 0|  35588996|
|North America|       3904|          12|                 0|     15877|
|North America|       9106|         146|                 0|     93772|
|South America|   10101218|      130663|                 0|  45510324|
|         Asia|     452273|        8777|                 0|   2780472|
|North

In [14]:
print("Displaying continent with total vaccinated more than 1 million:")
spark.sql("SELECT continent FROM data_v WHERE total_vaccinations > 1000000").show()

Displaying continent with total vaccinated more than 1 million:
+-------------+
|    continent|
+-------------+
|          nan|
|North America|
|       Europe|
|       Europe|
|          nan|
|          nan|
|          nan|
|         Asia|
|         Asia|
|       Europe|
|          nan|
|         Asia|
|      Oceania|
|          nan|
|          nan|
|          nan|
|          nan|
+-------------+

