# **Note 2 is Continued from Note 1**


## **Intro to ETL**

ETL stands for Extract, Transform, Load. It is a process used in data warehousing and data integration to move data from various sources into a centralized repository, such as a data warehouse or a database. The ETL process consists of three main steps:

1. **Extract**: This step involves retrieving data from various source systems, which can include databases, flat files, APIs, or other data sources. The goal is to gather all relevant data needed for analysis.

2. **Transform**: In this step, the extracted data is cleaned, transformed, and prepared for analysis. This may involve filtering, aggregating, joining, or reshaping the data to ensure it is in the right format and structure for the target system.

3. **Load**: The final step is to load the transformed data into the target system, such as a data warehouse or a database. This makes the data available for querying and analysis by business intelligence tools or other applications.
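The three steps can be illustrated with a minimal sketch that uses only Python's standard library. The CSV data, the `sales_by_region` table and the function names are hypothetical, and an in-memory SQLite database stands in for the data warehouse. A real pipeline would read from actual source systems and write to a warehouse such as PostgreSQL.

```python
import csv
import io
import sqlite3

# Hypothetical source data: an in-memory CSV standing in for a source system.
RAW_CSV = """order_id,region,amount
1,north,120.50
2,south,
3,north,79.50
4,east,200.00
"""


def extract(raw_text):
    """Extract: read raw rows from the source."""
    return list(csv.DictReader(io.StringIO(raw_text)))


def transform(rows):
    """Transform: drop rows without an amount, cast to float, total per region."""
    totals = {}
    for row in rows:
        if not row["amount"]:  # cleaning step: skip incomplete rows
            continue
        region = row["region"]
        totals[region] = totals.get(region, 0.0) + float(row["amount"])
    return sorted(totals.items())  # [(region, total), ...]


def load(records, conn):
    """Load: write the transformed records into the target table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS sales_by_region (region TEXT PRIMARY KEY, total REAL)"
    )
    conn.executemany("INSERT OR REPLACE INTO sales_by_region VALUES (?, ?)", records)
    conn.commit()


conn = sqlite3.connect(":memory:")  # stand-in for the data warehouse
load(transform(extract(RAW_CSV)), conn)
print(conn.execute("SELECT region, total FROM sales_by_region ORDER BY region").fetchall())
```

Keeping each step in its own function mirrors how an orchestrator such as Airflow later runs them as separate tasks.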

## **ETL Process Overview**

The ETL process can be visualized as a pipeline where data flows through each of the three stages. Here’s a high-level overview of the ETL process:

```mermaid
graph TD
    A[Extract] --> B[Transform]
    B --> C[Load]
    C --> D[Data Warehouse]
    D --> E[Business Intelligence Tools]
```

## **Key Components of ETL**

- **Data Sources**: These can be databases, flat files, APIs, or any other systems where data resides.

- **ETL Tools**: Software applications that facilitate the ETL process, such as Apache NiFi, Talend, Informatica, or custom scripts.

- **Data Warehouse**: A centralized repository where transformed data is stored for analysis and reporting.

- **Data Quality**: Ensuring the accuracy, consistency, and reliability of the data throughout the ETL process.

- **Scheduling and Automation**: ETL processes can be scheduled to run at specific intervals or triggered by events to ensure data is always up-to-date.
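Data quality checks are often run between the Transform and Load steps. The sketch below shows one hypothetical way to do it. The field names (`id`, `amount`) and the rules are assumptions for illustration, and rejected records are kept together with their problems so they can be logged or reviewed instead of being silently dropped.

```python
# Hypothetical data-quality rules; the field names and checks are assumptions.

def check_record(record):
    """Return a list of data-quality problems found in one record."""
    problems = []
    if record.get("id") is None:
        problems.append("missing id")
    amount = record.get("amount")
    if amount is None:
        problems.append("missing amount")
    elif amount < 0:
        problems.append("negative amount")
    return problems


def split_valid(records):
    """Separate records that pass every check from those that fail."""
    valid, rejected = [], []
    for record in records:
        problems = check_record(record)
        if problems:
            rejected.append((record, problems))
        else:
            valid.append(record)
    return valid, rejected


records = [
    {"id": 1, "amount": 10.0},
    {"id": 2, "amount": -5.0},
    {"id": None, "amount": None},
]
valid, rejected = split_valid(records)
print(len(valid), [problems for _, problems in rejected])
```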

## **Benefits of ETL**

- **Centralized Data**: ETL allows organizations to consolidate data from multiple sources into a single repository, making it easier to access and analyze.

- **Improved Data Quality**: The transformation step helps clean and standardize data, improving its quality for analysis.

- **Enhanced Decision Making**: By providing a unified view of data, ETL supports better decision-making processes within organizations.

- **Scalability**: ETL processes can be designed to handle large volumes of data, making them suitable for growing datasets.

## **ETL vs. ELT**

While ETL is a traditional approach, there is also a modern variant known as ELT (Extract, Load, Transform). In ELT, data is first extracted and loaded into the target system (like a data lake or data warehouse) before the transformation occurs. This approach leverages the processing power of modern databases to perform transformations after loading, allowing for more flexibility and scalability.
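The difference from ETL is only where the transformation runs. In this hypothetical sketch, raw records are loaded unchanged into a staging table (`raw_events`), and the transformation is then written in SQL and executed by the database itself. SQLite stands in for a cloud warehouse here; the table names and data are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the target warehouse

# Extract + Load: copy the raw records into a staging table as-is (amounts stay text).
conn.execute("CREATE TABLE raw_events (user_id INTEGER, event TEXT, amount TEXT)")
conn.executemany(
    "INSERT INTO raw_events VALUES (?, ?, ?)",
    [
        (1, "purchase", "20.0"),
        (1, "purchase", "5.0"),
        (2, "refund", "7.5"),
        (2, "purchase", "3.0"),
        (3, "purchase", ""),  # incomplete record, cleaned during the transform
    ],
)

# Transform: runs afterwards, inside the database, as SQL.
conn.execute(
    """
    CREATE TABLE purchases_per_user AS
    SELECT user_id, SUM(CAST(amount AS REAL)) AS total
    FROM raw_events
    WHERE event = 'purchase' AND amount <> ''
    GROUP BY user_id
    """
)
print(conn.execute("SELECT user_id, total FROM purchases_per_user ORDER BY user_id").fetchall())
```

Because the raw table is kept, the transformation can be changed and re-run later without extracting the data again.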

## **Conclusion**

ETL is a crucial process in data management that enables organizations to extract valuable insights from their data by transforming and loading it into a centralized repository. Understanding the ETL process is essential for anyone involved in data warehousing, business intelligence, or data integration projects. By implementing effective ETL practices, organizations can enhance their data quality, streamline their analytics processes, and make informed decisions based on accurate and timely information.


## **End to End ETL Pipeline with Airflow**

In this section, we will explore how to implement an end-to-end ETL pipeline using Apache Airflow, a powerful open-source tool for orchestrating complex workflows. Airflow allows you to define, schedule, and monitor ETL tasks efficiently.

### **Problem Statement**

We will create an ETL pipeline that extracts data from a public API, transforms it by cleaning it and selecting the fields we need, and then loads it into a PostgreSQL database. The pipeline will be scheduled to run daily.

We will use NASA's Astronomy Picture of the Day (APOD) API, which returns the title, explanation, image URL, date and media type of each day's picture. The transform step keeps only these fields, and the load step inserts them into a PostgreSQL table for further analysis.

We will dockerize the Airflow environment to ensure consistency and ease of deployment. The pipeline will consist of the following steps:

1. **Extract**: Fetch the day's APOD record from the NASA API.
2. **Transform**: Clean the data and select the fields needed for the target table.
3. **Load**: Insert the transformed data into a PostgreSQL database.

Both the Airflow and PostgreSQL services will run in Docker containers, which keeps the setup and management of the ETL pipeline simple. We will also see how the Airflow and PostgreSQL containers communicate over a shared Docker network.

We will use Airflow hooks (such as `PostgresHook`) to connect to external systems like the database, and rely on task retries to recover from transient errors during the ETL process.

We will use different Airflow building blocks to perform the ETL tasks: Python functions decorated with `@task` (the TaskFlow equivalent of the `PythonOperator`) for custom logic, `PostgresHook` for executing SQL commands, and an HTTP operator to make HTTP requests to the NASA API.


## **Project Begins**

Refer to the `Airflow_ETL_Pipeline_Astro_Postgres` project.

Note that the folder name should not contain special characters such as `(`, `)`, `;` or `-`.

By default, `Airflow` and `Postgres` run in separate Docker containers, so we need to establish communication between the two.

We create a `docker-compose.yml` file that starts a `Postgres` container with the database name, user name and password passed as environment variables.

### **Docker-Compose**

```yml
version: "3"
services:
  postgres:
    image: postgres:13
    container_name: postgres_db
    environment:
      POSTGRES_USER: birat
      POSTGRES_PASSWORD: admin
      POSTGRES_DB: postgres
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - airflow_network

networks:
  airflow_network:
    external: false

# Named volumes used by the services must be declared at the top level
volumes:
  postgres_data:
```

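In the above `yml` file, we use the official `postgres` image and configure it through environment variables.

The `volume` persists the database files so the data survives container restarts, and the `network` connects the containers.

We need a common network so that the containers can talk to each other by container name.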

### **ETL DAG**

```Python

from datetime import datetime

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Define the DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 5, 1),  # any fixed past date; days_ago() is deprecated
    'retries': 1,
}

with DAG(
    'etl_dag',
    default_args=default_args,
    description='A simple ETL DAG',
    schedule='@daily',
    catchup=False,  # do not backfill every day since start_date
) as dag:

    # Step 1: Create the table if it does not exist
    @task
    def create_table():
        # The conn_id must match the Postgres connection created in the Airflow UI
        pg_hook = PostgresHook(postgres_conn_id='my_postgres_connection')

        # SQL command to create the table for the APOD data
        # (no trailing comma after the last column, or Postgres raises a syntax error)
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS apod_data (
            id SERIAL PRIMARY KEY,
            title VARCHAR(255),
            explanation TEXT,
            url TEXT,
            date DATE,
            media_type VARCHAR(50)
        );
        """

        # Execute the SQL command
        pg_hook.run(create_table_sql)

    # Step 2: Extract data from the API

    # Step 3: Transform the data

    # Step 4: Load the data into PostgreSQL

```

In the above code, `PostgresHook` connects to `Postgres` using the details stored in the Airflow connection referenced by `postgres_conn_id`.

`pg_hook.run(create_table_sql)` then executes the SQL statement.
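Because the DAG runs daily, `create_table` executes on every run, and `IF NOT EXISTS` is what makes that safe. The sketch below demonstrates this with SQLite rather than Postgres, so `SERIAL` is replaced by `INTEGER PRIMARY KEY` (SQLite's auto-incrementing id); the columns are the same as in the DAG.

```python
import sqlite3

# SQLite stand-in for the apod_data table: INTEGER PRIMARY KEY plays the role
# of Postgres' SERIAL (an auto-incrementing id).
CREATE_SQL = """
CREATE TABLE IF NOT EXISTS apod_data (
    id INTEGER PRIMARY KEY,
    title VARCHAR(255),
    explanation TEXT,
    url TEXT,
    date DATE,
    media_type VARCHAR(50)
);
"""

conn = sqlite3.connect(":memory:")
for run in range(3):  # simulate three daily DAG runs
    conn.execute(CREATE_SQL)  # no error on the 2nd and 3rd run

tables = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'").fetchall()
print(tables)

# Without IF NOT EXISTS, the second run would fail.
error_message = None
try:
    conn.execute("CREATE TABLE apod_data (id INTEGER PRIMARY KEY)")
except sqlite3.OperationalError as exc:
    error_message = str(exc)
print(error_message)
```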

**Setting Up the API**

Below is a sample response from the APOD API:

```Json
{
  "copyright": "\nIreneusz Nowak\n",
  "date": "2025-05-27",
  "explanation": "Behold one of the most photogenic regions of the night sky, captured impressively.  Featured, the band of our Milky Way Galaxy runs diagonally along the bottom-left corner, while the colorful Rho Ophiuchi cloud complex is visible just right of center and the large red circular Zeta Ophiuchi Nebula appears near the top.  In general, red emanates from nebulas glowing in the light of excited hydrogen gas, while blue marks interstellar dust preferentially reflecting the light of bright young stars.  Thick dust usually appears dark brown.  Many iconic objects of the night sky appear, including (can you find them?) the bright star Antares, the globular star cluster M4, and the Blue Horsehead nebula. This wide field composite, taken over 17 hours, was captured from South Africa last June.    Explore Your Universe: Random APOD Generator",
  "hdurl": "https://apod.nasa.gov/apod/image/2505/RhoZeta_Nowak_2560.jpg",
  "media_type": "image",
  "service_version": "v1",
  "title": "Zeta and Rho Ophiuchi with Milky Way",
  "url": "https://apod.nasa.gov/apod/image/2505/RhoZeta_Nowak_960.jpg"
}

```

```Py

    # Step 2: Extract data from the API
    # Full URL: https://api.nasa.gov/planetary/apod?api_key=<YOUR_API_KEY>
    # The host and the API key live in the Airflow connection `apod_api`,
    # so neither is hard-coded in the DAG.
    extract_apod = SimpleHttpOperator(
        task_id='extract_apod',
        http_conn_id='apod_api',  # Connection ID for the NASA API
        endpoint='planetary/apod',  # Endpoint for the Astronomy Picture of the Day
        method='GET',
        data={"api_key": "{{ conn.apod_api.extra_dejson.api_key }}"},  # API key from the connection's Extra field
        response_filter=lambda response: response.json(),  # Parse the response body as JSON
    )

```

We use `SimpleHttpOperator` to send the HTTP request. `http_conn_id='apod_api'` refers to a connection that we create in the Airflow UI; it stores the base URL (`https://api.nasa.gov/`), so `endpoint` only contains the path of the API. The `data` argument passes the `api_key`, which is also read from the Airflow connection through a Jinja template.

Finally, `response_filter` parses the response body as JSON, so the task returns a Python dictionary that downstream tasks receive through XCom.
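Conceptually, the operator combines the connection's host, the `endpoint` and, for a `GET` request, the `data` dictionary as query parameters. The following is a rough stdlib approximation of the resulting URL, not the provider's actual implementation. `BASE_URL` and `API_KEY` are assumed values for what the `apod_api` connection would hold, and `DEMO_KEY` is NASA's public demo key.

```python
from urllib.parse import urlencode, urljoin

# Assumed contents of the `apod_api` connection: Host holds the base URL,
# Extra holds the API key. DEMO_KEY is NASA's public, rate-limited demo key.
BASE_URL = "https://api.nasa.gov/"
API_KEY = "DEMO_KEY"


def build_get_url(base_url, endpoint, params):
    """Join base URL and endpoint, then append params as a query string (GET)."""
    return urljoin(base_url, endpoint) + "?" + urlencode(params)


url = build_get_url(BASE_URL, "planetary/apod", {"api_key": API_KEY})
print(url)  # https://api.nasa.gov/planetary/apod?api_key=DEMO_KEY
```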

### **Transforming the Data**

```Py

    @task
    def transform_data(response):
        apod_data = {
            "title": response.get("title", ""),
            "explanation": response.get("explanation", ""),
            "url": response.get("url", ""),
            "date": response.get("date", ""),
            "media_type": response.get("media_type", ""),
        }
        return apod_data

```
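The transform logic does not depend on Airflow, so it can be checked on its own. This sketch is a plain-Python variant of `transform_data` applied to the sample response above, with the explanation shortened. One deliberate difference: it defaults missing fields to `None` rather than `""`, because Postgres rejects an empty string for the `DATE` column, while `None` is inserted as `NULL`.

```python
# Columns of the apod_data table (excluding the auto-generated id).
COLUMNS = ("title", "explanation", "url", "date", "media_type")


def transform_apod(response):
    """Keep only the table's columns; missing fields become None (SQL NULL)."""
    return {column: response.get(column) for column in COLUMNS}


# Sample response from above, with the explanation shortened.
sample_response = {
    "copyright": "\nIreneusz Nowak\n",
    "date": "2025-05-27",
    "explanation": "Behold one of the most photogenic regions of the night sky, captured impressively.",
    "hdurl": "https://apod.nasa.gov/apod/image/2505/RhoZeta_Nowak_2560.jpg",
    "media_type": "image",
    "service_version": "v1",
    "title": "Zeta and Rho Ophiuchi with Milky Way",
    "url": "https://apod.nasa.gov/apod/image/2505/RhoZeta_Nowak_960.jpg",
}

row = transform_apod(sample_response)
print(row["title"], row["date"])  # extra keys such as hdurl are dropped
print(transform_apod({"title": "Hypothetical title"})["date"])  # None
```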

### **Loading the Data**

```Py

    @task
    def load_data_to_postgres(apo_data):
        # Initialize PostgresHook

        pg_hook = PostgresHook(postgres_conn_id="my_postgres_connection")

        # Define SQL Query
        insert_sql = """
        INSERT INTO apod_data (title, explanation, url, date, media_type)
        VALUES (%s, %s, %s, %s, %s);
        """

        # Execute the insert query
        pg_hook.run(
            insert_sql,
            parameters=(
                apo_data["title"],
                apo_data["explanation"],
                apo_data["url"],
                apo_data["date"],
                apo_data["media_type"],
            ),
        )

    # Define the task dependencies
    # Create the table before calling the API
    create_table() >> extract_apod
    # .output is the XComArg holding the JSON returned by extract_apod
    api_response = extract_apod.output
    # Transform the API response into a row for PostgreSQL
    transformed_data = transform_data(api_response)
    # Load the transformed row into PostgreSQL
    load_data_to_postgres(transformed_data)

```
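The `%s` placeholders in `insert_sql` let the database driver escape the values instead of formatting them into the SQL string. The sketch below shows the same idea with SQLite, whose `sqlite3` module uses `?` placeholders instead of `%s`. The row values are hypothetical, and the title deliberately contains a quote that would break a naively formatted query.

```python
import sqlite3


def load_apod(conn, row):
    """Insert one APOD row using placeholders instead of string formatting."""
    # sqlite3 uses "?" placeholders; psycopg2 (behind PostgresHook) uses "%s".
    conn.execute(
        "INSERT INTO apod_data (title, explanation, url, date, media_type) "
        "VALUES (?, ?, ?, ?, ?)",
        (row["title"], row["explanation"], row["url"], row["date"], row["media_type"]),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE apod_data (id INTEGER PRIMARY KEY, title TEXT, explanation TEXT, "
    "url TEXT, date TEXT, media_type TEXT)"
)
# Hypothetical row; the apostrophe is stored safely because it is a parameter.
load_apod(conn, {
    "title": "Rho Ophiuchi's Clouds",
    "explanation": "Hypothetical explanation text.",
    "url": "https://example.com/image.jpg",
    "date": "2025-05-27",
    "media_type": "image",
})
print(conn.execute("SELECT id, title FROM apod_data").fetchall())
```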


**Get Inside the Astro Container** : `astro dev bash`

**Start Without Cache** : `astro dev restart --no-cache`

### **Deprecated Things in Airflow**

`SimpleHttpOperator` (use `HttpOperator` instead) and `days_ago` (use a fixed `start_date` instead).

### **Important Steps Before Running Astro**

Astro runs everything in multiple Docker containers and by default already sets up the required components, such as `Airflow` and the `Postgres` database for Airflow's own metadata. However, it does not install any other dependencies.

Therefore, we need to add the name of every extra package to the `requirements.txt` file of the `Astro` project.

For example, I was getting a lot of errors because I installed the `apache-airflow-providers-http` package in my local environment, but it actually needs to be installed inside the `container`.

Therefore, always add the `dependencies` to the `requirements.txt` file.

Also, `SimpleHttpOperator` is already deprecated. We should use `HttpOperator`, which comes from the `apache-airflow-providers-http` provider package.

[Providers_Package_Ref](https://airflow.apache.org/docs/apache-airflow-providers/packages-ref.html#id49)

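In my setup, using names other than `postgres` for the `username` and `password` in `docker-compose.yml` kept failing, so I switched both to `postgres`. A likely cause is a stale volume: the official `postgres` image applies `POSTGRES_USER` and `POSTGRES_PASSWORD` only when it initializes an empty data directory, so changing them after the volume already exists has no effect (see **Possible Errors While Running DAG** below).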

### **Setting the Important Connection (API and DB)**

Now, for the program to run properly, we need to set up two important connections: one for `Postgres` and one for the `API`.

**API**

In the `Airflow` UI, go to **Admin > Connections**.

<img src='./Notes_Img/a1.png'>

<img src='./Notes_Img/a2.png'>

Fill in these fields; when the `DAG` runs, the connection details are retrieved from here. The connection ID must be the same as the `http_conn_id` used in the DAG.
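The API connection's **Extra** field holds JSON such as `{"api_key": "..."}`, and the template `{{ conn.<conn_id>.extra_dejson.api_key }}` reads the `api_key` key from that parsed JSON when the task runs. The stdlib sketch below only mimics that lookup to show what the template resolves to; the function name and the `DEMO_KEY` value are placeholders.

```python
import json

# Hypothetical value of the Extra field of the `apod_api` connection.
EXTRA_FIELD = '{"api_key": "DEMO_KEY"}'


def api_key_from_extra(extra_text):
    """Mimic `extra_dejson.api_key`: parse the Extra JSON and read one key."""
    extra = json.loads(extra_text)  # Extra must be valid JSON for this lookup
    return extra["api_key"]


print(api_key_from_extra(EXTRA_FIELD))  # DEMO_KEY
```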

**Postgres**

Click add connection.

Enter the `conn_id` used in the `DAG`. When `Astro` starts, it runs the `Postgres` container with the details from the `docker-compose.yml` file, so use the `username` and `password` defined there.

For the `Host`, find the `postgres` container in Docker, copy its container name and paste it into the `Host` field. Inside the Airflow container, `localhost` refers to the Airflow container itself, so the database has to be addressed by its container name.

<img src='./Notes_Img/a3.png'>
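As an alternative to filling in the form, Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` (the connection ID in upper case) whose value is a URI. This sketch builds such a URI for the Postgres connection. It assumes the credentials from `docker-compose.yml` and uses `postgres_db` (the `container_name` in that file) as the host, which may differ from the container name Astro actually creates. In an Astro project, the resulting line could go in the project's `.env` file.

```python
from urllib.parse import quote


def postgres_conn_uri(user, password, host, port, database):
    """Build an Airflow-style connection URI; special characters are percent-encoded."""
    return (
        f"postgres://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(database, safe='')}"
    )


# Assumed values: credentials from docker-compose.yml, host = Postgres container name.
uri = postgres_conn_uri("birat", "admin", "postgres_db", 5432, "postgres")
env_var = "AIRFLOW_CONN_" + "my_postgres_connection".upper()
print(f"{env_var}={uri}")
```

Percent-encoding matters when a password contains characters such as `@` or `/`, which would otherwise be read as URI separators.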

Then execute the `DAG` from the `Airflow` UI.

## **Possible Errors While Running DAG**

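If the Postgres data volume was created earlier with a different user name or password, the container keeps the old credentials, because the official image applies `POSTGRES_USER` and `POSTGRES_PASSWORD` only when it initializes an empty data directory. The connection from Airflow then fails, so we have to remove the stale volume (stop the containers first, e.g. with `astro dev stop`, since a volume in use cannot be removed). The volume names carry the project prefix shown by `docker volume ls`, so copy the exact name instead of guessing it: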

```bash

(mlflow_env) toni-birat@tonibirat:/media/toni-birat/New Volume/ML_Flow_Complete/Airflow_ETL_Pipeline_Astro_Postgres$ docker volume ls
DRIVER    VOLUME NAME
local     4d2e2ee6bd280a5cb2f26d998389a0fd0aef5270ce1d2814d9c22617e27b6d02
local     59e773b558bc859c8819420ab883efbbd80ba0f820be1ef86ad10981f870f657
local     airflow-etl-pipeline-astro-postgres_54d206_airflow_logs
local     airflow-etl-pipeline-astro-postgres_54d206_postgres_data
local     airflow-practice_d1986e_airflow_logs
local     airflow-practice_d1986e_postgres_data
local     mariadb_data
(mlflow_env) toni-birat@tonibirat:/media/toni-birat/New Volume/ML_Flow_Complete/Airflow_ETL_Pipeline_Astro_Postgres$ docker volume rm airflow_etl_pipeline_astro_postgres_postgres_data
Error response from daemon: get airflow_etl_pipeline_astro_postgres_postgres_data: no such volume
(mlflow_env) toni-birat@tonibirat:/media/toni-birat/New Volume/ML_Flow_Complete/Airflow_ETL_Pipeline_Astro_Postgres$ docker volume rm airflow_etl_pipeline_astro_postgres_postgres_data
Error response from daemon: get airflow_etl_pipeline_astro_postgres_postgres_data: no such volume
(mlflow_env) toni-birat@tonibirat:/media/toni-birat/New Volume/ML_Flow_Complete/Airflow_ETL_Pipeline_Astro_Postgres$ docker volume rm airflow-etl-pipeline-astro-postgres_54d206_airflow_logs
airflow-etl-pipeline-astro-postgres_54d206_airflow_logs
(mlflow_env) toni-birat@tonibirat:/media/toni-birat/New Volume/ML_Flow_Complete/Airflow_ETL_Pipeline_Astro_Postgres$ docker volume rm airflow-etl-pipeline-astro-postgres_54d206_postgres_data
airflow-etl-pipeline-astro-postgres_54d206_postgres_data
(mlflow_env) toni-birat@tonibirat:/media/toni-birat/New Volume/ML_Flow_Complete/Airflow_ETL_Pipeline_Astro_Postgres$ docker volume rm airflow-practice_d1986e_postgres_data
airflow-practice_d1986e_postgres_data
(mlflow_env) toni-birat@tonibirat:/media/toni-birat/New Volume/ML_Flow_Complete/Airflow_ETL_Pipeline_Astro_Postgres$

```
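The failed `docker volume rm` attempts above used a guessed name, while the real names carry the project prefix and suffix shown in the listing. A small, hypothetical helper can pick the right names out of that output; the sample string is copied from the listing above.

```python
# Sample copied from the `docker volume ls` output above.
VOLUME_LS_OUTPUT = """DRIVER    VOLUME NAME
local     airflow-etl-pipeline-astro-postgres_54d206_airflow_logs
local     airflow-etl-pipeline-astro-postgres_54d206_postgres_data
local     airflow-practice_d1986e_postgres_data
local     mariadb_data
"""


def find_volumes(ls_output, project_prefix, suffix):
    """Return volume names that belong to a project and end with a given suffix."""
    # Skip the header line; the volume name is the last column of each row.
    names = [line.split()[-1] for line in ls_output.splitlines()[1:] if line.strip()]
    return [name for name in names if name.startswith(project_prefix) and name.endswith(suffix)]


print(find_volumes(VOLUME_LS_OUTPUT, "airflow-etl-pipeline-astro-postgres", "postgres_data"))
```

To use it on live output, pass `subprocess.run(["docker", "volume", "ls"], capture_output=True, text=True).stdout` instead of the sample string.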


Once all the DAG tasks complete, we have successfully automated the complete `ETL` pipeline.

Because `Postgres` runs inside a container, we can't directly view the tables and rows. For that, we can install `DBeaver Community`, a GUI database client.

<img src='./Notes_Img/a4.png'>

**DBeaver**

<img src='./Notes_Img/a5.png'>

Go to the Database, look for `apod_data`

<img src='./Notes_Img/a6.png'>


### **Advantage of Providing the Connection Variables from Airflow UI**

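Credentials, remote hosts and API keys are stored in Airflow rather than hard-coded in the DAG, so they can be changed from the UI without editing the code.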


## **Deploying the Astro Project in the Astro Cloud and AWS**

[Video_Link](https://www.udemy.com/course/complete-mlops-bootcamp-with-10-end-to-end-ml-projects/learn/lecture/46199315#overview)

**Astro.io**

We will host the Airflow application in Astro Cloud. Create an account and log in.

Create an organization and name the project.

**AWS Database**

Create a PostgreSQL database on AWS.

<hr>

We will use `Astro CLI` for the deployment.

`astro login`

`astro deploy`


## **Tomorrow**

Fix the DAG problem. Retry with
