<p style="text-align:center">
    <a href="https://skills.network" target="_blank">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/images/IDSN-logo.png" width="200" alt="Skills Network Logo"  />
    </a>
</p>


# Getting Started with PySpark and Pandas


This notebook follows the original one, except that the entire Pandas DataFrame is imported into Spark.

Estimated time needed: **60** minutes


PySpark is the Python API for Apache Spark, a distributed computing system designed for handling large-scale data processing. Its ability to perform computations across multiple nodes makes it an ideal choice for Big Data scenarios, where data sets can reach terabytes or even petabytes in size. PySpark allows data scientists and analysts to harness the power of distributed computing, scaling up their workflows and processing capabilities significantly.


In contrast, Pandas is a powerful library tailored for data manipulation and analysis in Python, primarily used for handling structured data. It provides rich functionality for data cleaning, transformation, aggregation, and visualization, making it particularly suited for smaller data sets typically fitting into memory. Pandas excels at operations requiring quick turnaround times for data analysis and exploration, facilitating tasks such as data wrangling and exploratory data analysis.


To illustrate the strengths of both PySpark and Pandas, we will utilize a sample COVID-19 data set containing daily cases, deaths, and vaccinations across various continents.


## Objectives


- Understand PySpark and Pandas: Explain the core functionalities and use cases of PySpark for big data processing and Pandas for data manipulation.


- Set up the environment: Install and configure PySpark and Pandas to work together in a Python environment.


- Load and explore data: Import data into Pandas and PySpark DataFrames and perform basic data exploration.


- Convert between DataFrames: Convert a Pandas DataFrame to a Spark DataFrame for distributed processing.


- Perform data manipulation: Create new columns, filter data, and perform aggregations using PySpark.


- Utilize SQL queries: Use Spark SQL for querying data and leveraging user-defined functions (UDFs).


## 1. PySpark overview


### Overview


PySpark is the Python API for Apache Spark, designed for large-scale data processing and analysis. It offers tools for working with RDDs and DataFrames, enabling efficient, fault-tolerant distributed computing.


### Key features


- **Distributed computing:** Handles data across multiple nodes in a cluster.
- **High performance:** Keeps intermediate data in memory where possible, which typically makes it faster than disk-based frameworks such as Hadoop MapReduce.
- **Big data handling:** Manages data sets larger than a single machine's memory.
- **Python integration:** Compatible with Python libraries like Pandas and NumPy.


### Use cases


- **Large-scale data processing:** Ideal for processing large volumes of data that exceed the capacity of a single machine.
- **Data analysis:** Useful for complex data manipulations and analysis using distributed computing.


### Strengths


- High-speed data processing.
- Fault-tolerant and scalable.


### Limitations


- Complex setup and configuration.
- Steeper learning curve compared to some data processing tools.


## 2. Understanding Pandas


### Overview


Pandas is a Python library designed for data manipulation and analysis. It provides two primary data structures: Series and DataFrame, which facilitate handling and organizing structured data.


### Key features


- **Data structures:** Series for one-dimensional data and DataFrame for two-dimensional data.
- **Data I/O:** Reads and writes data in various formats such as CSV, Excel, and SQL.
- **Data cleaning:** Functions for handling missing or duplicate data.
- **Data analysis:** Includes statistical functions for detailed data analysis.
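
The features listed above can be sketched on a small, made-up DataFrame. The column names and values here are illustrative assumptions, not rows from the COVID-19 data set:

```python
import pandas as pd

# A Series is one-dimensional: a single labelled column of values.
cases = pd.Series([120, 80, 120], name="total_cases")

# A DataFrame is two-dimensional: labelled rows and columns.
df = pd.DataFrame({
    "continent": ["Asia", "Europe", "Asia"],
    "total_cases": [120.0, None, 120.0],  # None becomes NaN in a float column
})

# Data cleaning: drop exact duplicate rows, then fill the missing value.
cleaned = df.drop_duplicates().fillna({"total_cases": 0})

# Data analysis: summary statistics for the numeric column.
summary = cleaned["total_cases"].describe()

print(cases.ndim, df.ndim)
print(cleaned)
print(summary)
```

Whether filling with 0 is appropriate depends on the analysis; it is used here only to keep the sketch short.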


### Use cases


- **Data manipulation:** Efficient handling of structured data.
- **Data analysis:** Comprehensive analysis and transformation of data sets.


### Strengths


- User-friendly API for data manipulation.
- Extensive support for various data formats.


### Limitations


- Limited scalability for extremely large datasets compared to distributed frameworks.


## 3. Setting up the environment


### Installation


- First, let's install the necessary libraries if they are not already installed.


In [21]:
!pip install "pyspark[pandas_on_spark]" plotly



In [11]:
!pip install findspark



In [12]:
!pip install pandas



## 4. Initializing a Spark session


A Spark session is crucial for working with PySpark. It enables DataFrame creation, data loading, and various operations.


### Importing libraries


- `findspark` is used to locate the Spark installation.
- `pandas` is imported for data manipulation.


### Creating a Spark session


- `SparkSession.builder.appName("COVID-19 Data Analysis").getOrCreate()` initializes a Spark session with the specified application name.


### Checking Spark session


- The code checks if the Spark session is active and prints an appropriate message.


In [13]:
import findspark  # This helps us find and use Apache Spark (in Python!)
findspark.init()  # Initialize findspark to locate Spark
from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType
import pandas as pd  
# Initialize a Spark Session
spark = SparkSession \
    .builder \
    .appName("COVID-19 Data Analysis") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

# Check if the Spark Session is active
if 'spark' in locals() and isinstance(spark, SparkSession):
    print("SparkSession is active and ready to use.")
else:
    print("SparkSession is not active. Please create a SparkSession.")

SparkSession is active and ready to use.


## 5. Importing data into Pandas from various sources


Let's read the COVID-19 data from the provided URL.


In [14]:
# Read the COVID-19 data from the provided URL
vaccination_data = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/KpHDlIzdtR63BdTofl1mOg/owid-covid-latest.csv')

## 6. Displaying the first five records


### To retrieve and print the first five records


- `vaccination_data.head()` retrieves the first five rows of the `vaccination_data` DataFrame. This gives us a quick look at the data contained within the data set.
- The `print` function is used to display a message indicating what is being shown, followed by the actual data.


### Selecting specific columns:


- Let's define a list called `columns_to_display`, which contains the column names `['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']`.
- By using `vaccination_data[columns_to_display].head()`, let's restrict the DataFrame to the specified columns and again display the first five records of this subset.
- In the original notebook, the `continent` column is explicitly converted to string, while the numeric columns (total cases, total deaths, total vaccinations, population) have their NaN values filled with zeros and are then converted to `int64` (which is compatible with `LongType` in Spark).
- There, `fillna(0)` ensures that NaN values do not cause type issues during the Spark DataFrame creation. This version skips the conversion, so NaN values remain visible in the output.


In [15]:
print("Displaying the first 5 records of the vaccination data:")
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']
# Show the first 5 records
print(vaccination_data[columns_to_display].head())

Displaying the first 5 records of the vaccination data:
  continent  total_cases  total_deaths  total_vaccinations    population
0      Asia     235214.0        7998.0                 NaN  4.112877e+07
1       NaN   13145380.0      259117.0                 NaN  1.426737e+09
2    Europe     335047.0        3605.0                 NaN  2.842318e+06
3    Africa     272139.0        6881.0                 NaN  4.490323e+07
4   Oceania       8359.0          34.0                 NaN  4.429500e+04


In [16]:
# Summary: shape is (rows, columns), followed by descriptive statistics
print(f"Shape (rows, columns): {vaccination_data.shape}")
vaccination_data.describe()

Shape (rows, columns): (247, 67)


| | total_cases | new_cases | new_cases_smoothed | total_deaths | new_deaths | new_deaths_smoothed | total_cases_per_million | new_cases_per_million | new_cases_smoothed_per_million | total_deaths_per_million | ... | male_smokers | handwashing_facilities | hospital_beds_per_thousand | life_expectancy | human_development_index | population | excess_mortality_cumulative_absolute | excess_mortality_cumulative | excess_mortality | excess_mortality_cumulative_per_million |
|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|-|
| count | 246.0 | 242.0 | 242.0 | 246.0 | 243.0 | 243.0 | 246.0 | 242.0 | 242.0 | 246.0 | ... | 145.0 | 96.0 | 173.0 | 231.0 | 190.0 | 247.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mean | 13366340.0 | 885.607438 | 126.515355 | 119868.9 | 14.032922 | 2.00472 | 203988.255797 | 22.204909 | 3.172136 | 1271.427736 | ... | 32.909897 | 50.788844 | 3.097012 | 73.660866 | 0.7225 | 130765600.0 | NaN | NaN | NaN | NaN |
| std | 65681300.0 | 4854.786157 | 693.540908 | 574724.0 | 92.179347 | 13.16853 | 200456.90214 | 82.962646 | 11.851812 | 1322.697453 | ... | 13.621757 | 32.124848 | 2.555777 | 7.405725 | 0.149398 | 668433300.0 | NaN | NaN | NaN | NaN |
| min | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 7.7 | 1.188 | 0.1 | 53.28 | 0.394 | 47.0 | NaN | NaN | NaN | NaN |
| 25% | 27509.5 | 0.0 | 0.0 | 183.75 | 0.0 | 0.0 | 21257.7665 | 0.0 | 0.0 | 144.80825 | ... | 22.6 | 20.482 | 1.3 | 69.545 | 0.603 | 429495.5 | NaN | NaN | NaN | NaN |
| 50% | 232098.5 | 0.0 | 0.0 | 2205.5 | 0.0 | 0.0 | 135384.895 | 0.0 | 0.0 | 877.689 | ... | 33.1 | 49.6905 | 2.5 | 75.05 | 0.74 | 5970430.0 | NaN | NaN | NaN | NaN |
| 75% | 1703974.0 | 5.5 | 0.7855 | 19388.5 | 0.0 | 0.0 | 340625.3 | 0.232 | 0.03325 | 2032.222 | ... | 41.3 | 82.68675 | 4.2 | 79.285 | 0.82875 | 28956710.0 | NaN | NaN | NaN | NaN |
| max | 775866800.0 | 47169.0 | 6738.429 | 7057132.0 | 815.0 | 116.429 | 763598.6 | 672.437 | 96.062 | 6601.11 | ... | 78.1 | 100.0 | 13.8 | 86.75 | 0.957 | 7975105000.0 | NaN | NaN | NaN | NaN |


## 7. Converting the Pandas DataFrame to a Spark DataFrame


Let's convert the Pandas DataFrame, which contains our COVID-19 vaccination data, into a Spark DataFrame. This conversion is crucial as it allows us to use Spark's distributed computing capabilities, enabling us to handle larger data sets and perform operations more efficiently.


### Defining the schema:


- **StructType**: 
  - A class that defines a structure for a DataFrame.

- **StructField**: 
  - Represents a single field in the schema.
  - **Parameters**:
    1. **Field name**: The name of the field.
    2. **Data type**: The type of data for the field.
    3. **Nullable**: A boolean indicating whether null values are allowed.

- **Data types**:
  - **StringType()**: Used for text fields.
  - **LongType()**: Used for 64-bit integer fields.


### Data type conversion:


- **astype(str)**: 
  - Used to convert the `'continent'` column to string type.

- **fillna(0)**: 
  - Replaces any NaN values with 0, ensuring that the numerical fields do not contain any missing data.

- **astype('int64')**: 
  - Converts the columns from potentially mixed types to 64-bit integers for consistent numerical representation.
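
The three conversion steps can be sketched on a tiny, made-up sample. The values are illustrative assumptions; only the column names come from the data set:

```python
import numpy as np
import pandas as pd

# Made-up sample with a gap in the numeric column.
sample = pd.DataFrame({
    "continent": ["Asia", "Europe", "Africa"],
    "total_cases": [235214.0, np.nan, 272139.0],
})

# Text column: make sure every value is a string.
sample["continent"] = sample["continent"].astype(str)

# Numeric column: replace NaN with 0, then cast from float64 to int64.
# The cast would raise an error if NaN values were still present.
sample["total_cases"] = sample["total_cases"].fillna(0).astype("int64")

print(sample.dtypes)
print(sample)
```

Note that after `fillna(0)` a missing value can no longer be told apart from a genuine zero, so this approach suits quick exploration better than careful reporting.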


### Creating a Spark DataFrame:


- **createDataFrame**:
  - The `createDataFrame` method of the Spark session (`spark`) is called with `vaccination_data` (the Pandas DataFrame) as its argument.
  - In the original notebook, only the subset of columns named in the schema (accessed using `schema.fieldNames()`) is passed, together with the schema itself. This version passes the whole DataFrame and lets Spark infer the column types.
- This function converts the Pandas DataFrame into a Spark DataFrame, which is designed to handle larger data sets across a distributed environment.


- The resulting `sparkDF` carries the inferred schema: text columns become `string` and numeric columns become `double`, as the `printSchema()` output shows.


### Storing the result:


In [22]:
# vaccination_data is a Pandas DataFrame (check with type(vaccination_data))

# Create a PySpark DataFrame from the whole Pandas DataFrame; Spark infers the schema
sparkDF = spark.createDataFrame(vaccination_data)
sparkDF.printSchema()
sparkDF.show()

  sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- last_updated_date: string (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patient

25/02/07 21:25:48 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------+-------------+-------------------+-----------------+------------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------------------------------+------------------------------+-----------

## 8. Checking the structure of the Spark DataFrame


In this section, let's examine the structure of the Spark DataFrame that we created from the Pandas DataFrame. Understanding the schema of a DataFrame is crucial as it provides insight into the data types of each column and helps ensure that the data is organized correctly for analysis.


### Displaying the schema:

- The method `sparkDF.printSchema()` is called to output the structure of the Spark DataFrame.
- This method prints the name of each column along with its data type (e.g., `string`, `double`) and whether it is nullable, providing a clear view of how the data is organized.


In [25]:
print("Schema of the Spark DataFrame:")
# Print the structure of the DataFrame (columns and types)
sparkDF.printSchema()

Schema of the Spark DataFrame:
root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- last_updated_date: string (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nul

## 9. Basic data exploration


In this section, let's perform basic data exploration on the Spark DataFrame. This step is essential for understanding the data set better, allowing us to gain insights and identify any patterns or anomalies. Let's demonstrate how to view specific contents of the DataFrame, select certain columns, and filter records based on conditions.


### 9.1 Viewing DataFrame contents


- To view the contents in the DataFrame, use the following code:


In [35]:
# Display the first 3 records of all the columns
sparkDF.show(3)

+--------+---------+-----------+-----------------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------------------------------+------------------------------+------------------------

### 9.2 Picking specific columns


- To display only certain columns, use the following code:


In [36]:
# List the names of the columns you want to display
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'total_vaccinations_per_hundred','population', 'excess_mortality']
# Display the first 10 records of the specified columns
sparkDF.select(columns_to_display).show(10)

+-------------+-----------+------------+------------------+------------------------------+-------------+----------------+
|    continent|total_cases|total_deaths|total_vaccinations|total_vaccinations_per_hundred|   population|excess_mortality|
+-------------+-----------+------------+------------------+------------------------------+-------------+----------------+
|         Asia|   235214.0|      7998.0|               NaN|                           NaN|  4.1128772E7|             NaN|
|          NaN| 1.314538E7|    259117.0|               NaN|                           NaN|1.426736614E9|             NaN|
|       Europe|   335047.0|      3605.0|               NaN|                           NaN|    2842318.0|             NaN|
|       Africa|   272139.0|      6881.0|               NaN|                           NaN|  4.4903228E7|             NaN|
|      Oceania|     8359.0|        34.0|               NaN|                           NaN|      44295.0|             NaN|
|       Europe|    48015

### 9.3 Sifting through data


- To filter records based on a specific condition, use the following code:


In [39]:
print("Filtering records where 'total_cases' is greater than 1,000,000:")
# Show records with more than 1 million total cases
sparkDF.filter(sparkDF['total_cases'] > 1000000).show(5)

Filtering records where 'total_cases' is greater than 1,000,000:
+--------+-------------+---------+-----------------+------------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------------------

In [40]:
print("Calculating the total deaths per continent:")
# Group by continent and sum the total deaths
sparkDF.groupby(['continent']).agg({"total_deaths": "SUM"}).show()

Calculating the total deaths per continent:




+-------------+-----------------+
|    continent|sum(total_deaths)|
+-------------+-----------------+
|       Europe|        2102483.0|
|       Africa|         259117.0|
|North America|        1671178.0|
|South America|        1354187.0|
|          NaN|      2.2430618E7|
|      Oceania|          32918.0|
|         Asia|              NaN|
+-------------+-----------------+



                                                                                

In [42]:
# Drop the existing temporary view if it exists
spark.sql("DROP VIEW IF EXISTS data_v")

# Create a new temporary view
sparkDF.createTempView('data_v')

In [43]:
spark.sql('SELECT * FROM data_v').show()

+--------+-------------+-------------------+-----------------+------------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------------------------------+------------------------------+-----------

In [45]:
spark.sql('SELECT continent, total_deaths, population, total_vaccinations, total_cases FROM data_v').show()

+-------------+------------+-------------+------------------+------------+
|    continent|total_deaths|   population|total_vaccinations| total_cases|
+-------------+------------+-------------+------------------+------------+
|         Asia|      7998.0|  4.1128772E7|               NaN|    235214.0|
|          NaN|    259117.0|1.426736614E9|               NaN|  1.314538E7|
|       Europe|      3605.0|    2842318.0|               NaN|    335047.0|
|       Africa|      6881.0|  4.4903228E7|               NaN|    272139.0|
|      Oceania|        34.0|      44295.0|               NaN|      8359.0|
|       Europe|       159.0|      79843.0|               NaN|     48015.0|
|       Africa|      1937.0|  3.5588996E7|               NaN|    107481.0|
|North America|        12.0|      15877.0|               NaN|      3904.0|
|North America|       146.0|      93772.0|               NaN|      9106.0|
|South America|    130663.0|  4.5510324E7|               NaN| 1.0101218E7|
|         Asia|      8777

See the original notebook for the remaining steps.


##  Author


**Ritika Joshi**


<!--## Change Log
|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2024-09-23|0.1|Ritika|First Draft|
-->
