Starting a SparkSession

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Build the SparkSession used by every cell below; getOrCreate() hands back
# an existing session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("pysparkbasics")
    .getOrCreate()
)

Reading a CSV File

In [8]:
# Location of the COVID-19 CSV file on the local machine.
covid_csv_path = "C:/Users/pooja konduri/Downloads/covid-kaggle-dataset/covid_19_clean_complete.csv"

# Read the CSV into a DataFrame: the first row supplies the column names
# (header=True) and Spark infers each column's type (inferSchema=True).
covid_df = spark.read.csv(covid_csv_path, header=True, inferSchema=True)

# Preview the first five rows.
covid_df.show(5)


+--------------+--------------+--------+---------+----------+---------+------+---------+------+--------------------+
|Province/State|Country/Region|     Lat|     Long|      Date|Confirmed|Deaths|Recovered|Active|          WHO Region|
+--------------+--------------+--------+---------+----------+---------+------+---------+------+--------------------+
|          NULL|   Afghanistan|33.93911|67.709953|2020-01-22|        0|     0|        0|     0|Eastern Mediterra...|
|          NULL|       Albania| 41.1533|  20.1683|2020-01-22|        0|     0|        0|     0|              Europe|
|          NULL|       Algeria| 28.0339|   1.6596|2020-01-22|        0|     0|        0|     0|              Africa|
|          NULL|       Andorra| 42.5063|   1.5218|2020-01-22|        0|     0|        0|     0|              Europe|
|          NULL|        Angola|-11.2027|  17.8739|2020-01-22|        0|     0|        0|     0|              Africa|
+--------------+--------------+--------+---------+----------+---------+------+---------+------+--------------------+

Basic DataFrame Operations:

a) Selecting Columns:

In [12]:
# Keep only the 'Country/Region' and 'Deaths' columns and preview five rows.
covid_df.select("Country/Region", "Deaths").show(5)


+--------------+------+
|Country/Region|Deaths|
+--------------+------+
|   Afghanistan|     0|
|       Albania|     0|
|       Algeria|     0|
|       Andorra|     0|
|        Angola|     0|
+--------------+------+
only showing top 5 rows



b) Filtering Data:

In [14]:
# Row predicate: the 'Country/Region' column equals 'India'.
is_india = covid_df["Country/Region"] == "India"

# Keep only the rows that match the predicate.
india_df = covid_df.filter(is_india)

# Preview the first five matching rows.
india_df.show(5)


+--------------+--------------+---------+--------+----------+---------+------+---------+------+---------------+
|Province/State|Country/Region|      Lat|    Long|      Date|Confirmed|Deaths|Recovered|Active|     WHO Region|
+--------------+--------------+---------+--------+----------+---------+------+---------+------+---------------+
|          NULL|         India|20.593684|78.96288|2020-01-22|        0|     0|        0|     0|South-East Asia|
|          NULL|         India|20.593684|78.96288|2020-01-23|        0|     0|        0|     0|South-East Asia|
|          NULL|         India|20.593684|78.96288|2020-01-24|        0|     0|        0|     0|South-East Asia|
|          NULL|         India|20.593684|78.96288|2020-01-25|        0|     0|        0|     0|South-East Asia|
|          NULL|         India|20.593684|78.96288|2020-01-26|        0|     0|        0|     0|South-East Asia|
+--------------+--------------+---------+--------+----------+---------+------+---------+------+---------------+

 Adding a New Column

In [16]:
# Add a new column 'death_percentage' (Deaths / Confirmed * 100).
# The division is guarded with an explicit Confirmed != 0 check: F.when()
# without an otherwise() yields NULL for rows that fail the condition, so
# rows with zero confirmed cases get NULL regardless of the session's
# divide-by-zero setting (with ANSI mode enabled, an unguarded division by
# zero raises an error instead of returning NULL).
has_confirmed_cases = F.col("Confirmed") != 0
covid_df = covid_df.withColumn(
    "death_percentage",
    F.when(has_confirmed_cases, (F.col("Deaths") / F.col("Confirmed")) * 100),
)

# Show the DataFrame with the new column
covid_df.select("Country/Region", "Deaths", "Confirmed", "death_percentage").show(5)


+--------------+------+---------+----------------+
|Country/Region|Deaths|Confirmed|death_percentage|
+--------------+------+---------+----------------+
|   Afghanistan|     0|        0|            NULL|
|       Albania|     0|        0|            NULL|
|       Algeria|     0|        0|            NULL|
|       Andorra|     0|        0|            NULL|
|        Angola|     0|        0|            NULL|
+--------------+------+---------+----------------+
only showing top 5 rows



Aggregating Data

In [20]:
# Sum the 'Deaths' column over every row of the DataFrame and expose the
# result as a one-row DataFrame with a single 'total_deaths' column.
# NOTE(review): the data has one row per region per Date; if 'Deaths' is a
# cumulative running total, summing across all dates over-counts — confirm
# whether the sum should be restricted to the latest Date.
total_deaths_expr = F.sum("Deaths").alias("total_deaths")
global_deaths = covid_df.agg(total_deaths_expr)

# Display the aggregated total.
global_deaths.show()


+------------+
|total_deaths|
+------------+
|    43384903|
+------------+



# Write the processed DataFrame out as CSV.
# - header=True keeps the column names in the output; without it they are
#   not written.
# - mode="overwrite" lets this cell be re-run; the default save mode raises
#   an error when the target path already exists.
# Spark writes a directory of part files at this path, not a single .csv file.
# NOTE(review): "bucket-name" looks like a placeholder, and depending on the
# Spark/Hadoop setup the scheme may need to be "s3a://" — confirm before running.
covid_df.write.csv(
    "s3://bucket-name/processed-covid-data.csv",
    mode="overwrite",
    header=True,
)
