# Hands-On Exercise: Apache Airflow for Data Pipelines Automation and Orchestration

**Objective**: Students will learn how to use Apache Airflow to automate and orchestrate data pipelines. The hands-on exercise will cover workflow automation, job scheduling, monitoring, and integrating Airflow with the previously created Hadoop cluster components like HDFS, Hive, and Spark.


**Introduction to Apache Airflow**

Apache Airflow is an open-source platform for authoring, scheduling, and monitoring workflows as Directed Acyclic Graphs (DAGs). It allows for the automation of ETL (Extract, Transform, Load) jobs and other complex workflows.
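
To make the DAG idea concrete, here is a minimal sketch that uses only the Python standard library, not Airflow itself. The task names are hypothetical, and `graphlib` requires Python 3.9 or later. It shows how a set of "runs after" dependencies determines a valid execution order, which is roughly what a scheduler has to work out before running tasks.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task names; each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}


def execution_order(deps):
    """Return one valid run order; raises graphlib.CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


print(execution_order(dependencies))  # ['extract', 'transform', 'load']
```

Because the graph must be acyclic, a dependency loop such as "a waits for b, b waits for a" is rejected rather than scheduled.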

## Step 1: Job Scheduling and Monitoring

Airflow allows users to schedule tasks using DAGs, which are Python scripts defining the sequence of tasks.

### Task 1: Creating a Simple Airflow DAG

1. Create Your First DAG: Create a DAG file `simple_dag.py` in the Airflow DAGs directory (`~/airflow/dags/`):

In [None]:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

# Define DAG arguments
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

# Instantiate a DAG
dag = DAG(
    'simple_dag',
    default_args=default_args,
    schedule_interval='@daily'
)

# Define tasks
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag
)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    dag=dag
)

t3 = BashOperator(
    task_id='current_working_dir',
    bash_command='pwd',
    dag=dag
)

# Define task dependencies
t1 >> t2 >> t3


2. Monitor the DAG in Airflow UI: Once the DAG is created, navigate to the Airflow UI, where you will find the DAG listed under "DAGs". Trigger it manually and monitor its progress.
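
When you enable this DAG, you may see many runs appear at once. In Airflow 2, unless `catchup=False` is set on the DAG, the scheduler creates one run for every daily interval that has elapsed since `start_date`. The following sketch is plain Python, not Airflow code, and the "current" time in it is an arbitrary example value. It lists the intervals a `@daily` schedule would cover.

```python
from datetime import datetime, timedelta


def daily_intervals(start_date, now):
    """List the (start, end) daily intervals that have fully elapsed by `now`."""
    intervals = []
    start = start_date
    while start + timedelta(days=1) <= now:
        intervals.append((start, start + timedelta(days=1)))
        start += timedelta(days=1)
    return intervals


# start_date taken from simple_dag.py; the "now" value is made up for illustration.
runs = daily_intervals(datetime(2023, 1, 1), datetime(2023, 1, 4, 12, 0))
for start, end in runs:
    print(f"interval starting {start:%Y-%m-%d} can run once {end:%Y-%m-%d %H:%M} has passed")
```

If you only want the DAG to run from now on, passing `catchup=False` to the `DAG(...)` constructor skips the backlog.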

## Step 2: Building and Scheduling Data Workflows with Airflow

Now that you understand how to create simple DAGs, let's build a more complex workflow, integrating tasks such as data extraction, transformation, and loading.

### Task 2: Building a Data Workflow

1. Create a Data Pipeline DAG: In this example, you will create a DAG that simulates a data workflow: fetching data from an API, processing it, and loading it into HDFS. Before you start, make sure the `jq` command is installed (`sudo apt install jq`), then copy the code below into a file named `hdfs_upload_dag.py`:

In [None]:
import requests
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Define default arguments
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 11, 22),
    'retries': 1,
}

# Define the DAG
dag = DAG(
    'hdfs_data_workflow_dag',
    default_args=default_args,
    schedule_interval='@daily'
)

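# Define Python function to fetch data from API
def fetch_data():
    response = requests.get("https://dummyjson.com/users", timeout=30)
    response.raise_for_status()  # fail the task on an HTTP error instead of saving the error body
    with open('/tmp/data.json', 'w') as f:
        f.write(response.text)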

# Define tasks
fetch_task = PythonOperator(
    task_id='fetch_data_from_api',
    python_callable=fetch_data,
    dag=dag
)

process_task = BashOperator(
    task_id='process_data',
    bash_command='cat /tmp/data.json | jq . > /tmp/processed_data.json',
    dag=dag
)

create_dir_task = BashOperator(
    task_id='create_hdfs_dir',
    bash_command='hdfs dfs -mkdir -p /user/datatech-labs/airflow_processed_data',
    dag=dag
)

load_task = BashOperator(
    task_id='load_to_hdfs',
    bash_command='hdfs dfs -put /tmp/processed_data.json /user/datatech-labs/airflow_processed_data',
    dag=dag
)

# Set task dependencies
fetch_task >> process_task >> create_dir_task >> load_task


2. **Run the DAG**: Monitor the tasks through the Airflow UI as the data flows through fetching, processing, and loading stages.

3. **Check Logs**: In the Airflow UI, go to the "Graph View" or "Tree View" of a DAG and click on a task to access the logs. Logs contain detailed information about task execution, including errors.
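
The `process_data` task shells out to `jq .`, which validates the JSON and pretty-prints it. If installing `jq` is not an option, the same step could be done in Python. The sketch below is one possible equivalent, not part of the original DAG. The function name `pretty_print_json` and the demo payload are made up for illustration.

```python
import json
import os
import tempfile


def pretty_print_json(src_path, dst_path):
    """Re-serialize a JSON file with indentation; raises on malformed input."""
    with open(src_path) as src:
        data = json.load(src)  # json.JSONDecodeError if the file is not valid JSON
    with open(dst_path, "w") as dst:
        json.dump(data, dst, indent=2)
        dst.write("\n")
    return data


# Demo with throwaway files; the payload is invented for this example.
workdir = tempfile.mkdtemp()
raw_path = os.path.join(workdir, "data.json")
out_path = os.path.join(workdir, "processed_data.json")
with open(raw_path, "w") as f:
    f.write('{"users": [{"id": 1}]}')

pretty_print_json(raw_path, out_path)
with open(out_path) as f:
    print(f.read())
```

In a DAG, such a function could be passed to a `PythonOperator` through `python_callable`, with the two paths supplied through `op_kwargs`.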

## Step 3: Airflow Operators

Operators are the building blocks of DAGs. Airflow has multiple types of operators for different tasks.

### Task 3: Using Airflow Operators

1. **BashOperator**: This operator allows you to run bash commands.

In [None]:
from airflow.operators.bash_operator import BashOperator

bash_task = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag
)


2. **PythonOperator**: Use this operator to execute Python code.

In [None]:
from airflow.operators.python_operator import PythonOperator

def my_function():
    print("Hello World")

python_task = PythonOperator(
    task_id='run_python_function',
    python_callable=my_function,
    dag=dag
)
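
To see what an operator boils down to, the following sketch mimics the core behavior of a bash-running task in plain Python: run a command, capture its output, and treat a non-zero exit code as a failure. It is a simplified illustration, not Airflow's actual implementation, and `run_bash` is a hypothetical helper name.

```python
import subprocess


def run_bash(command):
    """Run a shell command, return its stdout, and raise on a non-zero exit code."""
    result = subprocess.run(
        ["bash", "-c", command],
        capture_output=True,
        text=True,
        check=True,  # raises subprocess.CalledProcessError when the command fails
    )
    return result.stdout.strip()


print(run_bash("echo hello from bash"))
```

The failure-on-non-zero-exit rule is what lets retries and downstream dependencies react to a broken command instead of silently continuing.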


## Step 4: Airflow Sensors

Sensors are special types of operators that wait for a certain condition to be met before continuing execution.

### Task 4: Folder Listening with Airflow Sensor

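1. HDFS Sensor: You can use the `HdfsSensor` to monitor HDFS for new files. Note that this import path only works with older releases of the `apache-airflow-providers-apache-hdfs` package; from version 4.0.0 onward the provider dropped `HdfsSensor`, and `WebHdfsSensor` (in `airflow.providers.apache.hdfs.sensors.web_hdfs`, configured with a `webhdfs_conn_id`) takes its place.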

In [None]:
from airflow.providers.apache.hdfs.sensors.hdfs import HdfsSensor

hdfs_sensor = HdfsSensor(
    task_id='wait_for_hdfs_file',
    filepath='/user/hadoop/data_file.csv',
    hdfs_conn_id='hdfs_default',
    poke_interval=10,
    dag=dag
)
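
The `poke_interval` argument controls how often the sensor re-checks its condition. The sketch below shows the underlying polling pattern in plain Python. It is an illustration rather than Airflow's implementation. The helper `wait_for` and the stand-in condition are hypothetical, and a real sensor would query HDFS instead of counting calls.

```python
import time


def wait_for(condition, poke_interval=10.0, timeout=60.0, sleep=time.sleep):
    """Call `condition` every `poke_interval` seconds until it is true or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() + poke_interval > deadline:
            raise TimeoutError("condition not met before timeout")
        sleep(poke_interval)


# Stand-in condition: becomes true on the third check.
checks = {"count": 0}


def file_has_arrived():
    checks["count"] += 1
    return checks["count"] >= 3


print(wait_for(file_has_arrived, poke_interval=0.01, timeout=1.0))
print("checks performed:", checks["count"])
```

Airflow sensors also accept a `timeout` argument, so a file that never arrives fails the task instead of blocking it forever.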


## Step 5: Integrating Airflow with Hadoop Cluster Components

Now, let’s integrate Airflow with components in your Hadoop cluster, such as HDFS, Hive, and Spark.

### Task 5: HDFS, Hive, and Spark Integration

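1. HDFS Integration: The HDFS provider package ships hooks and sensors rather than a file-transfer operator, so one way to copy a local file into HDFS is to call `WebHDFSHook.load_file` from a `PythonOperator`. This assumes a WebHDFS connection named `webhdfs_default` is configured in Airflow.

In [None]:
from airflow.operators.python_operator import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

def put_file_to_hdfs():
    # Upload the local file through WebHDFS (assumes a 'webhdfs_default' connection)
    hook = WebHDFSHook(webhdfs_conn_id='webhdfs_default')
    hook.load_file(
        source='/tmp/processed_data.json',
        destination='/user/hadoop/processed_data',
    )

hdfs_put = PythonOperator(
    task_id='put_file_to_hdfs',
    python_callable=put_file_to_hdfs,
    dag=dag
)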


2. Hive Integration: Use `HiveOperator` to run Hive queries.

In [None]:
from airflow.providers.apache.hive.operators.hive import HiveOperator

hive_task = HiveOperator(
    task_id='run_hive_query',
    hql='SELECT * FROM sales_data LIMIT 10;',
    hive_cli_conn_id='hive_conn',
    dag=dag
)


3. Spark Integration: Use `SparkSubmitOperator` to run Spark jobs from Airflow.

In [None]:
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

spark_task = SparkSubmitOperator(
    task_id='run_spark_job',
    application='/path/to/spark_job.py',
    conn_id='spark_default',
    dag=dag
)


## Full Project: Implementing an ETL Pipeline

**Overview of the Pipeline**
- Extract: Use Airflow to extract data from an external API.
- Transform: Use Spark to process the extracted data.
- Load: Store the processed data in HDFS and load it into a Hive table.

**Prerequisites**

Before you begin, ensure that:

- Airflow is up and running.
- The Hadoop cluster (HDFS, Hive, and Spark) is set up and operational.
- Spark is installed and configured on the cluster.
- You have created a Hive database and a Hive table to store the processed data.


#### 1. Extract Data from an API

In this step, you will use Airflow’s `PythonOperator` to extract data from a public API and store it in a local file for further processing.

**Airflow DAG for Extraction**:
- **Python function to extract data**: We will use the Open-Meteo API to get weather data (you can choose any API). The data will be stored locally in JSON format.



In [None]:
import requests
import json
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

# Fetch weather data from the API and store it as a JSON file under /tmp.
def extract_data():
    url = "https://api.open-meteo.com/v1/forecast?latitude=35&longitude=139&hourly=temperature_2m"
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # fail the task on an HTTP error instead of saving an error response
    data = response.json()
    with open('/tmp/weather_data.json', 'w') as f:
        json.dump(data, f)

# Define DAG default arguments
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 9, 1),
    'retries': 1,
}

# Instantiate the DAG
dag = DAG(
    'open_weather_etl_pipeline',
    default_args=default_args,
    schedule_interval=None #'@daily'
)

# Create the extraction task. PythonOperator calls this function as part of an Airflow DAG
extract_task = PythonOperator(
    task_id='extract_weather_data',
    python_callable=extract_data,
    dag=dag
)
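
The transformation step later reads `latitude`, `longitude`, and `hourly.temperature_2m` from the saved file, so it can help to check for those fields before writing it. The sketch below is one way to do that. `validate_weather_payload` is a hypothetical helper, and the sample dictionary is made up to match the shape the pipeline expects rather than copied from a real API response.

```python
def validate_weather_payload(data):
    """Raise ValueError unless the fields read by the Spark job are present; return the reading count."""
    for key in ("latitude", "longitude", "hourly"):
        if key not in data:
            raise ValueError(f"missing top-level field: {key}")
    hourly = data["hourly"]
    temperatures = hourly.get("temperature_2m") if isinstance(hourly, dict) else None
    if not isinstance(temperatures, list) or not temperatures:
        raise ValueError("hourly.temperature_2m must be a non-empty list")
    return len(temperatures)


# Made-up sample in the shape the transformation step expects.
sample = {
    "latitude": 35.0,
    "longitude": 139.0,
    "hourly": {"temperature_2m": [21.4, 20.9, 20.1]},
}
print(validate_weather_payload(sample))  # 3
```

Calling such a check inside `extract_data()` before `json.dump` would make a malformed response fail the extraction task, where the error is easier to diagnose than inside the Spark job.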


#### 2. Transform the Data with Spark

In this step, you will use a `SparkSubmitOperator` to transform the extracted data. Spark will read the JSON file and perform transformations (e.g., filtering, aggregating) to prepare it for loading into Hive.

Spark Transformation Script (`transform_weather_data.py`):
Save this Python script on your Hadoop cluster where Spark is installed.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("WeatherDataTransformation") \
    .enableHiveSupport() \
    .getOrCreate()

# Load raw JSON data
df = spark.read.json("file:///tmp/weather_data.json")

# Perform transformations (e.g., filter rows, select columns)
df_filtered = df.select(
    explode(col("hourly.temperature_2m")).alias("temperature"),
    col("latitude"),
    col("longitude")
)

# Save the transformed data to HDFS in Parquet format
df_filtered.write.mode("append").parquet("/user/datatech-labs/processed_weather_data")

# Stop Spark session
spark.stop()
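
To see what `explode` does to the data, here is a plain-Python sketch of the same reshaping, runnable without Spark. The sample payload is made up for illustration, and `explode_temperatures` is a hypothetical helper, not part of the Spark script.

```python
def explode_temperatures(payload):
    """Return one row per hourly temperature, repeating latitude and longitude on each row."""
    return [
        {
            "temperature": temperature,
            "latitude": payload["latitude"],
            "longitude": payload["longitude"],
        }
        for temperature in payload["hourly"]["temperature_2m"]
    ]


# Made-up sample with three hourly readings.
sample = {
    "latitude": 35.0,
    "longitude": 139.0,
    "hourly": {"temperature_2m": [21.4, 20.9, 20.1]},
}
for row in explode_temperatures(sample):
    print(row)
```

One input record with three readings becomes three rows, which is the row-per-reading layout the Parquet output holds.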


**Airflow DAG for Transformation:**

Now, add a new task to the Airflow DAG to submit this Spark job. To use `SparkSubmitOperator`, the Spark provider package must be installed in Airflow's Python environment (`pip install apache-airflow-providers-apache-spark`):

In [None]:
# Option A: SparkSubmitOperator (requires the Spark provider package).
# from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
#
# transform_task = SparkSubmitOperator(
#     task_id='transform_weather_data',
#     application='/path/to/transform_weather_data.py',  # Replace with actual path to the script
#     conn_id='spark_default',
#     dag=dag
# )

# Option B: a plain bash command, if you have trouble installing SparkSubmitOperator.
from airflow.operators.bash_operator import BashOperator  # not imported earlier in this DAG file

# Replace with the path to the Spark script on your machine
spark_file_path = '/home/oalsaghier/Documents/datatech_labs/datatech_lab_de_course_public/week5/pipelines_automation/airflow_dags_and_supported_files/pyspark_transform_weather_data.py'
transform_task = BashOperator(
    task_id='transform_weather_data',
    bash_command=f'spark-submit --master yarn --deploy-mode client {spark_file_path}',
    dag=dag
)

# Set the task dependencies: extract first, then transform
extract_task >> transform_task


#### 3. Load Data into HDFS and Hive

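In this step, you will make the processed data in HDFS available through a Hive table. Because the table is declared `EXTERNAL` with its `LOCATION` set to the directory the Spark job writes to, Hive reads the Parquet files in place and no separate copy step is needed.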

**Hive Table Creation**:

First, create a Hive database and table for the data. Open the Hive CLI on your Hadoop cluster and run the following statements:

In [None]:
CREATE DATABASE IF NOT EXISTS weather_db;

CREATE EXTERNAL TABLE IF NOT EXISTS weather_db.weather_data (
    temperature DOUBLE,
    latitude DOUBLE,
    longitude DOUBLE
)
STORED AS PARQUET
LOCATION '/user/datatech-labs/processed_weather_data'
;

**HOMEWORK: Airflow DAG for Loading Data into Hive:**

Now, add a new task to your DAG to load data into Hive by running a Hive query.

In [None]:
# To run the HiveOperator version of this example, you need to:
#  1- install the SASL development libraries on your Linux machine:
#   sudo apt-get install libsasl2-dev
#  2- pip install the Hive provider package inside your venv:
#   pip install apache-airflow-providers-apache-hive
#  3- go to Admin > Connections and edit the 'hive_cli_default' connection,
#   setting '{"use_beeline": false}' in the 'Extra' field

# from airflow.providers.apache.hive.operators.hive import HiveOperator

# # Add a Hive task that queries the table to confirm the new data is visible
# load_task = HiveOperator(
#     task_id='load_to_hive',
#     hql='SELECT * FROM weather_db.weather_data;',
#     hive_cli_conn_id='hive_cli_default',
#     dag=dag
# )

hql_query = 'select * from weather_db.weather_data'
load_task = BashOperator(
    task_id='read_from_hive',
    bash_command=f"hive -e '{hql_query}'",
    dag=dag
)


# Set task dependencies: transform first, then load
transform_task >> load_task
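
The `bash_command` above wraps the query in single quotes, which works for this query but breaks as soon as the HQL itself contains a single quote (for example, a string literal). One way to guard against that is the standard library's `shlex.quote`. The sketch below uses a hypothetical helper, `hive_command`, and an example query chosen only to show the quoting.

```python
import shlex


def hive_command(hql):
    """Build a `hive -e` command line with the query safely quoted for the shell."""
    return f"hive -e {shlex.quote(hql)}"


# The query from the DAG above: the result matches the hand-written command.
print(hive_command("select * from weather_db.weather_data"))

# An example query containing single quotes, which naive quoting would break.
tricky_query = "SELECT 'rows', COUNT(*) FROM weather_db.weather_data"
print(hive_command(tricky_query))
```

The returned string could be passed as `bash_command` to the `BashOperator` in place of the f-string.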


#### 4. Monitoring the Pipeline

**Airflow Monitoring**:

You can monitor the progress of the DAG and its tasks through the Airflow UI. Check the logs for each task to see detailed information, including any errors that might occur.

**Spark Monitoring**:

To monitor the Spark job, open the YARN ResourceManager UI at `http://<spark-cluster-ip>:8088` and follow the application's tracking link to its Spark UI. There, you can view the stages and tasks of the Spark job, monitor execution time, and troubleshoot performance bottlenecks.

#### Summary of the ETL Pipeline:

1. Extract: Fetch weather data from an API using Airflow’s PythonOperator and store it in a local JSON file.

2. Transform: Process the JSON data using Spark. The Spark job flattens the hourly temperatures into one row per reading and saves the result as Parquet files in HDFS.

3. Load: The processed data is exposed through an external Hive table that points at the Parquet directory, making it available for querying and analysis.

This full ETL pipeline is a practical demonstration of how Airflow can orchestrate complex data workflows, from data extraction to loading into a Hive data warehouse. By integrating Airflow with Spark and Hive, you achieve automated and scalable data processing.






