# Apache Airflow

Apache Airflow is scalable, dynamic, extensible, and lean.

The five main features of Apache Airflow are pure Python, a useful UI, integrations, ease of use, and open source.

A common use case for Apache Airflow is defining and organizing machine learning pipeline dependencies.

Tasks are created with Airflow operators.

Pipelines are specified as dependencies between tasks.
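
To make the idea of dependencies concrete, here is a minimal sketch that uses Python's standard `graphlib` module (Python 3.9+), not Airflow itself, to derive a valid run order from a dependency map. The task names are hypothetical, and this only illustrates the concept; it is not how Airflow's scheduler is implemented.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# Task names are hypothetical, chosen only for illustration.
dependencies = {
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"transform"},
}


def run_order(deps):
    """Return one valid execution order that respects every dependency."""
    return list(TopologicalSorter(deps).static_order())


order = run_order(dependencies)
print(order)
```

Here `extract` always comes first and `transform` second; `load` and `report` both depend only on `transform`, so either may follow.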

Pipeline DAGs defined as code are more maintainable, testable, and collaborative.

Apache Airflow has a rich UI that simplifies working with data pipelines.

You can visualize your DAG in graph or grid mode.

Key components of a DAG definition file include DAG arguments, DAG and task definitions, and the task pipeline.

Set a schedule to specify how often to re-run your DAG.
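
As a rough illustration of what a fixed-interval schedule means, the sketch below uses only the standard library to list the start of each scheduling interval for a given start date and `timedelta`. The function name `interval_starts` is hypothetical and is not part of Airflow; Airflow's scheduler additionally handles cron expressions, catchup, and time zones.

```python
from datetime import datetime, timedelta


def interval_starts(start_date, interval, count):
    """Return the first `count` interval start times for a fixed interval."""
    return [start_date + i * interval for i in range(count)]


# A daily schedule starting on 1 July 2023 (values chosen for illustration).
starts = interval_starts(datetime(2023, 7, 1), timedelta(days=1), 3)
for moment in starts:
    print(moment.isoformat())
```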

You can save Airflow logs to local file systems and send them to cloud storage, search engines, and log analyzers.

For production deployments, Airflow recommends sending logs to Elasticsearch or Splunk for analysis.

You can view DAGs and task events with Airflow’s UI.

The three types of Airflow metrics are counters, gauges, and timers.

For production deployments, Airflow recommends sending metrics through StatsD to Prometheus for analysis.
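
For a feel of how the three metric types differ on the wire, here is a minimal sketch that formats metrics in the plain-text StatsD line format (`name:value|type`, where `c` is a counter, `g` a gauge, and `ms` a timer). The metric names and the helper `statsd_line` are made up for illustration; they are not Airflow's own metric names or API.

```python
# StatsD type codes for the three metric kinds.
TYPE_CODES = {"counter": "c", "gauge": "g", "timer": "ms"}


def statsd_line(name, value, kind):
    """Format one metric as a StatsD text line: <name>:<value>|<type>."""
    return f"{name}:{value}|{TYPE_CODES[kind]}"


# Hypothetical metric names, one per metric type.
lines = [
    statsd_line("example.task_successes", 1, "counter"),  # increment by 1
    statsd_line("example.queued_tasks", 7, "gauge"),  # current level
    statsd_line("example.task_duration", 320, "timer"),  # milliseconds
]
print("\n".join(lines))
```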

**Batch loading** refers to loading data in chunks, where each chunk is the data the source accumulated over a time window, usually on the order of hours to days.
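
A minimal sketch of the windowing idea, assuming hourly windows and made-up records: each record is assigned to the window that contains its timestamp, and each window's records would then be loaded together as one batch. The function name `batch_by_hour` is hypothetical.

```python
from collections import defaultdict
from datetime import datetime


def batch_by_hour(records):
    """Group (timestamp, payload) records into hourly batches."""
    batches = defaultdict(list)
    for timestamp, payload in records:
        # Truncate the timestamp to the start of its hour to get the window key.
        window_start = timestamp.replace(minute=0, second=0, microsecond=0)
        batches[window_start].append(payload)
    return dict(batches)


# Made-up sample records.
records = [
    (datetime(2024, 1, 1, 9, 5), "a"),
    (datetime(2024, 1, 1, 9, 50), "b"),
    (datetime(2024, 1, 1, 10, 15), "c"),
]
batches = batch_by_hour(records)
for window_start, payloads in sorted(batches.items()):
    print(window_start.isoformat(), payloads)
```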

An ETL job can be scheduled by creating **a cron job** that runs your Bash script.

ETL (extract, transform, load) is the process of curating data from multiple sources, conforming it to a unified data format or structure, and loading the transformed data into its new environment.

You can use BashOperator to invoke Bash commands or scripts.

Counters are metrics that only ever increase, such as the total counts of successful or failed tasks.

In [None]:
airflow dags list

In [None]:
airflow dags list | grep "my-first-python-etl-dag"

In [None]:
airflow tasks list my-first-python-etl-dag

In [None]:
airflow tasks list example_bash_operator

In [None]:
airflow tasks list example_branch_labels

In [None]:
airflow dags unpause tutorial

In [None]:
airflow dags pause tutorial

## Simple DAG

Copy the DAG file into the __airflow/dags__ folder as my_dag.py.

In [None]:
from datetime import datetime, timedelta
from airflow import DAG
# In Airflow 2.x, airflow.operators.bash replaces the deprecated bash_operator module
from airflow.operators.bash import BashOperator

# Default arguments for the DAG
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    "simple_dag",
    default_args=default_args,
    description="A simple DAG",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 7, 1),
    catchup=False,
)

# Define tasks
t1 = BashOperator(
    task_id="print_date",
    bash_command="date",
    dag=dag,
)

t2 = BashOperator(
    task_id="print_hello",
    bash_command='echo "Hello World!"',
    dag=dag,
)

# Set task dependencies
t1 >> t2

### Sample DAG

In [None]:
from datetime import datetime, timedelta
from airflow import DAG
# In Airflow 2.x, airflow.operators.python replaces the deprecated python_operator module
from airflow.operators.python import PythonOperator

# Define default arguments for the DAG
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 7, 17),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    "simple_example_dag",
    default_args=default_args,
    description="A simple example DAG",
    schedule_interval=timedelta(days=1),
)


# Define tasks
def task_1():
    print("Executing Task 1")


def task_2():
    print("Executing Task 2")


def task_3():
    print("Executing Task 3")


# Create task instances
t1 = PythonOperator(
    task_id="task_1",
    python_callable=task_1,
    dag=dag,
)

t2 = PythonOperator(
    task_id="task_2",
    python_callable=task_2,
    dag=dag,
)

t3 = PythonOperator(
    task_id="task_3",
    python_callable=task_3,
    dag=dag,
)

# Set task dependencies
t1 >> [t2, t3]

### Sample DAG from Coursera

This DAG file, my_first_dag.py, runs daily. It defines the tasks execute_extract, execute_transform, execute_load, and execute_check, which call the respective Python functions.

In [None]:
# Import the libraries
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG

# Operators; you need this to write tasks!
from airflow.operators.python import PythonOperator

# This makes scheduling easy
from airflow.utils.dates import days_ago

# Define the path for the input and output files
input_file = "/etc/passwd"
extracted_file = "extracted-data.txt"
transformed_file = "transformed.txt"
output_file = "data_for_analytics.csv"


def extract():
    print("Inside Extract")
    # Keep the 1st, 3rd, and 6th colon-separated fields of each line
    # (in /etc/passwd: user name, user ID, and home directory)
    with open(input_file, "r") as infile, open(extracted_file, "w") as outfile:
        for line in infile:
            fields = line.split(":")
            if len(fields) >= 6:
                field_1 = fields[0]
                field_3 = fields[2]
                field_6 = fields[5]
                outfile.write(field_1 + ":" + field_3 + ":" + field_6 + "\n")


def transform():
    print("Inside Transform")
    # Replace the colon delimiters with commas to produce CSV rows
    with open(extracted_file, "r") as infile, open(transformed_file, "w") as outfile:
        for line in infile:
            processed_line = line.replace(":", ",")
            # Each line already ends with "\n", so write it without adding another
            outfile.write(processed_line)


def load():
    print("Inside Load")
    # Copy the transformed rows into the final CSV file
    with open(transformed_file, "r") as infile, open(output_file, "w") as outfile:
        for line in infile:
            outfile.write(line)


def check():
    print("Inside Check")
    # Print the loaded file so its contents can be verified in the task log
    with open(output_file, "r") as infile:
        for line in infile:
            print(line, end="")


# Default arguments applied to every task in the DAG
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "Your name",
    "start_date": days_ago(0),
    "email": ["your email"],
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    "my-first-python-etl-dag",
    default_args=default_args,
    description="My first DAG",
    schedule_interval=timedelta(days=1),
)

# Define the task named execute_extract to call the `extract` function
execute_extract = PythonOperator(
    task_id="extract",
    python_callable=extract,
    dag=dag,
)

# Define the task named execute_transform to call the `transform` function
execute_transform = PythonOperator(
    task_id="transform",
    python_callable=transform,
    dag=dag,
)

# Define the task named execute_load to call the `load` function
execute_load = PythonOperator(
    task_id="load",
    python_callable=load,
    dag=dag,
)

# Define the task named execute_check to call the `check` function
execute_check = PythonOperator(
    task_id="check",
    python_callable=check,
    dag=dag,
)

# Task pipeline
execute_extract >> execute_transform >> execute_load >> execute_check
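
To check what the extract and transform steps produce without running Airflow or touching the file system, the same field selection and delimiter replacement can be applied to an in-memory sample. This is a minimal sketch: the two sample lines are invented in `/etc/passwd` format, and the helper names `extract_line` and `transform_line` are hypothetical.

```python
def extract_line(line):
    """Keep the 1st, 3rd, and 6th colon-separated fields, or None if too short."""
    fields = line.rstrip("\n").split(":")
    if len(fields) < 6:
        return None
    return ":".join([fields[0], fields[2], fields[5]])


def transform_line(line):
    """Turn a colon-delimited record into a comma-delimited one."""
    return line.replace(":", ",")


# Invented sample input in /etc/passwd format.
sample = [
    "alice:x:1000:1000:Alice:/home/alice:/bin/bash\n",
    "bob:x:1001:1001:Bob:/home/bob:/bin/sh\n",
]
extracted = [e for e in map(extract_line, sample) if e is not None]
rows = [transform_line(e) for e in extracted]
print("\n".join(rows))
```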

### DAG: ETL Server Access Log

In [None]:
# Import the libraries
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG

# Operators; you need this to write tasks!
# (Only PythonOperator is used in this DAG, so BashOperator is not imported)
from airflow.operators.python import PythonOperator

# This makes scheduling easy
from airflow.utils.dates import days_ago
import requests

# Define the path for the input and output files
input_file = "web-server-access-log.txt"
extracted_file = "extracted-data.txt"
transformed_file = "transformed.txt"
output_file = "capitalized.txt"


def download_file():
    url = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Apache%20Airflow/Build%20a%20DAG%20using%20Airflow/web-server-access-log.txt"
    # Send a GET request to the URL
    with requests.get(url, stream=True) as response:
        # Raise an exception for HTTP errors
        response.raise_for_status()
        # Open a local file in binary write mode
        with open(input_file, "wb") as file:
            # Write the content to the local file in chunks
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
    print(f"File downloaded successfully: {input_file}")


def extract():
    print("Inside Extract")
    # Keep the 1st and 4th "#"-separated fields of each line
    with open(input_file, "r") as infile, open(extracted_file, "w") as outfile:
        for line in infile:
            # Strip the trailing newline so the last field never carries it
            fields = line.rstrip("\n").split("#")
            if len(fields) >= 4:
                field_1 = fields[0]
                field_4 = fields[3]
                outfile.write(field_1 + "#" + field_4 + "\n")


def transform():
    print("Inside Transform")
    # Convert each extracted line to upper case
    with open(extracted_file, "r") as infile, open(transformed_file, "w") as outfile:
        for line in infile:
            processed_line = line.upper()
            # Each line already ends with "\n", so write it without adding another
            outfile.write(processed_line)


def load():
    print("Inside Load")
    # Copy the transformed lines into the output file
    with open(transformed_file, "r") as infile, open(output_file, "w") as outfile:
        for line in infile:
            outfile.write(line)


def check():
    print("Inside Check")
    # Print the loaded file so its contents can be verified in the task log
    with open(output_file, "r") as infile:
        for line in infile:
            print(line, end="")


# Default arguments applied to every task in the DAG
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "Your name",
    "start_date": days_ago(0),
    "email": ["your email"],
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Define the DAG
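dag = DAG(
    # Use a dag_id that differs from the earlier "my-first-python-etl-dag" example;
    # two DAG files sharing one dag_id conflict with each other
    "etl-server-access-log-dag",
    default_args=default_args,
    description="ETL of a web server access log",
    schedule_interval=timedelta(days=1),
)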

# Define the task named download to call the `download_file` function
download = PythonOperator(
    task_id="download",
    python_callable=download_file,
    dag=dag,
)

# Define the task named execute_extract to call the `extract` function
execute_extract = PythonOperator(
    task_id="extract",
    python_callable=extract,
    dag=dag,
)

# Define the task named execute_transform to call the `transform` function
execute_transform = PythonOperator(
    task_id="transform",
    python_callable=transform,
    dag=dag,
)

# Define the task named execute_load to call the `load` function
execute_load = PythonOperator(
    task_id="load",
    python_callable=load,
    dag=dag,
)

# Define the task named execute_check to call the `check` function
execute_check = PythonOperator(
    task_id="check",
    python_callable=check,
    dag=dag,
)

# Task pipeline
download >> execute_extract >> execute_transform >> execute_load >> execute_check

### Dummy DAG

After adding the .py file to the /dags folder, turn on Auto-refresh in the Airflow web UI.

In [None]:
# import the libraries

from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to write tasks!
# In Airflow 2.x, airflow.operators.bash replaces the deprecated bash_operator module
from airflow.operators.bash import BashOperator

# This makes scheduling easy
from airflow.utils.dates import days_ago

# defining DAG arguments

# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "Your name",
    "start_date": days_ago(0),
    "email": ["your email"],
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# defining the DAG
dag = DAG(
    "dummy_dag",
    default_args=default_args,
    description="My first DAG",
    schedule_interval=timedelta(minutes=1),
)

# define the tasks

# define the first task

task1 = BashOperator(
    task_id="task1",
    bash_command="sleep 1",
    dag=dag,
)

# define the second task
task2 = BashOperator(
    task_id="task2",
    bash_command="sleep 2",
    dag=dag,
)

# define the third task
task3 = BashOperator(
    task_id="task3",
    bash_command="sleep 3",
    dag=dag,
)

# task pipeline
task1 >> task2 >> task3