# Deploy Versioned Airflow DAG Script Artifact

Many serverless AWS services support versioning and aliases for deployment. This makes blue/green deployments, canary deployments, and rollbacks straightforward.

- [AWS Lambda Versioning and Aliases](https://docs.aws.amazon.com/lambda/latest/dg/configuration-versions.html)
- [AWS Step Functions Versioning and Aliases](https://docs.aws.amazon.com/step-functions/latest/dg/auth-version-alias.html)
- [Amazon SageMaker Model Registry Versioning](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry.html)

However, Airflow DAGs do not support this feature yet. This library provides a way to manage Airflow DAG versions and aliases so you can deploy Airflow DAGs with confidence.
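The key idea behind aliases is that an alias is a movable pointer to an immutable version, so a rollback only moves the pointer and never rewrites content. The toy model below illustrates this; `AliasTable`, `point`, and `resolve` are hypothetical names used only for illustration and are not the API of this library or of any AWS service.

```python
class AliasTable:
    """Hypothetical model: aliases are movable pointers to immutable versions."""

    def __init__(self, versions: dict[int, str]) -> None:
        self._versions = dict(versions)  # version number -> frozen content
        self._aliases: dict[str, int] = {}

    def point(self, alias: str, version: int) -> None:
        # Refuse to point an alias at a version that was never published.
        if version not in self._versions:
            raise KeyError(f"version {version} does not exist")
        self._aliases[alias] = version

    def resolve(self, alias: str) -> str:
        # Consumers always read through the alias, never a hard-coded version.
        return self._versions[self._aliases[alias]]


table = AliasTable({1: "dag v1", 2: "dag v2"})
table.point("prod", 2)  # release version 2
table.point("prod", 1)  # roll back by moving the alias; no content is rewritten
print(table.resolve("prod"))  # dag v1
```

Because versions never change, "what is in production" is fully described by the alias-to-version mapping. This is what makes rollbacks cheap.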


## Quick Start

First, import ``AirflowDagArtifact`` from ``airflow_dag_artifact.api``. ``AirflowDagArtifact`` is an abstraction of an Airflow DAG script. We also need to import ``BotoSesManager`` to give the artifact manager AWS credentials. In this example, you need AWS S3 and AWS DynamoDB permissions.

```python
from airflow_dag_artifact.api import AirflowDagArtifact
from boto_session_manager import BotoSesManager
```

We also import a few additional libraries to improve the development experience:

```python
# define the Path to the artifact files
from pathlib import Path
# pretty printer for debugging
from rich import print as rprint
```

Next, let's use a local AWS CLI profile to create the boto session manager object.

```python
bsm = BotoSesManager(profile_name="bmt_app_dev_us_east_1")
```

### Create Airflow DAG Script Artifact

This code block creates the Airflow DAG script artifact. First, let's build the path to the script and display its content.

```python
dir_here = Path.cwd().absolute()
dir_project_root = dir_here.parent
path_airflow_dag_script_1_py = dir_here.joinpath("airflow_dag_script_1.py")
print(path_airflow_dag_script_1_py.read_text())
```

```python
import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
):
    EmptyOperator(task_id="task")
```


Then we create an Airflow DAG script artifact object. We need to specify ``aws_region``, ``s3_bucket``, ``s3_prefix``, and ``dynamodb_table_name`` to define the artifact store backend. Under the hood, it uses the [versioned](https://github.com/MacHu-GWU/versioned-project) Python library to manage the artifact content and its metadata. You also have to give it a unique ``artifact_name``, which becomes part of the artifact's S3 location. Finally, we pass ``path_airflow_dag_script`` to specify where the Airflow DAG script is located.

```python
aws_region = bsm.aws_region
s3_bucket = f"{bsm.aws_account_id}-{bsm.aws_region}-artifacts"
s3_prefix = "versioned-artifacts"
dynamodb_table_name = "versioned-artifacts"

airflow_dag_script_artifact = AirflowDagArtifact(
    aws_region=aws_region,
    s3_bucket=s3_bucket,
    s3_prefix=s3_prefix,
    dynamodb_table_name=dynamodb_table_name,
    artifact_name="airflow_dag_script_1",
    path_airflow_dag_script=path_airflow_dag_script_1_py,
)
print(airflow_dag_script_artifact.path_airflow_dag_script.relative_to(dir_project_root.parent))
```

```
airflow_dag_artifact-project/examples/airflow_dag_script_1.py
```
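The S3 URIs printed later in this walkthrough suggest a simple layout. The latest artifact is stored at ``s3://{s3_bucket}/{s3_prefix}/{artifact_name}/LATEST.py``, and published versions use a six-digit, zero-padded version number as the file name. The helper below reproduces that layout to make the naming convention concrete. It is a hypothetical illustration inferred from those outputs, not a function exported by ``airflow_dag_artifact``, so use ``get_artifact_s3path()`` in real code.

```python
from typing import Optional


def build_artifact_s3_uri(
    s3_bucket: str,
    s3_prefix: str,
    artifact_name: str,
    version: Optional[int] = None,
) -> str:
    """Hypothetical helper mirroring the S3 layout seen in this example's output."""
    # LATEST.py for the mutable artifact, zero-padded number for immutable versions.
    filename = "LATEST.py" if version is None else f"{version:06d}.py"
    prefix = s3_prefix.strip("/")
    return f"s3://{s3_bucket}/{prefix}/{artifact_name}/{filename}"


bucket = "111122223333-us-east-1-artifacts"
print(build_artifact_s3_uri(bucket, "versioned-artifacts", "airflow_dag_script_1"))
print(build_artifact_s3_uri(bucket, "versioned-artifacts", "airflow_dag_script_1", version=1))
```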


``airflow_dag_artifact`` uses AWS S3 to store the artifact files and AWS DynamoDB to store the artifact metadata. Since the S3 bucket and DynamoDB table do not exist yet, we call the ``.bootstrap`` method to create them.

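```python
# Delete every stored artifact so this demo starts from a clean slate.
# This is destructive: do not run it against a store that holds real artifacts.
airflow_dag_script_artifact.repo.purge_all()
# Create the S3 bucket and DynamoDB table used as the artifact store backend.
airflow_dag_script_artifact.bootstrap(bsm=bsm)
```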

Now we can call the ``put_artifact`` method to deploy the artifact as ``LATEST``. It returns an ``Artifact`` object that includes the artifact's metadata.

```python
artifact = airflow_dag_script_artifact.put_artifact(metadata={"foo": "bar"})
rprint(artifact)
```
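The ``metadata`` argument is a natural place to record where an artifact came from. The sketch below assumes string-valued metadata, as in the ``{"foo": "bar"}`` example. ``build_dag_metadata`` and its keys are hypothetical choices, not something the library requires.

```python
import hashlib
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def build_dag_metadata(path: Path) -> dict:
    """Hypothetical helper: describe a DAG script with string-only metadata."""
    content = path.read_bytes()
    return {
        # A content hash lets you check later which script an artifact holds.
        "sha256": hashlib.sha256(content).hexdigest(),
        "size_bytes": str(len(content)),
        "source_file": path.name,
        "built_at": datetime.now(timezone.utc).isoformat(),
    }


with tempfile.TemporaryDirectory() as tmp:
    dag_file = Path(tmp) / "airflow_dag_script_1.py"
    dag_file.write_text("print('hello')\n")
    metadata = build_dag_metadata(dag_file)
    print(metadata["sha256"][:12], metadata["size_bytes"])
```

You could then call ``airflow_dag_script_artifact.put_artifact(metadata=build_dag_metadata(path_airflow_dag_script_1_py))``.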

If you deploy your Airflow DAG via a CLI, Terraform, or any other tool, you can retrieve the versioned DAG artifact from S3. Use the ``get_artifact_s3path()`` method to get the S3 URI of the latest artifact.

```python
s3path = airflow_dag_script_artifact.get_artifact_s3path()
print(s3path.uri)
rprint(s3path.console_url)
```

```
s3://111122223333-us-east-1-artifacts/versioned-artifacts/airflow_dag_script_1/LATEST.py
```
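The URI is all a deployment step needs. Here is a minimal sketch of such a step. It assumes a boto3 S3 client (for example ``boto3.client("s3")``) and a local DAGs folder that your Airflow scheduler reads. ``parse_s3_uri`` and ``deploy_dag_file`` are hypothetical helpers, not part of ``airflow_dag_artifact``.

```python
from pathlib import Path
from urllib.parse import urlparse


def parse_s3_uri(uri: str) -> tuple[str, str]:
    """Split 's3://bucket/key' into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def deploy_dag_file(s3_client, uri: str, dags_folder: Path, dag_filename: str) -> Path:
    """Hypothetical deploy step: download an artifact into an Airflow DAGs folder.

    ``s3_client`` is assumed to be a boto3 S3 client.
    """
    bucket, key = parse_s3_uri(uri)
    dags_folder.mkdir(parents=True, exist_ok=True)
    # Always write to one fixed file name instead of LATEST.py / 000001.py.
    destination = dags_folder / dag_filename
    s3_client.download_file(bucket, key, str(destination))
    return destination


print(parse_s3_uri(
    "s3://111122223333-us-east-1-artifacts/versioned-artifacts/airflow_dag_script_1/LATEST.py"
))
```

The fixed destination file name is deliberate. If both ``LATEST.py`` and ``000001.py`` landed in the DAGs folder, they would define the same ``dag_id``. Overwriting a single file keeps exactly one definition live.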


Once you make a release to production, you should create an immutable version of your artifact so you can roll back at any time. Use the ``publish_artifact_version()`` method to publish a new version from ``LATEST``. A version is simply an immutable snapshot of your latest artifact.

```python
artifact = airflow_dag_script_artifact.publish_artifact_version()
rprint(artifact)
```
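Conceptually, publishing copies ``LATEST`` into a new, numbered, read-only snapshot. The toy in-memory model below shows why later edits to ``LATEST`` cannot affect a published version. ``InMemoryArtifacts`` and ``PublishedVersion`` are hypothetical names, not the library's classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PublishedVersion:
    """Immutable snapshot: assigning to a field raises FrozenInstanceError."""

    number: int
    content: str


class InMemoryArtifacts:
    """Hypothetical model of a mutable LATEST plus immutable published versions."""

    def __init__(self) -> None:
        self.latest = ""
        self.versions: list[PublishedVersion] = []

    def put(self, content: str) -> None:
        self.latest = content  # LATEST is overwritten on every put

    def publish(self) -> PublishedVersion:
        # Snapshot the current LATEST under the next version number.
        number = self.versions[-1].number + 1 if self.versions else 1
        version = PublishedVersion(number=number, content=self.latest)
        self.versions.append(version)
        return version


store = InMemoryArtifacts()
store.put("schedule=@daily")
v1 = store.publish()
store.put("schedule=@hourly")  # later edits only change LATEST
print(v1.number, v1.content)  # 1 schedule=@daily
```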

To roll back, you need the S3 URI of a historical version of the artifact. Use the ``get_artifact_s3path(version=...)`` method to get it.

```python
s3path = airflow_dag_script_artifact.get_artifact_s3path(version=1)
print(s3path.uri)
```

```
s3://111122223333-us-east-1-artifacts/versioned-artifacts/airflow_dag_script_1/000001.py
```
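In practice, you usually roll back to the newest published version that is older than the one currently in production. The small helper below chooses that version. It is hypothetical and assumes you track which version number is currently deployed.

```python
def rollback_target(published_versions: list[int], current_version: int) -> int:
    """Hypothetical helper: pick the newest published version older than the current one."""
    older = [v for v in published_versions if v < current_version]
    if not older:
        raise ValueError(f"no published version older than {current_version}")
    return max(older)


target = rollback_target([1, 2, 3], current_version=3)
print(target, f"{target:06d}.py")  # 2 000002.py
```

The result can be passed as ``get_artifact_s3path(version=target)``. The zero-padded file name matches the ``000001.py`` URI printed above.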


## Summary

Now you know how to manage Airflow DAG artifacts with the ``airflow_dag_artifact`` Python library. With versioned artifacts, you can easily adopt blue/green and canary deployments, and roll back with confidence when something fails in production. I highly recommend this pattern for production projects.
