Once the Airflow virtual environment is set up and activated, install Airflow using the constraints file that matches your Python version (3.9 in this example):

<code>pip install 'apache-airflow==2.10.4' \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.4/constraints-3.9.txt"</code>

Set the <code>AIRFLOW_HOME</code> environment variable (here, to an airflow folder in your home directory) with:

<code> export AIRFLOW_HOME=~/airflow</code>

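Initialise the metadata database (SQLite by default). This also creates the log folder and the airflow.cfg configuration file in <code>AIRFLOW_HOME</code>. (From Airflow 2.7 onwards, <code>airflow db init</code> is deprecated in favour of <code>airflow db migrate</code>.)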

<code> airflow db init </code>

Start the Airflow webserver:

<code>airflow webserver -p 8080</code>

The webserver can then be accessed at <code>http://0.0.0.0:8080/</code> (or <code>http://localhost:8080/</code>).

Create a login account using the following template:

<code>airflow users create --username admin --firstname firstname --lastname lastname --role Admin --email admin@domain.com</code>

(The command prompts for a password when <code>--password</code> is not given; this example used 1234.)

For additional options, use: <code>airflow users create --help</code>

To delete a user account: <code>airflow users delete -u username</code>

The scheduler needs to be running for tasks to execute.

In a new terminal, run <code>airflow scheduler</code> to start it.

To turn on a DAG, click its toggle button. From the tree view (called the Grid view from Airflow 2.3 onwards), we can see that the DAG has multiple tasks:

![ExampleDag](Images/ExampleDag.png)

The toggle pauses and unpauses the DAG.

DAG IDs must be unique.


![DagHistory](Images/DagHistory.png)

![DagHistory2](Images/DagHistory2.png)


![DAGStates](Images/DagStates.png)

The Gantt view can be used to detect whether a task has run longer than expected.

![GantView](Images/GanttView.png)

When a task has failed, click on the task instance and select Clear to re-run it:

![ClearTask](Images/ClearTask.png)

Note that whatever you clear is what will be restarted. The Clear dialog offers these options:
* Downstream (also clears the tasks that follow the selected task in the DAG)
* Upstream (also clears the tasks that precede the selected task in the DAG)
* Past (also clears instances of the task in earlier DAG runs)
* Future (also clears instances of the task in later DAG runs)
* Recursive (also clears tasks in child DAGs, such as subDAGs)
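To make the Upstream and Downstream options concrete, here is a toy model in plain Python (not Airflow's own implementation) of which tasks get selected for clearing. The DAG (extract, transform, load, report) and the function names are hypothetical. Past and Future are not modelled, because they select instances of the same task in other DAG runs rather than other tasks.

```python
# Toy model (NOT Airflow's implementation) of which tasks the "Clear" dialog
# selects when the Upstream / Downstream options are ticked.
from collections import deque

# Hypothetical DAG: extract -> transform -> load, and transform -> report
DOWNSTREAM = {
    "extract": ["transform"],
    "transform": ["load", "report"],
    "load": [],
    "report": [],
}


def _reachable(start, edges):
    """Return every task reachable from start by following edges (breadth-first)."""
    seen, queue = set(), deque([start])
    while queue:
        node = queue.popleft()
        for nxt in edges.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


def tasks_to_clear(task, downstream=False, upstream=False, edges=DOWNSTREAM):
    """The selected task is always cleared; the options widen the selection."""
    selected = {task}
    if downstream:
        selected |= _reachable(task, edges)
    if upstream:
        # Reverse the edges so we can walk from a task back to its ancestors
        reverse = {}
        for parent, children in edges.items():
            for child in children:
                reverse.setdefault(child, []).append(parent)
        selected |= _reachable(task, reverse)
    return selected


print(sorted(tasks_to_clear("transform", downstream=True)))  # ['load', 'report', 'transform']
print(sorted(tasks_to_clear("transform", upstream=True)))    # ['extract', 'transform']
```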

We can store Variables (key-value pairs whose values are strings) under Admin > Variables and use them in our DAGs:

![Variables](Images/AirflowVariables.png)
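Inside DAG code, a Variable is normally read with <code>Variable.get("key")</code> (imported from <code>airflow.models</code>), passing <code>deserialize_json=True</code> when the stored string holds JSON. Airflow can also pick up Variables from environment variables named <code>AIRFLOW_VAR_{KEY}</code>. The sketch below does not import Airflow. <code>get_variable</code> is a hypothetical stand-in that only mimics that environment-variable lookup, to show that values arrive as strings and that JSON has to be parsed explicitly. The keys and values are made up.

```python
import json
import os

# Prefix Airflow uses when looking up Variables in environment variables
VAR_ENV_PREFIX = "AIRFLOW_VAR_"


def get_variable(key, default=None, deserialize_json=False):
    """Hypothetical helper: mimics Variable.get, but only checks environment variables."""
    raw = os.environ.get(VAR_ENV_PREFIX + key.upper())
    if raw is None:
        return default
    # Variable values are stored as plain strings; JSON must be parsed explicitly
    return json.loads(raw) if deserialize_json else raw


# Example values (made up for illustration)
os.environ["AIRFLOW_VAR_TARGET_BUCKET"] = "s3://bucket_a"
os.environ["AIRFLOW_VAR_JOB_SETTINGS"] = '{"retries": 3, "tables": ["a", "b"]}'

print(get_variable("target_bucket"))                                  # s3://bucket_a
print(get_variable("job_settings", deserialize_json=True)["tables"])  # ['a', 'b']
print(get_variable("missing_key", default="fallback"))                # fallback
```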


We can also use Connections (Admin > Connections) to store the configuration needed to connect to external resources:

![Connections](Images/AirflowConnections.png)

We still need to test whether we can connect to Exasol this way.

![MySQLForm](Images/MySQLForm.png)
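Besides the form, Airflow can read a connection from an environment variable named <code>AIRFLOW_CONN_{CONN_ID}</code> holding a URI of the form <code>conn_type://login:password@host:port/schema?extra</code>. Below is a minimal sketch for building such a URI, assuming a hypothetical MySQL host and credentials. Special characters in the credentials must be URL-encoded, which is what <code>quote</code> takes care of.

```python
from urllib.parse import quote, unquote, urlencode, urlsplit


def connection_uri(conn_type, host, login, password, port=None, schema="", extra=None):
    """Build a URI of the form conn_type://login:password@host:port/schema?extra"""
    # URL-encode credentials so characters such as '@', ':' or '/' do not break the URI
    auth = f"{quote(login, safe='')}:{quote(password, safe='')}"
    netloc = f"{auth}@{host}" + (f":{port}" if port else "")
    uri = f"{conn_type}://{netloc}/{quote(schema, safe='')}"
    if extra:
        # Extra fields go in the query string
        uri += "?" + urlencode(extra)
    return uri


# Hypothetical MySQL connection details
uri = connection_uri("mysql", "db.example.com", "analyst", "p@ss:word", port=3306, schema="sales")
print(uri)  # mysql://analyst:p%40ss%3Aword@db.example.com:3306/sales

# Round trip: the encoded password decodes back to the original
print(unquote(urlsplit(uri).password))  # p@ss:word
```

Exporting the result as <code>AIRFLOW_CONN_MY_MYSQL</code> would make it available under the connection ID my_mysql.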

Clearing the state of a task or DAG run restarts it.

DAG Dependencies 

![DagDependencies](Images/DagDependencies.png)

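* Dataset Dependency - these DAGs are triggered once an update to a dataset is detected (one or more datasets)
* Trigger Dependency - a DAG is triggered by another DAG through a TriggerDagRunOperator task
* Sensor Dependency - a DAG waits, through a sensor such as ExternalTaskSensor, for a task in another DAG to complete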


When setting up dependencies, you can configure the DAGs so that the upstream DAG triggers the downstream DAG once it has finished.

![UpstreamandDownstreamDAG](Images/UpstreamandDownstreamDAG.png)


DATASET DEPENDENCY EXAMPLE 


In this example, DAG 1 (the producer) updates 2 separate datasets in S3.
DAG 2 (the consumer) is triggered once it detects that both datasets have been updated.

DAG 1
<pre><code id="python_code">

from airflow.decorators import dag, task 
from airflow import Dataset 
from datetime import datetime 

data_a = Dataset("s3://bucket_a/data_a")
data_b = Dataset("s3://bucket_b/data_b")

# define the DAG object
@dag(start_date=datetime(2025, 1, 1), schedule="@daily", catchup=False)
def producer():

    # outlets lists the dataset(s) this task updates; a successful run records a dataset update
    @task(outlets=[data_a])
    def update_a():
        print("Data A has been updated")

    @task(outlets=[data_b])
    def update_b():
        print("Data B has been updated")

    # define the dependency: update_a runs before update_b
    update_a() >> update_b()

producer()
</code></pre>

In the same folder, create a second file that defines our downstream DAG.

DAG 2
Note: tasks that update a dataset can also use the connections configured under Admin > Connections (for example, an S3 connection) to write the data.

<pre><code id = "python_code">

from airflow.decorators import dag, task
from airflow import Dataset
from datetime import datetime

data_a = Dataset("s3://bucket_a/data_a")
data_b = Dataset("s3://bucket_b/data_b")

# schedule=[data_a, data_b]: this DAG runs once BOTH datasets have been updated
@dag(start_date=datetime(2025, 1, 1), schedule=[data_a, data_b], catchup=False)
def consumer():

    @task
    def run():
        print("run")
        
    run()

consumer()

</code></pre>
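Scheduling on a list of datasets means the consumer waits until every dataset in the list has been updated since its last run, not just one of them. The toy model below captures that behaviour. It is plain Python, not Airflow's scheduler code, and the class and method names are hypothetical.

```python
class DatasetScheduledDag:
    """Toy model of a DAG scheduled on a list of datasets (all must be updated)."""

    def __init__(self, dag_id, datasets):
        self.dag_id = dag_id
        self.datasets = set(datasets)
        self.pending = set()  # datasets updated since the last run
        self.runs = 0

    def on_dataset_updated(self, uri):
        """Called when a producer task listing this dataset in its outlets succeeds.

        Returns True when this update completes the set and a run is queued.
        """
        if uri not in self.datasets:
            return False
        self.pending.add(uri)
        if self.pending == self.datasets:
            self.runs += 1        # every dataset updated -> queue a DAG run
            self.pending.clear()  # start waiting for fresh updates
            return True
        return False


consumer = DatasetScheduledDag("consumer", ["s3://bucket_a/data_a", "s3://bucket_b/data_b"])
print(consumer.on_dataset_updated("s3://bucket_a/data_a"))  # False: still waiting on data_b
print(consumer.on_dataset_updated("s3://bucket_b/data_b"))  # True: both updated, consumer runs
print(consumer.runs)  # 1
```

Because the producer updates data_a and then data_b in every run, this model triggers the consumer once per producer run.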



TRIGGER OPERATOR EXAMPLE

We will create 2 DAGs: controller.py, which triggers the DAG defined in target.py.

<pre><code>

from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime

# schedule=None: this DAG only runs when triggered manually
@dag(start_date=datetime(2025, 1, 1), schedule=None, catchup=False)
def controller():

    @task
    def start():
        print("start")

    trigger = TriggerDagRunOperator(
        task_id="trigger_target_dag",
        trigger_dag_id="target",      # the DAG ID of the DAG you want to trigger
        conf={"message": "my_data"},  # data passed to the triggered run (read via dag_run.conf)
        wait_for_completion=True,     # this task waits until the triggered DAG run has finished
    )

    @task
    def done():
        print("done")

    # start -> trigger the target DAG -> done
    start() >> trigger >> done()

controller()

</code></pre>

target.py will take a shape similar to the following:

<pre><code>

from airflow.decorators import dag, task
from datetime import datetime

# schedule=None: runs only when triggered (here by the controller DAG)
@dag(start_date=datetime(2025, 1, 1), schedule=None, catchup=False)
def target():

    @task
    def message(dag_run = None):
        print(dag_run.conf.get("message"))

    message()

target()

</code></pre>

With schedule set to None, this DAG does not run periodically. It only runs when triggered, here by the controller DAG.


EXTERNAL TASK SENSOR EXAMPLE

You can use ExternalTaskSensors to ensure that tasks in other DAGs have completed before specific tasks in a DAG run.
In this example, waiting_for_a_b_c.py is a DAG that tracks the completion of the tasks in dag_a.py, dag_b.py and dag_c.py.

Picture 3 DAGs defined as follows:

dag_a
<pre><code>

from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date = datetime(2025,1,1),schedule = '@daily',catchup= False)
def dag_a():

    @task
    def task_a():
        print("task_a")

    task_a()

dag_a()

</code></pre>


dag_b
<pre><code>

from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date = datetime(2025,1,1),schedule = '@daily',catchup= False)
def dag_b():

    @task
    def task_b():
        print("task_b")

    task_b()

dag_b()

</code></pre>

dag_c
<pre><code>

from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date = datetime(2025,1,1),schedule = '@daily',catchup= False)
def dag_c():

    @task
    def task_c():
        print("task_c")

    task_c()

dag_c()

</code></pre>

You can create a DAG that waits for the external tasks before proceeding with the following tasks:

waiting_for_a_b_c.py
<pre><code>

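from airflow.decorators import dag, task
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime

# Same '@daily' schedule as dag_a/b/c: by default an ExternalTaskSensor waits for the
# external task run with the SAME logical date as this DAG run, so the schedules must line up
@dag(start_date=datetime(2025, 1, 1), schedule='@daily', catchup=False)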
def waiting_for_a_b_c():

    waiting_for_a = ExternalTaskSensor(
        task_id = 'waiting_for_a',
        external_dag_id = 'dag_a',
        external_task_id = 'task_a'
    )

    waiting_for_b = ExternalTaskSensor(
        task_id = 'waiting_for_b',
        external_dag_id = 'dag_b',
        external_task_id = 'task_b'
    )

    waiting_for_c = ExternalTaskSensor(
        task_id = 'waiting_for_c',
        external_dag_id = 'dag_c',
        external_task_id = 'task_c'
    )

    @task
    def next_a():
        print("a")

    @task
    def next_b():
        print("b")

    @task
    def next_c():
        print("c")

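    @task
    def done():
        print("done")

    # call done() once so all three branches join into a single task
    all_done = done()

    waiting_for_a >> next_a() >> all_done
    waiting_for_b >> next_b() >> all_done
    waiting_for_c >> next_c() >> all_done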

waiting_for_a_b_c()

</code></pre>

Note: external task sensors accept the following configuration parameters:
* timeout - the maximum time (in seconds) the sensor waits before it fails (default: 7 days)
* poke_interval - the time (in seconds) between checks of the external DAG's state (default: 60)
* mode="reschedule" - if the condition isn't met, the sensor frees its worker slot and is rescheduled for the next poke interval instead of occupying a worker for the whole wait (the default mode is "poke")
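The interplay of these parameters can be sketched with a toy model that uses a simulated clock instead of real waiting. <code>run_sensor</code> and <code>SensorTimeout</code> are hypothetical names (in Airflow a timed-out sensor raises <code>AirflowSensorTimeout</code>). The defaults mirror Airflow's: poke every 60 seconds and time out after 7 days.

```python
class SensorTimeout(Exception):
    """Raised when the condition is still not met after `timeout` seconds."""


def run_sensor(condition, poke_interval=60, timeout=60 * 60 * 24 * 7, mode="poke"):
    """Toy sensor on a simulated clock.

    condition(t) returns True once the external task has finished at time t.
    Returns (elapsed_seconds, worker_seconds), where worker_seconds is how long a
    worker slot was held while waiting (the brief time of each poke is ignored).
    """
    elapsed = 0
    worker_seconds = 0
    while not condition(elapsed):
        if elapsed >= timeout:
            raise SensorTimeout(f"condition not met after {elapsed}s")
        elapsed += poke_interval
        if mode == "poke":
            # "poke" mode keeps the worker slot busy during the wait
            worker_seconds += poke_interval
        # "reschedule" mode frees the slot until the next poke
    return elapsed, worker_seconds


def finished_at_150(t):
    """Hypothetical external task that finishes 150 s after the sensor starts."""
    return t >= 150


print(run_sensor(finished_at_150, mode="poke"))        # (180, 180)
print(run_sensor(finished_at_150, mode="reschedule"))  # (180, 0)
```

Both modes detect completion at the same poke. The difference is that reschedule mode does not tie up a worker slot in between.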

Alternatively, you can build the DAG with the following dependency:

<code>wait_task >> [success_task , fail_task] </code>

where wait_task is our sensor. success_task has the all_success trigger rule and runs when the sensor succeeds. fail_task has the all_failed trigger rule and handles what happens when the sensor fails (for example, when it times out).
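In real code the rule is set per task, for example with <code>@task(trigger_rule="all_failed")</code> or <code>trigger_rule=TriggerRule.ALL_FAILED</code> (from <code>airflow.utils.trigger_rule</code>). Below is a simplified plain-Python model of how the two rules decide which branch runs after the sensor. It is not Airflow's implementation, and <code>should_run</code> is a hypothetical name.

```python
# States that count as "failed" for the all_failed rule
FAILED_STATES = {"failed", "upstream_failed"}


def should_run(trigger_rule, upstream_states):
    """Simplified check of whether a task's trigger rule is satisfied."""
    if trigger_rule == "all_success":
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "all_failed":
        return all(state in FAILED_STATES for state in upstream_states)
    raise ValueError(f"trigger rule not modelled here: {trigger_rule}")


# wait_task (the sensor) is the only upstream task of both branches
for sensor_state in ("success", "failed"):
    print(
        f"sensor {sensor_state}: "
        f"success_task runs={should_run('all_success', [sensor_state])}, "
        f"fail_task runs={should_run('all_failed', [sensor_state])}"
    )
```

This pattern relies on the sensor actually ending in the failed state. With soft_fail=True, a timed-out sensor is marked skipped instead, and neither rule is satisfied.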

------------------------------

Airflow in Docker

(Dependencies: Docker, Docker compose)

Docker - an open-source platform that allows developers to build, test and deploy applications quickly, using containers to package software into standardized units that can run on any machine with Docker installed.

Docker Compose - a tool that helps you define and share multi-container applications.
With Compose, you can create a YAML file to define the services, and with a single command you can spin everything up or tear it all down.

The big advantage of using Compose is that you can define your application stack in a file, keep it at the root of your project repository,
and easily enable someone else to contribute to your project. (All someone needs to do is clone the repository and start the app using Compose.)


Fetch the Docker Compose YAML file (the official Airflow guidelines were used for inspiration):

<code>curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.10.4/docker-compose.yaml'</code>

From within the YAML file: 
1. Change AIRFLOW__CORE__EXECUTOR from CeleryExecutor to LocalExecutor
2. Delete the AIRFLOW__CELERY__RESULT_BACKEND and AIRFLOW__CELERY__BROKER_URL variables from the environment section
3. Remove the redis entry from the depends_on block anchored as &airflow-common-depends-on
4. Remove the entire redis definition from the services section
5. Remove the airflow-worker service (the one running the "celery worker" command)
6. Remove the flower service from the services section


Folders are required for the DAGs, logs, plugins and config.

From within the project folder (where docker-compose.yaml was downloaded), run:
<code>mkdir -p ./dags ./logs ./plugins ./config</code>

To download all the necessary Docker images and initialise the database:
(Ensure Docker is running when you enter this command.)
(Check the Docker > Resources > Memory settings to ensure more than 4 GB, ideally around 7 GB, is allocated.)

<code>docker-compose up airflow-init</code>

Once this command finishes, database initialisation is complete.

The following command runs the containers in the background (detached mode):

<code>docker-compose up -d</code>



This shows that we have an Airflow container, an Airflow scheduler and a Postgres database:

![](Images/DockerContainers.png)

With these running, we can open the Airflow webserver at http://localhost:8080.
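To check from a script that the webserver is up, you can use the <code>/health</code> endpoint. The webserver exposes it and returns JSON with a status per component (for example <code>metadatabase</code> and <code>scheduler</code>). Below is a minimal sketch using only the standard library. The helper names are hypothetical, and components reporting no status (null) are skipped rather than flagged.

```python
import json
from urllib.request import urlopen


def fetch_health(base_url="http://localhost:8080"):
    """Fetch and decode the webserver's /health response (containers must be running)."""
    with urlopen(f"{base_url}/health", timeout=10) as response:
        return json.load(response)


def unhealthy_components(health):
    """Names of components whose status is reported and is not 'healthy'."""
    return sorted(
        name
        for name, info in health.items()
        if isinstance(info, dict) and info.get("status") not in ("healthy", None)
    )
```

With the containers up, an empty list from <code>unhealthy_components(fetch_health())</code> means every reporting component is healthy.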

We can also view which containers are running in the airflow project : 

![](Images/ContainersLive.png)

NOTE: Check the YAML file for the URL of the Airflow UI and the credentials of the admin account (in the official file, the username and password both default to airflow).





------------------------

Watch: core concepts + task lifecycle + basic architecture

-----------------

Airflow DAG with Python operator 


--------


Data sharing with XComs


----------


Airflow task APIs

------

Airflow Catchup and Backfill 

----

Can we create CI/CD pipelines?




