In [3]:
!pip install pyspark
!pip install findspark
!pip install pandas

[0mCollecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
[0mInstalling collected packages: findspark
Successfully installed findspark-2.0.1
[0m

In [4]:
import findspark  # This helps us find and use Apache Spark
findspark.init()  # Initialize findspark to locate Spark
from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType
import pandas as pd 

In [5]:
 # Initialize a Spark Session
spark = SparkSession \
    .builder \
    .appName("COVID-19 Data Analysis") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

# Check if the Spark Session is active
if 'spark' in locals() and isinstance(spark, SparkSession):
    print("SparkSession is active and ready to use.")
else:
    print("SparkSession is not active. Please create a SparkSession.")

SparkSession is active and ready to use.


In [7]:
vaccination_data = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/KpHDlIzdtR63BdTofl1mOg/owid-covid-latest.csv')

In [9]:
vaccination_data.head()

Unnamed: 0,iso_code,continent,location,last_updated_date,total_cases,new_cases,new_cases_smoothed,total_deaths,new_deaths,new_deaths_smoothed,...,male_smokers,handwashing_facilities,hospital_beds_per_thousand,life_expectancy,human_development_index,population,excess_mortality_cumulative_absolute,excess_mortality_cumulative,excess_mortality,excess_mortality_cumulative_per_million
0,AFG,Asia,Afghanistan,2024-08-04,235214.0,0.0,0.0,7998.0,0.0,0.0,...,,37.746,0.5,64.83,0.511,41128770.0,,,,
1,OWID_AFR,,Africa,2024-08-04,13145380.0,36.0,5.143,259117.0,0.0,0.0,...,,,,,,1426737000.0,,,,
2,ALB,Europe,Albania,2024-08-04,335047.0,0.0,0.0,3605.0,0.0,0.0,...,51.2,,2.89,78.57,0.795,2842318.0,,,,
3,DZA,Africa,Algeria,2024-08-04,272139.0,18.0,2.571,6881.0,0.0,0.0,...,30.4,83.741,1.9,76.88,0.748,44903230.0,,,,
4,ASM,Oceania,American Samoa,2024-08-04,8359.0,0.0,0.0,34.0,0.0,0.0,...,,,,73.74,,44295.0,,,,


In [17]:
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population','last_updated_date']
# Show the first 5 records
print(vaccination_data[columns_to_display].head())

  continent  total_cases  total_deaths  total_vaccinations  population  \
0      Asia       235214          7998                   0    41128772   
1       nan     13145380        259117                   0  1426736614   
2    Europe       335047          3605                   0     2842318   
3    Africa       272139          6881                   0    44903228   
4   Oceania         8359            34                   0       44295   

  last_updated_date  
0        2024-08-04  
1        2024-08-04  
2        2024-08-04  
3        2024-08-04  
4        2024-08-04  


In [19]:
# Convert to Spark DataFrame directly
# Define the schema
schema = StructType([
    StructField("continent", StringType(), True),
    StructField("total_cases", LongType(), True),
    StructField("total_deaths", LongType(), True),
    StructField("total_vaccinations", LongType(), True),
    StructField("population", LongType(), True),
    StructField("last_updated_date",DateType(),True)
])

# Convert the columns to the appropriate data types
# Ensures data types and fill NaNs
vaccination_data['continent'] = vaccination_data['continent'].astype(str)  
vaccination_data['total_cases'] = vaccination_data['total_cases'].fillna(0).astype('int64')  
vaccination_data['total_deaths'] = vaccination_data['total_deaths'].fillna(0).astype('int64') 
vaccination_data['total_vaccinations'] = vaccination_data['total_vaccinations'].fillna(0).astype('int64')  
vaccination_data['population'] = vaccination_data['population'].fillna(0).astype('int64')
vaccination_data['last_updated_date'] = vaccination_data['last_updated_date'].fillna(pd.Timestamp('2000-01-01'))


#specified fields are taken for dataframe
spark_df = spark.createDataFrame(vaccination_data[schema.fieldNames()]) 
# Show the Spark DataFrame
spark_df.show()

  PyArrow >= 4.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


+-------------+-----------+------------+------------------+----------+-----------------+
|    continent|total_cases|total_deaths|total_vaccinations|population|last_updated_date|
+-------------+-----------+------------+------------------+----------+-----------------+
|         Asia|     235214|        7998|                 0|  41128772|       2024-08-04|
|          nan|   13145380|      259117|                 0|1426736614|       2024-08-04|
|       Europe|     335047|        3605|                 0|   2842318|       2024-08-04|
|       Africa|     272139|        6881|                 0|  44903228|       2024-08-04|
|      Oceania|       8359|          34|                 0|     44295|       2024-08-04|
|       Europe|      48015|         159|                 0|     79843|       2024-08-04|
|       Africa|     107481|        1937|                 0|  35588996|       2024-08-04|
|North America|       3904|          12|                 0|     15877|       2024-08-04|
|North America|      

In [20]:
spark_df.printSchema()

root
 |-- continent: string (nullable = true)
 |-- total_cases: long (nullable = true)
 |-- total_deaths: long (nullable = true)
 |-- total_vaccinations: long (nullable = true)
 |-- population: long (nullable = true)
 |-- last_updated_date: string (nullable = true)



In [21]:
# List the names of the columns you want to display
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population','last_updated_date']
# Display the first 5 records of the specified columns
spark_df.select(columns_to_display).show(5)

+---------+-----------+------------+------------------+----------+-----------------+
|continent|total_cases|total_deaths|total_vaccinations|population|last_updated_date|
+---------+-----------+------------+------------------+----------+-----------------+
|     Asia|     235214|        7998|                 0|  41128772|       2024-08-04|
|      nan|   13145380|      259117|                 0|1426736614|       2024-08-04|
|   Europe|     335047|        3605|                 0|   2842318|       2024-08-04|
|   Africa|     272139|        6881|                 0|  44903228|       2024-08-04|
|  Oceania|       8359|          34|                 0|     44295|       2024-08-04|
+---------+-----------+------------+------------------+----------+-----------------+
only showing top 5 rows



In [22]:
print("Displaying the 'continent' and 'total_cases' columns:")
# Show only the 'continent' and 'total_cases' columns
spark_df.select('continent', 'total_cases').show(5)

Displaying the 'continent' and 'total_cases' columns:
+---------+-----------+
|continent|total_cases|
+---------+-----------+
|     Asia|     235214|
|      nan|   13145380|
|   Europe|     335047|
|   Africa|     272139|
|  Oceania|       8359|
+---------+-----------+
only showing top 5 rows



In [23]:
print("Filtering records where 'total_cases' is greater than 1,000,000:")
 # Show records with more than 1 million total cases
spark_df.filter(spark_df['total_cases'] > 1000000).show(5) 

Filtering records where 'total_cases' is greater than 1,000,000:
+-------------+-----------+------------+------------------+----------+-----------------+
|    continent|total_cases|total_deaths|total_vaccinations|population|last_updated_date|
+-------------+-----------+------------+------------------+----------+-----------------+
|          nan|   13145380|      259117|                 0|1426736614|       2024-08-04|
|South America|   10101218|      130663|                 0|  45510324|       2024-08-04|
|          nan|  301499099|     1637249|        9104304615|4721383370|       2024-08-14|
|      Oceania|   11861161|       25236|                 0|  26177410|       2024-08-04|
|       Europe|    6082444|       22534|                 0|   8939617|       2024-08-04|
+-------------+-----------+------------+------------------+----------+-----------------+
only showing top 5 rows



In [27]:
from pyspark.sql import functions as F

spark_df_with_percentage = spark_df.withColumn(
    'death_percentage', 
    (spark_df['total_deaths'] / spark_df['population']) * 100
)
spark_df_with_percentage = spark_df_with_percentage.withColumn(
    'death_percentage',
    F.concat(
        # Format to 2 decimal places
        F.format_number(spark_df_with_percentage['death_percentage'], 2), 
        # Append the percentage symbol 
        F.lit('%')  
    )
)
columns_to_display = ['total_deaths', 'population', 'death_percentage', 'continent', 'total_vaccinations', 'total_cases','last_updated_date']
spark_df_with_percentage.select(columns_to_display).show(5)

+------------+----------+----------------+---------+------------------+-----------+-----------------+
|total_deaths|population|death_percentage|continent|total_vaccinations|total_cases|last_updated_date|
+------------+----------+----------------+---------+------------------+-----------+-----------------+
|        7998|  41128772|           0.02%|     Asia|                 0|     235214|       2024-08-04|
|      259117|1426736614|           0.02%|      nan|                 0|   13145380|       2024-08-04|
|        3605|   2842318|           0.13%|   Europe|                 0|     335047|       2024-08-04|
|        6881|  44903228|           0.02%|   Africa|                 0|     272139|       2024-08-04|
|          34|     44295|           0.08%|  Oceania|                 0|       8359|       2024-08-04|
+------------+----------+----------------+---------+------------------+-----------+-----------------+
only showing top 5 rows



In [29]:
from pyspark.sql.functions import year, to_date

spark_df_with_percentage = spark_df_with_percentage.withColumn('updated_date', year(to_date('last_updated_date','yyyy-MM-dd')))
spark_df_with_percentage.show(5)

+---------+-----------+------------+------------------+----------+-----------------+----------------+------------+
|continent|total_cases|total_deaths|total_vaccinations|population|last_updated_date|death_percentage|updated_date|
+---------+-----------+------------+------------------+----------+-----------------+----------------+------------+
|     Asia|     235214|        7998|                 0|  41128772|       2024-08-04|           0.02%|        2024|
|      nan|   13145380|      259117|                 0|1426736614|       2024-08-04|           0.02%|        2024|
|   Europe|     335047|        3605|                 0|   2842318|       2024-08-04|           0.13%|        2024|
|   Africa|     272139|        6881|                 0|  44903228|       2024-08-04|           0.02%|        2024|
|  Oceania|       8359|          34|                 0|     44295|       2024-08-04|           0.08%|        2024|
+---------+-----------+------------+------------------+----------+--------------

#### UDF
A User-Defined Function (UDF) in Spark is a custom function that you can define to perform specific transformations or calculations on your data. Spark UDFs are useful when built-in Spark functions don't provide the functionality you need.


In [31]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Define the UDF in Python
def convert_total_deaths(total_deaths):
    # Example conversion logic; replace with your own
    return total_deaths * 2 if total_deaths is not None else 0

# Register the UDF with a return type
convert_total_deaths_udf = udf(convert_total_deaths, IntegerType())

# Register the UDF in Spark SQL context
spark.udf.register("convert_total_deaths", convert_total_deaths_udf)

# Drop the existing temporary view if it exists
spark.sql("DROP VIEW IF EXISTS data_v")

# Create a new temporary view
spark_df.createTempView('data_v')

# Execute the SQL query using the registered UDF
spark.sql('SELECT continent, total_deaths, convert_total_deaths(total_deaths) as converted_total_deaths FROM data_v').show()


+-------------+------------+----------------------+
|    continent|total_deaths|converted_total_deaths|
+-------------+------------+----------------------+
|         Asia|        7998|                 15996|
|          nan|      259117|                518234|
|       Europe|        3605|                  7210|
|       Africa|        6881|                 13762|
|      Oceania|          34|                    68|
|       Europe|         159|                   318|
|       Africa|        1937|                  3874|
|North America|          12|                    24|
|North America|         146|                   292|
|South America|      130663|                261326|
|         Asia|        8777|                 17554|
|North America|         292|                   584|
|          nan|     1637249|               3274498|
|      Oceania|       25236|                 50472|
|       Europe|       22534|                 45068|
|         Asia|       10353|                 20706|
|North Ameri

RDDs (Resilient Distributed Datasets) are collections of objects similar to Python lists but with a key difference: while Python lists are processed in a single process on one machine, RDDs are distributed across multiple processes on various physical servers, or nodes, within a cluster. This setup enables RDDs to provide built-in parallelism, allowing data to be processed simultaneously across nodes

In [34]:
#Create an RDD with integers from 1-50. Apply a transformation to multiply 
#every number by 2, resulting in an RDD that contains the first 50 even numbers

numbers = range(1, 50) 
numbers_RDD = spark.sparkContext.parallelize(numbers) 
even_numbers_RDD = numbers_RDD.map(lambda x: x * 2)
print( even_numbers_RDD.collect())

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98]
