# Task 2: Data Ingestion & Optimization
###### 1. Read all raw CSV files from HDFS.
###### 2. Apply proper schema instead of inferSchema.
###### 3. Handle null values.
###### 4. Convert raw CSV files into Parquet format.
###### 5. Store them in /data/covid/staging.
##### Compare CSV vs Parquet:
###### 1. File size
###### 2. Read performance
###### 3. Execution plan
###### Explain why Parquet performs better.

In [1]:
## Import Statements

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col

In [12]:
## Create the Spark session used by every cell below (or attach to one that is already running).
spark = (
    SparkSession.builder
    .appName("Data Ingestion")
    .getOrCreate()
)

In [3]:
## Echo the session object so the notebook renders it as this cell's output.
spark

### Covid_19_clean_complete Table

In [4]:
#### Explicit schema for covid_19_clean_complete.csv; every field is declared nullable.
covid_clean_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("Province/State", StringType()),
        ("Country/Region", StringType()),
        ("Lat", DoubleType()),
        ("Long", DoubleType()),
        ("Date", DateType()),
        ("Confirmed", LongType()),
        ("Deaths", LongType()),
        ("Recovered", LongType()),
        ("Active", LongType()),
        ("WHO Region", StringType()),
    ]
])

#### Read the raw CSV with the schema above, then handle nulls in one pass:
####   - rows with no Date or no Country/Region are dropped;
####   - remaining nulls are replaced by "Unknown" (text) or zero (numeric).
#### NOTE(review): a 0.0 fill for Lat/Long cannot be told apart from a real coordinate - confirm this is intended.
covid_clean = (
    spark.read
    .csv(
        "hdfs:///data/covid/raw/covid_19_clean_complete.csv",
        schema=covid_clean_schema,
        header=True,
    )
    .dropna(subset=["Date", "Country/Region"])
    .fillna({
        **dict.fromkeys(["Province/State", "WHO Region"], "Unknown"),
        **dict.fromkeys(["Lat", "Long"], 0.0),
        **dict.fromkeys(["Confirmed", "Deaths", "Recovered", "Active"], 0),
    })
)

#### Write the cleaned table to staging as Parquet, replacing the output of any earlier run.
covid_clean.write.parquet("hdfs:///data/covid/staging/covid_clean", mode="overwrite")

                                                                                

### Full_grouped Table

In [5]:
#### Explicit schema for full_grouped.csv; every field is declared nullable.
full_grouped_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("Date", DateType()),
        ("Country/Region", StringType()),
        ("Confirmed", LongType()),
        ("Deaths", LongType()),
        ("Recovered", LongType()),
        ("Active", LongType()),
        ("New cases", LongType()),
        ("New deaths", LongType()),
        ("New recovered", LongType()),
        ("WHO Region", StringType()),
    ]
])

#### Read the raw CSV with the schema above, then handle nulls in one pass:
####   - rows with no Date or no Country/Region are dropped;
####   - null counts become 0 and a null WHO Region becomes "Unknown".
full_grouped = (
    spark.read
    .csv(
        "hdfs:///data/covid/raw/full_grouped.csv",
        schema=full_grouped_schema,
        header=True,
    )
    .dropna(subset=["Date", "Country/Region"])
    .fillna({
        **dict.fromkeys(
            [
                "Confirmed", "Deaths", "Recovered", "Active",
                "New cases", "New deaths", "New recovered",
            ],
            0,
        ),
        "WHO Region": "Unknown",
    })
)

#### Write the cleaned table to staging as Parquet, replacing the output of any earlier run.
full_grouped.write.parquet("hdfs:///data/covid/staging/full_grouped", mode="overwrite")

                                                                                

### Country_Wise_Latest Table

In [6]:
#### Explicit schema for country_wise_latest.csv; every field is declared nullable.
#### NOTE(review): on the recorded run Spark warned that this file's header has 15 columns
#### while this schema declares 10, so some columns are not loaded - presumably the trailing
#### five, since an explicit schema is matched by position. Confirm against the CSV and extend
#### the schema if those columns are needed downstream.
country_wise_latest_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("Country/Region", StringType()),
        ("Confirmed", LongType()),
        ("Deaths", LongType()),
        ("Recovered", LongType()),
        ("Active", LongType()),
        ("New cases", LongType()),
        ("New deaths", LongType()),
        ("New recovered", LongType()),
        ("Deaths / 100 Cases", DoubleType()),
        ("Recovered / 100 Cases", DoubleType()),
    ]
])

#### Read the raw CSV with the schema above, then handle nulls in one pass:
####   - rows with no Country/Region are dropped;
####   - null counts become 0 and null ratios become 0.0.
country_wise_latest = (
    spark.read
    .csv(
        "hdfs:///data/covid/raw/country_wise_latest.csv",
        schema=country_wise_latest_schema,
        header=True,
    )
    .dropna(subset=["Country/Region"])
    .fillna({
        **dict.fromkeys(
            [
                "Confirmed", "Deaths", "Recovered", "Active",
                "New cases", "New deaths", "New recovered",
            ],
            0,
        ),
        **dict.fromkeys(["Deaths / 100 Cases", "Recovered / 100 Cases"], 0.0),
    })
)

#### Write the cleaned table to staging as Parquet, replacing the output of any earlier run.
country_wise_latest.write.parquet("hdfs:///data/covid/staging/country_wise_latest", mode="overwrite")

26/02/17 19:57:35 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 15, schema size: 10
CSV file: hdfs://localhost:9000/data/covid/raw/country_wise_latest.csv


### Day_Wise Table

In [7]:
#### Explicit schema for day_wise.csv; every field is declared nullable.
#### NOTE(review): on the recorded run Spark warned that this file's header has 12 columns
#### while this schema declares 10, so some columns are not loaded - presumably the trailing
#### two, since an explicit schema is matched by position. Confirm against the CSV and extend
#### the schema if those columns are needed downstream.
day_wise_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("Date", DateType()),
        ("Confirmed", LongType()),
        ("Deaths", LongType()),
        ("Recovered", LongType()),
        ("Active", LongType()),
        ("New cases", LongType()),
        ("New deaths", LongType()),
        ("New recovered", LongType()),
        ("Deaths / 100 Cases", DoubleType()),
        ("Recovered / 100 Cases", DoubleType()),
    ]
])

#### Read the raw CSV with the schema above, then handle nulls in one pass:
####   - rows with no Date are dropped;
####   - null counts become 0 and null ratios become 0.0.
day_wise = (
    spark.read
    .csv(
        "hdfs:///data/covid/raw/day_wise.csv",
        schema=day_wise_schema,
        header=True,
    )
    .dropna(subset=["Date"])
    .fillna({
        **dict.fromkeys(
            [
                "Confirmed", "Deaths", "Recovered", "Active",
                "New cases", "New deaths", "New recovered",
            ],
            0,
        ),
        **dict.fromkeys(["Deaths / 100 Cases", "Recovered / 100 Cases"], 0.0),
    })
)

#### Write the cleaned table to staging as Parquet, replacing the output of any earlier run.
day_wise.write.parquet("hdfs:///data/covid/staging/day_wise", mode="overwrite")

26/02/17 19:57:36 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 12, schema size: 10
CSV file: hdfs://localhost:9000/data/covid/raw/day_wise.csv


### USA_County_Wise Table

In [8]:
#### Explicit schema for usa_county_wise.csv; every field is declared nullable.
#### NOTE(review): on the recorded run Spark warned that this file's header has 14 columns
#### while this schema declares 10, so some columns are not loaded - presumably the trailing
#### four, since an explicit schema is matched by position. Confirm against the CSV and extend
#### the schema if those columns are needed downstream.
#### NOTE(review): the DataFrame name and staging path say "country" while the source file and
#### this schema say "county"; the names are kept as they are so existing readers keep working.
usa_county_wise_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("UID", LongType()),
        ("iso2", StringType()),
        ("iso3", StringType()),
        ("code3", IntegerType()),
        ("FIPS", DoubleType()),
        ("Admin2", StringType()),
        ("Province_State", StringType()),
        ("Country_Region", StringType()),
        ("Lat", DoubleType()),
        ("Long_", DoubleType()),
    ]
])

#### Read the raw CSV with the schema above, then handle nulls in one pass:
####   - rows with no UID or no Province_State are dropped;
####   - null text fields become "Unknown" and null coordinates become 0.0;
####   - code3 and FIPS are left untouched, so they may still contain nulls.
#### NOTE(review): a 0.0 fill for Lat/Long_ cannot be told apart from a real coordinate - confirm this is intended.
usa_country_wise = (
    spark.read
    .csv(
        "hdfs:///data/covid/raw/usa_county_wise.csv",
        schema=usa_county_wise_schema,
        header=True,
    )
    .dropna(subset=["UID", "Province_State"])
    .fillna({
        **dict.fromkeys(["iso2", "iso3", "Admin2", "Country_Region"], "Unknown"),
        **dict.fromkeys(["Lat", "Long_"], 0.0),
    })
)

#### Write the cleaned table to staging as Parquet, replacing the output of any earlier run.
usa_country_wise.write.parquet("hdfs:///data/covid/staging/usa_country_wise", mode="overwrite")

26/02/17 19:57:36 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 14, schema size: 10
CSV file: hdfs://localhost:9000/data/covid/raw/usa_county_wise.csv
26/02/17 19:57:36 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

### Worldometer_data Table

In [9]:
#### Explicit schema for worldometer_data.csv; every field is declared nullable.
#### NOTE(review): on the recorded run Spark warned that this file's header has 16 columns
#### while this schema declares 10, so some columns are not loaded - presumably the trailing
#### six, since an explicit schema is matched by position. Confirm against the CSV and extend
#### the schema if those columns are needed downstream.
worldometer_data_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("Country/Region", StringType()),
        ("Continent", StringType()),
        ("Population", LongType()),
        ("TotalCases", LongType()),
        ("NewCases", LongType()),
        ("TotalDeaths", LongType()),
        ("NewDeaths", LongType()),
        ("TotalRecovered", LongType()),
        ("NewRecovered", LongType()),
        ("ActiveCases", LongType()),
    ]
])

#### Read the raw CSV with the schema above, then handle nulls in one pass:
####   - rows with no Country/Region or no Population are dropped;
####   - a null Continent becomes "Unknown" and null counts become 0.
worldometer_data = (
    spark.read
    .csv(
        "hdfs:///data/covid/raw/worldometer_data.csv",
        schema=worldometer_data_schema,
        header=True,
    )
    .dropna(subset=["Country/Region", "Population"])
    .fillna({
        "Continent": "Unknown",
        **dict.fromkeys(
            [
                "TotalCases", "NewCases", "TotalDeaths", "NewDeaths",
                "TotalRecovered", "NewRecovered", "ActiveCases",
            ],
            0,
        ),
    })
)

#### Write the cleaned table to staging as Parquet, replacing the output of any earlier run.
worldometer_data.write.parquet("hdfs:///data/covid/staging/worldometer_data", mode="overwrite")

26/02/17 19:57:38 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 16, schema size: 10
CSV file: hdfs://localhost:9000/data/covid/raw/worldometer_data.csv


### Comparing file sizes of raw CSV and Parquet

In [10]:
## Comparing 
print("File size in Hadoop...")
!hdfs dfs -du -h /data/covid/raw
print("\nFile size in paraquet...")
!hdfs dfs -du -h /data/covid/staging


File size in Hadoop...
2026-02-17 19:57:39,520 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14.4 K  14.4 K  /data/covid/raw/country_wise_latest.csv
3.2 M   3.2 M   /data/covid/raw/covid_19_clean_complete.csv
14.1 K  14.1 K  /data/covid/raw/day_wise.csv
1.8 M   1.8 M   /data/covid/raw/full_grouped.csv
66.6 M  66.6 M  /data/covid/raw/usa_county_wise.csv
16.1 K  16.1 K  /data/covid/raw/worldometer_data.csv

File size in paraquet...
2026-02-17 19:57:40,587 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11.5 K   11.5 K   /data/covid/staging/country_wise_latest
381.4 K  381.4 K  /data/covid/staging/covid_clean
11.8 K   11.8 K   /data/covid/staging/day_wise
417.9 K  417.9 K  /data/covid/staging/full_grouped
2.9 M    2.9 M    /data/covid/staging/usa_country_wise
10.1 K   10.1 K   /data/covid/staging/worldometer_data


In [13]:
## Stop the Spark session; cells above need the session re-created before they can run again.
spark.stop()