Install and set up Airflow: The first step is to install Apache Airflow on your local machine or a remote server by running pip install apache-airflow. Once Airflow is installed, initialize the metadata database with airflow db init (the older airflow initdb command belongs to Airflow 1.x and was removed in 2.0). Then start the web server with airflow webserver and the scheduler with airflow scheduler.

In [1]:
!pip install apache-airflow


Defaulting to user installation because normal site-packages is not writeable
Collecting apache-airflow
  Downloading apache_airflow-2.5.0-py3-none-any.whl (6.6 MB)
Collecting mdit-py-plugins>=0.3.0
  Downloading mdit_py_plugins-0.3.3-py3-none-any.whl (50 kB)
Collecting colorlog<5.0,>=4.0.2
  Downloading colorlog-4.8.0-py2.py3-none-any.whl (10 kB)
Collecting connexion[flask,swagger-ui]>=2.10.0
  Downloading connexion-2.14.1-py2.py3-none-any.whl (95 kB)
Collecting alembic<2.0,>=1.6.3
  Downloading alembic-1.9.2-py3-none-any.whl (210 kB)

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
anaconda-project 0.10.2 requires ruamel-yaml, which is not installed.

Collecting unicodecsv>=0.14.1
  Downloading unicodecsv-0.14.1.tar.gz (10 kB)
Collecting gunicorn>=20.1.0
  Downloading gunicorn-20.1.0-py3-none-any.whl (79 kB)
Collecting apache-airflow-providers-imap
  Downloading apache_airflow_providers_imap-3.1.1-py3-none-any.whl (17 kB)
Collecting pathspec~=0.9.0
  Downloading pathspec-0.9.0-py2.py3-none-any.whl (31 kB)
Collecting jinja2>=3.0.0
  Downloading Jinja2-3.1.2-py3-none-any.whl (133 kB)
Collecting argcomplete>=1.10
  Downloading argcomplete-2.0.0-py2.py3-none-any.whl (37 kB)
Collecting flask<2.3,>=2.2
  Downloading Flask-2.2.2-py3-none-any.whl (101 kB)
Collecting cattrs>=22.1.0
  Downloading cattrs-22.2.0-py3-none-any.whl (35 kB)
Collecting blinker
  Downloading blinker-1.5-py2.py3-none-any.whl (12 kB)
Collecting configupdater>=3.1.1
  Downloading ConfigUpdater-3.1.1-py2.py3-none-any.whl (34 kB)
Collecting cron-descriptor>=1.2.24
  Downloading cron_descriptor-1.2.32.tar.gz (28 kB)
Co

You'll also need to initialize the Airflow database by running:

In [2]:
!airflow db init


'airflow' is not recognized as an internal or external command,
operable program or batch file.


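The error above means the airflow executable is not on the PATH, which is common after a pip user installation. Airflow also does not officially support running natively on Windows, so run it under WSL2 or in a Linux container instead. Once the command works, start the Airflow web server and the scheduler, each in its own terminal, since both keep running until stopped: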

In [None]:
airflow webserver -p 8080
airflow scheduler


Create an ETL DAG: Once Airflow is set up, you'll need to create a DAG (Directed Acyclic Graph) to define your ETL workflow. A DAG is a collection of tasks connected by dependencies that never form a cycle, so Airflow can always work out the order in which to run them. For example, here's a simple DAG that reads data from a CSV file, processes it, and writes it to a database:

In [None]:
# #prev
# from airflow import DAG
# from datetime import datetime, timedelta

# dag = DAG(
#     'my_dag',
#     start_date=datetime(2022, 1, 1),
#     schedule_interval='0 0 * * *',
#     catchup=False
# )


In [None]:
from datetime import datetime, timedelta
from io import StringIO

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path
from sqlalchemy import create_engine

# Define default_args dict to pass to the DAG
default_args = {
    'owner': 'me',
    'start_date': datetime(2020, 1, 1),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create a DAG instance; catchup=False avoids backfilling every hour since start_date
dag = DAG(
    'etl_dag',
    default_args=default_args,
    schedule_interval=timedelta(hours=1),
    catchup=False,
)

# Extract: read the CSV and return it as a JSON string.
# The return value is stored as an XCom so the next task can pull it.
def extract():
    data = pd.read_csv('data.csv')
    return data.to_json(orient='records')

# Transform: pull the extracted data from XCom and fill missing
# numeric values with the column mean
def transform(ti):
    raw = ti.xcom_pull(task_ids='extract_task')
    data = pd.read_json(StringIO(raw), orient='records')
    data = data.fillna(data.mean(numeric_only=True))
    return data.to_json(orient='records')

# Load: pull the transformed data from XCom and write it to SQLite,
# replacing the table so repeated runs do not fail
def load(ti):
    raw = ti.xcom_pull(task_ids='transform_task')
    data = pd.read_json(StringIO(raw), orient='records')
    engine = create_engine('sqlite:///data.db')
    data.to_sql('data', engine, if_exists='replace', index=False)

# Create extract operator
extract_task = PythonOperator(
    task_id='extract_task',
    python_callable=extract,
    dag=dag,
)

# Create transform operator (Airflow 2.x passes the context automatically,
# so provide_context is no longer needed)
transform_task = PythonOperator(
    task_id='transform_task',
    python_callable=transform,
    dag=dag,
)

# Create load operator
load_task = PythonOperator(
    task_id='load_task',
    python_callable=load,
    dag=dag,
)

# Set task dependencies
extract_task >> transform_task >> load_task
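
To make the "directed acyclic" part concrete, the ordering declared by the dependency line can be reproduced outside Airflow. The sketch below uses Python's standard-library graphlib module (Python 3.9+), not Airflow's own scheduler, and reuses the task names from the cell above. It is only an illustration of why the graph must not contain a cycle.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "extract_task": set(),
    "transform_task": {"extract_task"},
    "load_task": {"transform_task"},
}

# static_order() yields tasks so that every task comes after its upstream tasks.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)

# Making extract_task depend on load_task closes a loop, so no valid
# run order exists and the sorter raises CycleError.
cyclic = dict(dependencies, extract_task={"load_task"})
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```

The first print shows extract, transform, load in that order; the second confirms that the looped variant is rejected.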


Define tasks: Once the DAG is created, you can define tasks that will be executed as part of the DAG. Airflow provides a wide range of operators for common tasks such as reading from and writing to files, running SQL queries, and more. For example, to read data from a CSV file and insert a row into a MySQL database, you can pair a PythonOperator with the MySqlOperator from the MySQL provider package:

In [None]:
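import pandas as pd
from airflow.operators.python import PythonOperator
# Airflow 2.x import path; requires the apache-airflow-providers-mysql package
from airflow.providers.mysql.operators.mysql import MySqlOperator

def read_csv():
    # Return the rows as a JSON string so the result can be stored as an XCom
    data = pd.read_csv('data.csv')
    return data.to_json(orient='records')

read_csv_task = PythonOperator(
    task_id='read_csv',
    python_callable=read_csv,
    dag=dag
)

# MySQL client libraries expect %(name)s placeholders for named parameters, not :name
load_data_task = MySqlOperator(
    task_id='load_data',
    sql='INSERT INTO data (col1, col2, col3) VALUES (%(col1)s, %(col2)s, %(col3)s)',
    mysql_conn_id='mysql_conn_id',
    parameters={'col1': 'val1', 'col2': 'val2', 'col3': 'val3'},
    dag=dag
)

# Run the insert only after the CSV has been read
read_csv_task >> load_data_task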


Monitor the DAG: You can monitor the progress of the DAG and view the status of each task using the Airflow web UI. You can also set up email alerts for task failures, and use the Airflow CLI to check the status of the DAG or individual tasks.
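
As a rough sketch of the alerting side, failure notifications can be configured through default_args. The keys email, email_on_failure, email_on_retry and on_failure_callback are standard task arguments, but the address, the logger name and the notify_failure function below are made up for illustration, and the email settings only take effect once SMTP is configured for the Airflow installation. The fake context at the end stands in for the one Airflow passes to the callback, so the function can be tried locally.

```python
import logging
from types import SimpleNamespace

logger = logging.getLogger("etl_alerts")  # hypothetical logger name


def notify_failure(context):
    """Build and log a short failure message from the task context."""
    ti = context["task_instance"]
    message = (
        f"Task {ti.task_id} in DAG {ti.dag_id} failed "
        f"(try {ti.try_number}): {context.get('exception')}"
    )
    logger.error(message)
    return message


# Intended to be merged into the DAG's default_args.
alert_args = {
    "email": ["alerts@example.com"],  # hypothetical address
    "email_on_failure": True,
    "email_on_retry": False,
    "on_failure_callback": notify_failure,
}

# Simulated context, shaped like the one Airflow passes to the callback.
fake_context = {
    "task_instance": SimpleNamespace(
        task_id="load_task", dag_id="etl_dag", try_number=2
    ),
    "exception": ValueError("table is locked"),
}
print(notify_failure(fake_context))
```

A callback like this is a convenient place to forward the message to a chat channel or paging system instead of the log.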

Schedule the DAG: Once you've tested the DAG and are satisfied with its performance, you can schedule it to run automatically at regular intervals using the schedule_interval parameter in the DAG definition.
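
The schedule_interval parameter accepts a cron expression, a timedelta, or a preset such as @daily. The sketch below lists a few presets with their cron equivalents and estimates how many runs a start_date in the past implies. With catchup left at its default of enabled, Airflow schedules one run per completed interval since start_date. The count_intervals helper is hypothetical and only approximates that behaviour for a timedelta schedule.

```python
from datetime import datetime, timedelta

# Cron presets accepted by Airflow and the cron expressions they stand for.
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
}


def count_intervals(start_date, now, interval):
    """Number of complete schedule intervals between start_date and now."""
    if now <= start_date:
        return 0
    return (now - start_date) // interval


# One day of hourly intervals after the start_date used in the DAG above.
missed = count_intervals(
    datetime(2020, 1, 1), datetime(2020, 1, 2), timedelta(hours=1)
)
print(missed)  # 24
```

Over several years the same calculation reaches tens of thousands of runs, which is why setting catchup=False is usually sensible for a DAG whose start_date lies far in the past.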

Data Validation: Once the data is loaded, it's important to validate the data to ensure that it's correct, complete and ready for analysis. This could include writing unit tests, running data quality checks, and monitoring data for any errors.
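
A minimal sketch of such data quality checks is shown below, using only the standard library. The validate_rows function, the column names and the sample data are all hypothetical. In a real pipeline the same checks could run in a PythonOperator after the load task, raising an exception when the list of problems is not empty so that the task is marked as failed.

```python
import csv
import io


def validate_rows(rows, required_columns, min_rows=1):
    """Return a list of human-readable problems; an empty list means the data passed."""
    problems = []
    if len(rows) < min_rows:
        problems.append(f"expected at least {min_rows} rows, found {len(rows)}")
    for column in required_columns:
        if rows and column not in rows[0]:
            problems.append(f"missing column: {column}")
            continue
        empty = sum(1 for row in rows if row.get(column) in (None, ""))
        if empty:
            problems.append(f"{empty} empty value(s) in column: {column}")
    return problems


# Hypothetical sample standing in for the loaded table.
sample = io.StringIO("id,amount\n1,10.5\n2,\n3,7.0\n")
rows = list(csv.DictReader(sample))
problems = validate_rows(rows, required_columns=["id", "amount", "currency"])
print(problems)
```

Here the check reports one empty value in the amount column and the absence of the currency column.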

Data Versioning: Versioning lets you keep track of different versions of the data and roll back to a previous one if needed. This matters most when you want to reproduce an earlier analysis or need to be able to return to the original data.
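
One simple way to approximate this, sketched below under the assumption that the data lives in ordinary files, is to store each version under a name derived from a hash of its content. The snapshot and restore functions and the file names are hypothetical. This is an illustration of the idea rather than a replacement for a dedicated versioning tool.

```python
import hashlib
import shutil
import tempfile
from pathlib import Path


def snapshot(source, versions_dir):
    """Copy source into versions_dir under a name derived from its SHA-256 hash."""
    source = Path(source)
    versions_dir = Path(versions_dir)
    versions_dir.mkdir(parents=True, exist_ok=True)
    digest = hashlib.sha256(source.read_bytes()).hexdigest()[:12]
    target = versions_dir / f"{source.stem}-{digest}{source.suffix}"
    if not target.exists():  # identical content is stored only once
        shutil.copy2(source, target)
    return target


def restore(version_path, destination):
    """Roll back by copying a stored version over the working file."""
    shutil.copy2(version_path, destination)


# Demonstration in a temporary directory with hypothetical file contents.
with tempfile.TemporaryDirectory() as tmp:
    data_file = Path(tmp) / "data.csv"
    data_file.write_text("id,amount\n1,10\n")
    v1 = snapshot(data_file, Path(tmp) / "versions")

    data_file.write_text("id,amount\n1,99\n")
    v2 = snapshot(data_file, Path(tmp) / "versions")

    restore(v1, data_file)  # roll back to the first version
    restored_text = data_file.read_text()
    distinct_versions = v1 != v2

print(restored_text == "id,amount\n1,10\n", distinct_versions)
```

Because the file name depends on the content, taking a snapshot of unchanged data does not create a new copy.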

Data Backup: It's also important to back up your data regularly, in case of data loss or corruption. This could include copying the data to a remote server or a cloud storage service.
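
For the SQLite database used in the ETL example, a local backup can be sketched with the backup method of the standard-library sqlite3 module, which copies a database consistently even while it is in use. The backup_sqlite function, the directory layout and the timestamp format are assumptions. Copying the resulting file to a remote server or cloud storage is left out.

```python
import sqlite3
import tempfile
from datetime import datetime
from pathlib import Path


def backup_sqlite(db_path, backup_dir):
    """Write a timestamped copy of a SQLite database using the online backup API."""
    backup_dir = Path(backup_dir)
    backup_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%dT%H%M%S")
    target = backup_dir / f"{Path(db_path).stem}-{stamp}.db"
    source = sqlite3.connect(db_path)
    dest = sqlite3.connect(target)
    try:
        source.backup(dest)  # page-by-page copy of the whole database
    finally:
        dest.close()
        source.close()
    return target


# Demonstration with a throwaway database in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    db_path = Path(tmp) / "data.db"
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE data (id INTEGER, amount REAL)")
    conn.execute("INSERT INTO data VALUES (1, 10.5)")
    conn.commit()
    conn.close()

    backup_path = backup_sqlite(db_path, Path(tmp) / "backups")
    check = sqlite3.connect(backup_path)
    backed_up_rows = check.execute("SELECT id, amount FROM data").fetchall()
    check.close()

print(backed_up_rows)
```

Reading the rows back from the backup file, as the demonstration does, is a cheap way to confirm that a backup is actually usable.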