>> Tutorial: https://www.youtube.com/watch?v=K9AnJ9_ZAXE&t=5484s

## Operators

* Airflow ships with many different operators.
* An operator defines the work that a task needs to do.
* Some commonly used operators are:
    * PythonOperator: Runs a Python function.
    * BashOperator: Runs a Bash command or script.
    * DummyOperator: A placeholder operator that does nothing (newer Airflow 2 releases call it `EmptyOperator`).
    * Custom operators: You can also write your own operator.
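
A minimal sketch of how these operators fit together in one DAG, assuming an Airflow 2.x install similar to the 2.5.1 image used later in these notes. The DAG id, task ids, and the `say_hello` function are hypothetical names chosen for illustration. `EmptyOperator` is used as the placeholder; on older versions `DummyOperator` plays the same role.

```python
from datetime import datetime


def say_hello() -> str:
    """Plain Python callable that the PythonOperator will run."""
    message = "Hello from PythonOperator"
    print(message)
    return message


def build_dag():
    """Wire three operators into one DAG (all ids here are hypothetical)."""
    # Imported here so say_hello() can be tried without Airflow installed.
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.empty import EmptyOperator
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="operators_demo",
        start_date=datetime(2024, 10, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        start = EmptyOperator(task_id="start")
        hello = PythonOperator(task_id="hello", python_callable=say_hello)
        show_date = BashOperator(task_id="show_date", bash_command="date")

        # start runs first, then hello, then show_date.
        start >> hello >> show_date
    return dag
```

In a real DAG file you would call `dag = build_dag()` at module level so that the scheduler can find the DAG.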

## Task Lifecycle

![alt text](Images/task_lifecycle.png)

## Basic Architecture

![alt text](Images/basic_architecture.png)


* `Dag`: A DAG is the core concept in Airflow, representing a collection of tasks with a defined execution order. DAGs define the workflows but do not execute them.
* `Scheduler`: The scheduler is responsible for reading DAGs, determining their execution schedules, and deciding when to run tasks. It monitors time intervals, external triggers, and DAG dependencies.
* `Executors`: The executor determines how and where the tasks queued by the scheduler are run.
* `Workers`: Workers are responsible for executing the tasks assigned to them by the scheduler via the executor.
* `Web Server`: The web server provides a user interface to monitor, manage, and trigger workflows.
* `Metadata Database`: The metadata database stores all information about DAGs, tasks, users, and their states.
* `Logs`: Logs record what each task did during execution and are essential for monitoring and debugging.
* `airflow.cfg`: The `airflow.cfg` file is the main configuration file for Apache Airflow. It contains settings that define how Airflow operates, including its database connection, executors, logging, scheduling, and security options. Each section in the file corresponds to a specific part of Airflow, and adjusting these settings lets you fine-tune how Airflow runs in your environment.

## Airflow XCom

* You can push data to and pull data from XCom to share it between tasks.
* The maximum size of a value shared through XCom is `48KB`.
* Never use XCom to share large data such as a DataFrame.
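
A small sketch of pushing and pulling a value, assuming Airflow 2.x, where `PythonOperator` passes the task instance to a callable that declares a `ti` argument. The key `first_name`, the value `Jerry`, and the DAG and task ids are hypothetical.

```python
def push_name(ti) -> None:
    """Push a small value; `ti` is the TaskInstance that Airflow passes in."""
    ti.xcom_push(key="first_name", value="Jerry")


def pull_name(ti) -> str:
    """Pull the value pushed by the task whose task_id is 'push_name'."""
    first_name = ti.xcom_pull(task_ids="push_name", key="first_name")
    greeting = f"Hello, {first_name}"
    print(greeting)
    return greeting


def build_dag():
    # Imported here so the two callables can be tried without Airflow installed.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="xcom_demo",
        start_date=datetime(2024, 10, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        push = PythonOperator(task_id="push_name", python_callable=push_name)
        pull = PythonOperator(task_id="pull_name", python_callable=pull_name)

        # The pull task must run after the push task.
        push >> pull
    return dag
```

For large data, a common approach is to write the data to a file or bucket and pass only its path through XCom.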

## Airflow Decorators

* Airflow decorators are a more Pythonic and concise way to define DAGs, tasks, and other configurations in Apache Airflow. 
* Instead of explicitly creating and configuring DAGs and operators using traditional code, decorators allow you to annotate Python functions to automatically handle this.
* Introduced in Airflow 2.0, these decorators simplify writing DAGs by allowing you to define DAGs and tasks more declaratively, improving code readability and organization.
* Common Airflow Decorators:
    * `@dag`: Used to define a DAG directly from a Python function.
    * `@task`: Used to define individual tasks (like PythonOperator) as functions.
    * `@task_group`: Used to define a group of tasks.
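
A minimal sketch of the decorator style, assuming Airflow 2.x. The DAG id, the task names, and the helper `make_greeting` are hypothetical and only serve the illustration.

```python
from datetime import datetime


def make_greeting(name: str, age: int) -> str:
    """Plain helper so the message logic can be tried without Airflow."""
    return f"Hello, {name}. You are {age} years old."


def build_dag():
    # Imported here so make_greeting() can be tried without Airflow installed.
    from airflow.decorators import dag, task

    @dag(
        dag_id="taskflow_demo",
        start_date=datetime(2024, 10, 1),
        schedule_interval="@daily",
        catchup=False,
    )
    def taskflow_demo():
        @task
        def get_name() -> str:
            return "Jerry"

        @task
        def get_age() -> int:
            return 30

        @task
        def greet(name: str, age: int) -> None:
            print(make_greeting(name, age))

        # Passing return values wires the dependencies and moves data via XCom.
        greet(name=get_name(), age=get_age())

    # Calling the decorated function creates the DAG object.
    return taskflow_demo()
```

Compared with explicit operators, the return values of `get_name` and `get_age` are pushed to and pulled from XCom without any `xcom_push` or `xcom_pull` calls.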

## DAG Catchup and Backfill

* Airflow can catch up on the DAG runs it missed since the start date.
    * Assume you created the DAG on 2024-10-02 but want data from 2024-09-28 onward. In this case, set the start date to 2024-09-28 and `catchup=True` in the DAG definition.
* Airflow can also backfill the data for missing days from the terminal.
    * Get the list of containers with the command `docker ps`.
    * Note the container ID of the airflow-scheduler container.
    * Log in to the container with the command `docker exec -it <CONTAINER_ID> bash`.
    * Once logged in, run: `airflow dags backfill -s <START_DATE> -e <END_DATE> <DAG_ID>`, where `<START_DATE>` and `<END_DATE>` are the first and last dates to backfill.
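
The backfill steps above can be sketched as a small helper that assembles the command. This is only an illustration: the container id `abc123` and the DAG id `my_dag` are hypothetical, and the helper runs the `airflow` command directly through `docker exec` instead of opening an interactive shell first.

```python
import shlex
from datetime import date


def backfill_command(container_id: str, dag_id: str, start: str, end: str) -> list:
    """Return the argument list for `airflow dags backfill` inside a container."""
    # Validate the ISO dates early; fromisoformat raises ValueError on bad input.
    if date.fromisoformat(start) > date.fromisoformat(end):
        raise ValueError("start date must not be after end date")
    return [
        "docker", "exec", container_id,
        "airflow", "dags", "backfill",
        "-s", start, "-e", end, dag_id,
    ]


cmd = backfill_command("abc123", "my_dag", "2024-09-28", "2024-10-01")
print(shlex.join(cmd))
# prints: docker exec abc123 airflow dags backfill -s 2024-09-28 -e 2024-10-01 my_dag
```

The list form can be passed to `subprocess.run(cmd, check=True)` if you want to launch the backfill from a script.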

## Schedule_Interval

* You can schedule a DAG using `schedule_interval`.
* There are 2 ways of scheduling a DAG:
    * `datetime.timedelta`
    * Cron expression:
        * It is a string of five fields separated by white space that together represent a set of times.
        * E.g. `15 14 1 * *` (`minute` `hour` `day_of_month` `month` `day_of_week`), which means 14:15 on the first day of every month.
        * Airflow provides some presets:
        * ![alt text](Images/schedule_interval_preset.png)
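
To make the five cron fields concrete, here is a small standard-library sketch that labels each field of an expression. The helper `describe_cron` is hypothetical and not part of Airflow; the preset table lists the cron equivalents of a few common Airflow presets.

```python
from datetime import timedelta

CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")

# Cron equivalents of a few Airflow presets.
PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def describe_cron(expression: str) -> dict:
    """Map each of the five cron fields to its name."""
    expression = PRESETS.get(expression, expression)
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


print(describe_cron("15 14 1 * *"))
# {'minute': '15', 'hour': '14', 'day_of_month': '1', 'month': '*', 'day_of_week': '*'}

# The timedelta form: pass this as schedule_interval to run once a day.
every_day = timedelta(days=1)
```

Either form is passed as the `schedule_interval` argument of the DAG, for example `schedule_interval="15 14 1 * *"` or `schedule_interval=every_day`.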


## Airflow Connections

* You often need to connect to external resources while creating DAGs.
* Airflow can connect to different external resources such as databases, cloud servers, and others.
* To use the Postgres DB connector, update the `docker-compose.yaml` file so that the `postgres` service has a `ports` entry of `5432:5432`.
* Recreate the Postgres container using the command:
    * `docker-compose up -d --no-deps --build postgres`
    * If the container is already current, Docker Compose prints a message such as `airflow_docker_postgres_1 is up-to-date` instead of recreating it.
* Download the DBeaver application and create a new connection in it on port 5432.
* Once done, add the connection to Airflow through `Admin` > `Connections`.
* Now you can insert data into or delete data from the database using Airflow.

## Airflow Docker Install Python Package

* There are 2 ways to install Python dependencies in your Airflow Docker container:
    * Image Extending
    * Image Customising

![alt text](Images/install_python_package_in_container.png)

### Extending a Docker Image

* To install new libraries on top of the existing Docker image, use the steps below:
    * Create a `requirements.txt` file listing all the libraries you want to install.
    * Create a file named `Dockerfile` with the following content (Dockerfile comments must sit on their own lines):

                # Define the image you want to extend
                FROM apache/airflow:2.5.1
                # Copy the requirements file into the Docker image
                COPY requirements.txt /requirements.txt
                # Upgrade pip
                RUN pip install --user --upgrade pip
                # Install the libraries from the requirements file
                RUN pip install --no-cache-dir --user -r /requirements.txt

    * Once the file is created, run the command: `docker build . --tag extending_airflow:latest`
    * Once the image is built, open the `docker-compose.yaml` file and change the image name from
        * `image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.1}`
        * to
        * `image: ${AIRFLOW_IMAGE_NAME:-extending_airflow:latest}`
        >> Note: The new image name must match the tag used when building the image.
    * Test the installed packages.

* You need to rebuild the Airflow webserver and scheduler services because the image name in the `docker-compose.yaml` file changed.
* To do so, run the command below:
    * `docker-compose up -d --no-deps --build airflow-webserver airflow-scheduler`
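
One way to test the installed packages is a quick import check, run inside the container or from a task. This is a sketch using only the standard library; the helper `missing_modules` and the module name `sklearn` are hypothetical examples. Note that the import name can differ from the name in `requirements.txt` (for example `scikit-learn` is imported as `sklearn`).

```python
from importlib.util import find_spec


def missing_modules(module_names):
    """Return the top-level module names that cannot be imported."""
    return [name for name in module_names if find_spec(name) is None]


# An empty list means every module was found in the current environment.
print(missing_modules(["json", "sklearn"]))
```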

### Customize Docker Image

* Clone the airflow git repository: https://github.com/apache/airflow.git
* Look for the `docker-context-files` folder.
* Create a new `requirements.txt` file inside it.
* Build the Docker image from scratch using the command: `docker build . --build-arg AIRFLOW_VERSION='2.0.1' --tag customising_airflow:latest`
* Once the build is complete, open the `docker-compose.yaml` file and update the image name to
    * `image: ${AIRFLOW_IMAGE_NAME:-customising_airflow:latest}`
>> Note: The tag name and the image name must be the same.

### When to Extend and When to Customise a Docker Image

| Image Extending | Image Customising |
|---|---|
| Suitable 99% of the time | When you need more things to customise |
| Easy to use | When you care about image size optimisation |
| Builds fast | |

## Airflow AWS S3 Sensor

* A sensor is a special type of operator that waits for something to occur.
* Use case: when you don't know in advance when the DAG should be triggered.
* To test the S3 sensor we are going to use `MinIO`.
* MinIO is an open-source, high-performance object storage system that resembles AWS S3.
* Run the MinIO container with the command: `docker run -p 9000:9000 -p 9001:9001 -e "MINIO_ROOT_USER=AKIAIOSFODNN7EXAMPLE" -e "MINIO_ROOT_PASSWORD=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" quay.io/minio/minio server /data --console-address ":9001"`
* Open the web UI and log in with the user ID and password provided in the command above.
* Now find the `amazon-provider` version with the steps below:
    * Get the list of running Airflow services: `docker ps`
    * Open a shell in the scheduler container: `docker exec -it <container_id of airflow_scheduler> bash`
    * Inside the container, list the installed packages to read the amazon provider version, for example with `pip list | grep amazon`.
* Open the Airflow documentation, select that version in the top right corner, click the Python API option, and look for `aws.sensors.s3`.
* There you will find the exact module to import.
* Create an S3 Connection in Airflow:
    * Access the Airflow web UI.
    * Navigate to Admin -> Connections.
    * Click Create.
    * Fill in the details:
        * Connection Id: Give it a meaningful name, e.g., s3_conn
        * Connection Type: Select `Amazon Web Services`
        * Login: `<minio login id>`
        * Password: `<minio login password>`
        * Extra: `{"host": "http://host.docker.internal:9000", "region_name": null, "endpoint_url": "http://host.docker.internal:9000", "use_ssl": false}`
    >> NOTE: Testing the connection in the Airflow UI will fail, but the connection works when it is used from a DAG.
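
A hedged sketch of the two pieces above: the Extra settings as JSON, and a sensor that waits for a file using the `s3_conn` connection. The bucket name `airflow`, the key `data.csv`, the DAG id, and the timing values are hypothetical. The import path assumes a recent amazon provider, so check it against the version you found in the documentation.

```python
import json
from datetime import datetime

# Same keys as the Extra field above; Python None/False become JSON null/false.
S3_CONN_EXTRA = {
    "host": "http://host.docker.internal:9000",
    "region_name": None,
    "endpoint_url": "http://host.docker.internal:9000",
    "use_ssl": False,
}


def extra_as_json() -> str:
    """Serialize the Extra settings for pasting into the connection form."""
    return json.dumps(S3_CONN_EXTRA)


def build_dag():
    # Imported here so extra_as_json() can be tried without Airflow installed.
    from airflow import DAG
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

    with DAG(
        dag_id="s3_sensor_demo",
        start_date=datetime(2024, 10, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        S3KeySensor(
            task_id="wait_for_file",
            bucket_name="airflow",   # hypothetical bucket
            bucket_key="data.csv",   # hypothetical object key
            aws_conn_id="s3_conn",   # connection id created above
            mode="poke",
            poke_interval=5,         # seconds between checks
            timeout=30,              # fail the task after 30 seconds
        )
    return dag
```

With these settings the sensor checks the bucket every 5 seconds and fails if the file has not appeared within 30 seconds.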
    

## Airflow Hooks S3 PostgreSQL

* Goal: read data from a database and store it in an S3 bucket.