# Building an ETL Pipeline using PySpark

An ETL (Extract, Transform, Load) pipeline extracts data from one or more sources, transforms it into a clean, consistent shape, and loads the result into a storage system where it can be analyzed. PySpark is well suited to building ETL pipelines over large datasets: it distributes work across a cluster (or across the cores of a single machine), processes data in parallel, and reads structured and semi-structured formats such as CSV, JSON, and Parquet. This article walks through building an ETL pipeline with PySpark.

The dataset used in this article contains temperature data for countries from 1961 to 2022. Its columns include identifiers such as ObjectId, Country, ISO2, and ISO3, followed by one column per year (F1961, F1962, and so on up to F2022) holding floating-point temperature values. Some columns contain missing values.

You can download this dataset from:
https://statso.io/carbon-emissions-worldwide-case-study/#google_vignette

We’ll build a PySpark ETL pipeline that performs the following tasks on this dataset:

1. **Extract**: Load the dataset from the CSV file.
2. **Transform**: Clean the data, handle missing values, and reshape (unpivot) the year-wise temperature columns into rows for analysis.
3. **Load**: Save the processed data into a new storage format (e.g., Parquet or a database).
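To make the three stages concrete before bringing Spark in, here is a toy, single-machine sketch that uses only Python's standard library. The function names (`extract`, `transform`, `load`) and the two sample rows are made up for illustration; they are not taken from the dataset, and the pipeline built below replaces each stage with its PySpark counterpart.

```python
import csv
import io

# Two made-up rows in the same wide layout as the dataset (illustrative values only)
SAMPLE_CSV = """ObjectId,Country,ISO2,ISO3,F1961,F1962
1,Atlantis,,ATL,0.1,
2,Utopia,UT,UTO,,
"""

def extract(text):
    """Extract: parse CSV text into a list of dicts, one per row."""
    return list(csv.DictReader(io.StringIO(text)))

def transform(rows):
    """Transform: fill missing ISO2, drop rows with no temperatures, unpivot to long form."""
    out = []
    for row in rows:
        iso2 = row["ISO2"] or "Unknown"
        temps = {k: v for k, v in row.items() if k.startswith("F")}
        if all(v == "" for v in temps.values()):
            continue  # every temperature is missing: drop the row
        for col, value in temps.items():
            out.append({
                "Country": row["Country"],
                "ISO2": iso2,
                "Year": int(col[1:]),
                "Temperature": float(value) if value else None,
            })
    return out

def load(rows, sink):
    """Load: append the transformed rows to a sink (a list standing in for storage)."""
    sink.extend(rows)
    return len(rows)

storage = []
written = load(transform(extract(SAMPLE_CSV)), storage)
print(written, storage[0])
```

The structure (three small functions composed in order) is the same idea the PySpark version follows; Spark simply runs each stage over a distributed DataFrame instead of a Python list.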

### Step 1: Setting Up the Environment & Initializing a PySpark Session
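Every PySpark program starts with a SparkSession, the entry point for reading data and running DataFrame operations. Spark runs on the Java Virtual Machine, so a compatible Java installation must be available before the session can start: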

```python
from pyspark.sql import SparkSession

# create (or reuse) a SparkSession named "ETL Pipeline"
spark = SparkSession.builder \
    .appName("ETL Pipeline") \
    .getOrCreate()
```

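PySpark launches a Java process behind the scenes, so a `JAVA_GATEWAY_EXITED` error when creating the session usually means Spark could not start Java (for example, no JDK is installed or `JAVA_HOME` points to the wrong place). A quick pre-flight check with the standard library can catch this before calling `getOrCreate()`. The helper name `java_available` is hypothetical, and it only checks that a `java` executable can be found; it does not verify that the Java version is one your Spark release supports.

```python
import os
import shutil

def java_available():
    """Return True if a `java` executable is reachable via JAVA_HOME or PATH (hypothetical helper)."""
    java_home = os.environ.get("JAVA_HOME")
    if java_home:
        # the executable is java.exe on Windows and java on Unix-like systems
        for name in ("java", "java.exe"):
            if os.path.isfile(os.path.join(java_home, "bin", name)):
                return True
    # fall back to searching the PATH
    return shutil.which("java") is not None

if not java_available():
    print("No Java runtime found: install a JDK supported by your Spark version and set JAVA_HOME.")
```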

### Step 2: Extract – Load the Dataset

```python
# load the CSV file into a Spark DataFrame
file_path = "temperature.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# display the schema and preview the data
df.printSchema()
df.show(5)
```


Here PySpark loads the CSV file into a distributed DataFrame, much as pandas.read_csv() loads data into a pandas DataFrame. The difference is that pandas holds the whole dataset in the memory of a single machine, whereas a Spark DataFrame is split into partitions that can be processed in parallel across a cluster. The methods df.printSchema() and df.show(5) display the column types and the first five rows, comparable to df.info() and df.head() in pandas, but they work the same way on datasets far larger than one machine's memory.
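`inferSchema=True` is convenient, but Spark has to scan the data an extra time to guess each column's type, and a year column that happens to be entirely empty may not come out as a floating-point type. An alternative is to pass an explicit schema; `DataFrameReader.csv` accepts either a `StructType` or a DDL-formatted string. The sketch below builds such a string for the columns described earlier. The helper name `temperature_ddl` and the exact column list (including `ObjectId` as an integer) are assumptions: the downloaded file may contain additional columns, and with a user-supplied schema Spark applies it by position by default, so compare it against the file's header before relying on it.

```python
def temperature_ddl(first_year=1961, last_year=2022):
    """Build a DDL schema string: identifier columns plus one DOUBLE column per year (hypothetical helper)."""
    # assumed identifier columns and types; adjust to match the actual CSV header
    id_cols = ["ObjectId INT", "Country STRING", "ISO2 STRING", "ISO3 STRING"]
    year_cols = [f"F{year} DOUBLE" for year in range(first_year, last_year + 1)]
    return ", ".join(id_cols + year_cols)

ddl = temperature_ddl()
print(ddl[:60], "...")

# With a running SparkSession (see Step 1), the schema string would be used like this:
# df = spark.read.csv("temperature.csv", header=True, schema=ddl)
```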

### Step 3: Transform – Clean and Process the Data
Every dataset needs its own cleaning steps. For this one, we fill missing values in the ISO2 country-code column and drop rows that have no temperature data at all. Rows with only some years missing are kept, and those gaps remain null (they could be imputed in a later step):

```python
# fill missing values in the ISO2 country-code column
df = df.fillna({"ISO2": "Unknown"})

# the temperature columns are the year columns F1961 ... F2022
temperature_columns = [c for c in df.columns if c.startswith("F") and c[1:].isdigit()]

# drop rows where all the temperature values are null
df = df.dropna(subset=temperature_columns, how="all")
```
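The choice of `how="all"` rather than the default `how="any"` matters here: with `"any"`, a country missing even a single year would be discarded. The plain-Python sketch below mirrors both behaviours on made-up rows, with no Spark required, so the effect is easy to see. `drop_missing` is a hypothetical helper written for this illustration, not a PySpark function.

```python
def drop_missing(rows, subset, how="any"):
    """Mimic DataFrame.dropna(subset=..., how=...) on a list of dicts (hypothetical helper)."""
    kept = []
    for row in rows:
        missing = [row[c] is None for c in subset]
        if how == "any" and any(missing):
            continue  # drop if at least one value in the subset is null
        if how == "all" and all(missing):
            continue  # drop only if every value in the subset is null
        kept.append(row)
    return kept

# Made-up rows: A has one gap, B has no temperatures at all
rows = [
    {"Country": "A", "F1961": 0.2, "F1962": None},
    {"Country": "B", "F1961": None, "F1962": None},
]
temperature_columns = [c for c in rows[0] if c.startswith("F")]

print([r["Country"] for r in drop_missing(rows, temperature_columns, how="all")])  # ['A']
print([r["Country"] for r in drop_missing(rows, temperature_columns, how="any")])  # []
```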

Next, we reshape (unpivot) the dataset from wide to long format, so that each row holds a single year and its temperature value:

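```python
from pyspark.sql.functions import expr

# reshape the temperature data into 'Year' and 'Temperature' columns:
# stack(62, 'F1961', F1961, ..., 'F2022', F2022) emits one row per year;
# each year column is cast to double so all stacked values share one type
df_pivot = df.selectExpr(
    "ObjectId", "Country", "ISO2", "ISO3",
    "stack(62, " +
    ", ".join([f"'F{1961 + i}', cast(F{1961 + i} as double)" for i in range(62)]) +
    ") as (Year, Temperature)"
)

# convert labels such as 'F1961' to the integer 1961 (substring is 1-based)
df_pivot = df_pivot.withColumn("Year", expr("int(substring(Year, 2, 4))"))
df_pivot.show(5)
```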
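Hardcoding `62` and the year range works for this file, but the `stack` expression can also be built from the temperature columns selected in Step 3, so the count always matches the number of label/value pairs. The helper `build_stack_expr` below is a hypothetical name; it only builds the SQL string, so it can be tested without Spark. `year_from_label` shows why `substring(Year, 2, 4)` matches Python's `label[1:5]`: Spark SQL's `substring` counts from 1. On Spark 3.4 and later, `DataFrame.unpivot` (also available as `melt`) is a built-in alternative to writing `stack` by hand.

```python
def build_stack_expr(value_columns, name_col="Year", value_col="Temperature"):
    """Build a Spark SQL stack() expression that unpivots value_columns (hypothetical helper)."""
    pairs = ", ".join(f"'{c}', {c}" for c in value_columns)
    return f"stack({len(value_columns)}, {pairs}) as ({name_col}, {value_col})"

def year_from_label(label):
    """Python equivalent of int(substring(label, 2, 4)), since Spark's substring is 1-based."""
    return int(label[1:5])

expr_sql = build_stack_expr(["F1961", "F1962"])
print(expr_sql)  # stack(2, 'F1961', F1961, 'F1962', F1962) as (Year, Temperature)
print(year_from_label("F1961"))  # 1961

# With Spark (assumes df and temperature_columns from Step 3):
# df_pivot = df.selectExpr("ObjectId", "Country", "ISO2", "ISO3", build_stack_expr(temperature_columns))
# Spark 3.4+ alternative:
# df_pivot = df.unpivot(["ObjectId", "Country", "ISO2", "ISO3"], temperature_columns, "Year", "Temperature")
```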

### Step 4: Load – Save the Processed Data
Once the data is transformed, we save it as Parquet, a columnar format that compresses well and lets Spark read only the columns a query needs:

```python
# write the long-format data as Parquet, replacing any previous output
output_path = "processed_temperature.parquet"
df_pivot.write.mode("overwrite").parquet(output_path)
```

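This writes the transformed DataFrame to output_path in Parquet format. Despite the name, Spark creates a directory at that path containing one or more part files (typically one per partition being written) rather than a single file, and the columnar layout keeps later reads and queries efficient in a distributed environment.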
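A small standard-library check can confirm that the write finished and count the part files. On a successful job, Spark's default file committer typically leaves an empty `_SUCCESS` marker next to the `part-*.parquet` files. The helper `inspect_parquet_output` is a hypothetical name, and because exact file names (compression suffix, UUIDs) vary by Spark version and settings, it matches only the `part-` prefix and `.parquet` suffix. The demo builds a throwaway directory with illustrative file names in place of real Spark output.

```python
import os
import tempfile

def inspect_parquet_output(path):
    """Return (success_marker_present, sorted part-file names) for a Spark output directory (hypothetical helper)."""
    if not os.path.isdir(path):
        return False, []
    names = os.listdir(path)
    # '.part-...crc' checksum files do not start with 'part-', so they are excluded
    parts = sorted(n for n in names if n.startswith("part-") and n.endswith(".parquet"))
    return "_SUCCESS" in names, parts

# Demo on a throwaway directory that imitates Spark's layout (file names are illustrative)
with tempfile.TemporaryDirectory() as demo:
    for name in ("_SUCCESS", "part-00000-example.snappy.parquet", ".part-00000-example.snappy.parquet.crc"):
        open(os.path.join(demo, name), "w").close()
    print(inspect_parquet_output(demo))  # (True, ['part-00000-example.snappy.parquet'])

# On real output: inspect_parquet_output(output_path), with output_path from Step 4
```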

To confirm the data was saved correctly, load it back from the same path:

```python
# load the saved Parquet data back into a DataFrame
process_df = spark.read.parquet(output_path)
process_df.show(5)
```
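Displaying five rows confirms the data is readable, but a row count is a stronger check. Because `stack` emits one output row per year for every input row, including years whose temperature is null, the long table should have exactly (rows after cleaning) × (number of year columns) rows. The helpers below are a hypothetical sketch of that arithmetic; the commented line shows how they might be fed from Spark, assuming `df`, `temperature_columns`, and `process_df` from the previous steps.

```python
def expected_long_rows(wide_rows, year_columns):
    """Rows produced by unpivoting: one per (input row, year column) pair (hypothetical helper)."""
    return wide_rows * year_columns

def check_row_count(wide_rows, year_columns, loaded_rows):
    """Raise ValueError if the reloaded data does not have the expected number of rows."""
    expected = expected_long_rows(wide_rows, year_columns)
    if loaded_rows != expected:
        raise ValueError(f"expected {expected} rows after unpivot, found {loaded_rows}")
    return expected

# Illustrative numbers only: 3 cleaned countries x 62 years
print(check_row_count(3, 62, 186))  # 186

# With Spark:
# check_row_count(df.count(), len(temperature_columns), process_df.count())
```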