### **Airflow Interview Questions**

#### BASIC QUESTIONS

**QUESTION 1**

**What is Apache Airflow? Why is it used?**

Apache Airflow is an **open-source workflow orchestration tool** that allows developers to programmatically **author, schedule, and monitor workflows or data pipelines**. Workflows in Airflow are defined as Directed Acyclic Graphs (DAGs), which represent the series of tasks to be executed and their dependencies.

Airflow is widely used in data engineering and ETL processes to:
- Schedule workflows: Automate the execution of workflows at specific times or intervals.
- Monitor workflows: Provide detailed logging and visual interfaces to track task progress.
- Handle dependencies: Manage complex task dependencies and ensure tasks are executed in the correct order.
- Retry and recovery: Automatically retry failed tasks and recover from errors.

Its flexibility, extensibility (through Python-based DAG definitions), and support for integration with external systems (e.g., databases, APIs, cloud services) make it a powerful tool for orchestrating data workflows.

**QUESTION 2**

**What are DAGs in Airflow? How do they work?**

A Directed Acyclic Graph (DAG) is the core concept in Apache Airflow. It represents a workflow or pipeline in which:

- Nodes represent tasks.
- Edges represent dependencies between tasks.

**DAGs in Airflow have the following characteristics:**

- Directed: Each edge points from an upstream task to a downstream task, so tasks have a defined execution order.
- Acyclic: No circular dependencies are allowed; a task cannot depend, directly or indirectly, on itself.
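Acyclicity is what guarantees that at least one valid execution order (a topological order) exists. As a minimal sketch, using Python's standard-library `graphlib` (Python 3.9+) rather than Airflow itself, the dependencies below map each hypothetical task to its upstream tasks. Adding a back-edge makes ordering impossible, which is similar to why Airflow rejects a DAG file that contains a cycle.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies):
    """Return one valid run order for a mapping of task -> set of upstream tasks.

    Raises CycleError if the graph contains a cycle (i.e., it is not a DAG).
    """
    return list(TopologicalSorter(dependencies).static_order())


# start >> process >> end, expressed as task -> upstream tasks
pipeline = {"process": {"start"}, "end": {"process"}}
print(execution_order(pipeline))  # ['start', 'process', 'end']

# Adding end >> start closes a loop, so no valid order exists.
cyclic = {"process": {"start"}, "end": {"process"}, "start": {"end"}}
try:
    execution_order(cyclic)
except CycleError as err:
    print("Not a DAG:", err.args[0])
```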


**How they work:**

Definition: A DAG is defined in Python code using the `airflow.models.DAG` class (usually imported with `from airflow import DAG`). It includes task definitions and dependency relationships.

Execution: The Scheduler parses DAG files, creates DAG runs according to each DAG's schedule, and queues the task instances whose upstream dependencies are satisfied.

Monitoring: The Web UI shows the status of each task instance (e.g., running, success, failed, or skipped).

**For example:**

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator  # replaces DummyOperator (Airflow 2.3+)

# Define a DAG
with DAG(
    dag_id="example_dag",
    schedule="@daily",  # called schedule_interval before Airflow 2.4
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Define tasks
    start = EmptyOperator(task_id="start")
    process = EmptyOperator(task_id="process")
    end = EmptyOperator(task_id="end")

    # Define dependencies: start runs first, then process, then end
    start >> process >> end
```

**QUESTION 3**

**What is the difference between a Task and a DAG in Airflow?**

| Aspect | Task | DAG |
|---|---|---|
| Definition | A single unit of work in a workflow (e.g., running a script or querying a database). | The entire workflow, composed of multiple tasks and their dependencies. |
| Scope | An instance of an Operator (e.g., `PythonOperator`, `BashOperator`). | A container for tasks that defines the workflow's structure and execution order. |
| Execution | Executes a specific function or script. | Does not execute anything directly; it acts as a blueprint for the workflow. |
| Dependencies | Linked to other tasks to define the order of execution. | Defines and holds these task dependencies. |

In short, a Task is an action, while a DAG organizes these actions into a complete workflow.

**QUESTION 4** 

**What are the key components of Airflow architecture?**

**DAGs (Directed Acyclic Graphs):** Python-defined workflows, stored in the DAGs folder and parsed by the Scheduler.

**Scheduler:** Manages the scheduling of tasks in DAGs, ensuring tasks are executed according to their dependencies and schedule intervals.

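**Executor:** The mechanism the Scheduler uses to run queued tasks. It determines whether tasks run one at a time, in parallel on the same machine, or on remote workers. Common types include: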
- SequentialExecutor: Executes tasks one at a time.
- LocalExecutor: Executes tasks in parallel on the local machine.
- CeleryExecutor: Distributes tasks across multiple worker nodes.
- KubernetesExecutor: Executes tasks in Kubernetes pods.

**Metadata Database:** Stores the state of DAGs, tasks, and other Airflow configurations. Common databases used include PostgreSQL and MySQL.

**Web Server:** Provides a user interface for monitoring DAGs, tasks, and their statuses.

**Workers:** Nodes responsible for running the tasks assigned by the Scheduler.

**QUESTION 5**

**What is an Operator in Airflow? Can you name a few types of Operators?**

An Operator in Airflow is a building block of tasks. It defines what a task should do (e.g., execute a Python function, run a Bash script, or transfer data). Operators are designed to be reusable and modular.

**Types of Operators:**

1. **Action Operators:** Perform specific actions.
    - `PythonOperator` (executes Python code)
    - `BashOperator` (executes Bash commands or scripts)

2. **Transfer Operators:** Transfer data between systems.
    - `S3ToGCSOperator` (moves data from S3 to Google Cloud Storage)
    - `MySQLToGCSOperator` (exports data from MySQL to Google Cloud Storage)

3. **Sensor Operators:** Wait for a condition to be met before proceeding.
    - `FileSensor` (waits for a file to appear)
    - `ExternalTaskSensor` (waits for a task in another DAG to complete)

**QUESTION 6**

**What is a Task Instance? How is it different from a Task?**

A Task Instance is a specific execution of a Task within a DAG run. While a Task is a blueprint (what needs to be done), a Task Instance is its runtime execution, with its own metadata such as the logical date (called the execution date before Airflow 2.2), try number, and state.

Task: Defined in code, represents what action to perform.

Task Instance: Represents a single execution of that task on a specific DAG run.

**Example:** If a DAG runs daily, the same Task will have multiple Task Instances, each corresponding to a different day.

**QUESTION 7**

**What are Sensors in Airflow? Provide an example use case.**

Sensors are a special type of operator designed to wait for a condition to be met. Downstream tasks do not start until the condition (such as the presence of a file) is satisfied or the sensor times out.

**Example use case:**

Use an `S3KeySensor` to wait for a specific file to arrive in an S3 bucket before proceeding with data processing.

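```python
from datetime import datetime

from airflow import DAG
# Airflow 2.x location (requires the apache-airflow-providers-amazon package)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG(
    dag_id="s3_sensor_dag",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    wait_for_file = S3KeySensor(
        task_id="wait_for_file",
        bucket_name="my-bucket",
        bucket_key="data/file.csv",
        aws_conn_id="my_aws_connection",
        poke_interval=60,   # check every 60 seconds
        timeout=3600,       # fail if the file has not appeared after 1 hour
        mode="reschedule",  # release the worker slot between checks
    )
```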
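A sensor in the default `poke` mode is essentially a polling loop. The sketch below is a simplified, hypothetical model of that loop (not Airflow's `BaseSensorOperator` code): `poke_until` checks a condition every `poke_interval` seconds and raises once `timeout` is exceeded. The clock and sleep functions are injected so the example runs instantly.

```python
import time


def poke_until(condition, poke_interval=60, timeout=3600,
               clock=time.monotonic, sleep=time.sleep):
    """Simplified model of a sensor in 'poke' mode.

    Calls condition() every poke_interval seconds until it returns True,
    or raises TimeoutError once timeout seconds have elapsed.
    """
    started = clock()
    while True:
        if condition():
            return True
        if clock() - started >= timeout:
            raise TimeoutError(f"condition not met within {timeout}s")
        sleep(poke_interval)


class FakeClock:
    """Deterministic clock so the example runs instantly."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
arrival_time = 180  # pretend the S3 file lands 3 minutes in
found = poke_until(lambda: clock.now >= arrival_time,
                   poke_interval=60, timeout=3600,
                   clock=clock, sleep=clock.sleep)
print(found, clock.now)  # True 180.0
```

With `mode="reschedule"`, Airflow frees the worker slot between checks instead of sleeping inside the task, which matters for long waits.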

**QUESTION 8**

**How do you backfill a DAG in Airflow?**

Backfilling in Airflow means running a DAG for past data intervals, for example because the DAG was paused, did not exist yet, or the data needs to be reprocessed.

There are two ways to backfill:

1. Set `catchup=True` in the DAG definition. When the DAG is unpaused, the Scheduler creates a run for every interval since the `start_date` (or since the most recent run) that has not run yet.

2. Use the Airflow CLI (Airflow 2.x syntax):

    `airflow dags backfill -s 2023-01-01 -e 2023-12-31 my_dag_id`

    This runs the DAG for all scheduled intervals between the start and end dates, regardless of the `catchup` setting.
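Each scheduled interval in the backfill range becomes its own DAG run, and each DAG run gets its own Task Instance of every task. As a rough sketch, assuming a `@daily` schedule and an inclusive end date, the logical dates covered by the CLI command above can be listed with the standard library (`daily_logical_dates` is a hypothetical helper, not an Airflow API):

```python
from datetime import date, timedelta


def daily_logical_dates(start, end):
    """List the daily logical dates a backfill over [start, end] would cover.

    Simplified model: one run per day, end date inclusive.
    """
    days = (end - start).days
    return [start + timedelta(days=i) for i in range(days + 1)]


dates = daily_logical_dates(date(2023, 1, 1), date(2023, 12, 31))
print(len(dates), dates[0], dates[-1])  # 365 2023-01-01 2023-12-31
```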


**QUESTION 9**

**What is the purpose of the airflow.cfg file?**

The airflow.cfg file is the main configuration file for Airflow. It contains settings for:
- Core settings (e.g., default timezone, DAG folder path)
- Executors (e.g., type of executor, parallelism)
- Logging (e.g., log file location)
- Database connections (e.g., metadata DB URI)
- Security (e.g., authentication method)

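Example snippet (in Airflow 2.3+, `sql_alchemy_conn` belongs in the `[database]` section rather than `[core]`):

```ini
[core]
dags_folder = /path/to/dags
executor = CeleryExecutor

[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
```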
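Any `airflow.cfg` option can also be overridden with an environment variable named `AIRFLOW__{SECTION}__{KEY}`, which takes precedence over the file. The sketch below models that lookup with `configparser`; `get_option` is a hypothetical helper, and it ignores Airflow's other configuration sources (such as the `_cmd`/`_secret` variants and built-in defaults).

```python
import configparser
import os

AIRFLOW_CFG = """
[core]
dags_folder = /path/to/dags
executor = CeleryExecutor

[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
"""


def get_option(parser, section, key, environ=os.environ):
    """Resolve a setting: an AIRFLOW__{SECTION}__{KEY} environment variable
    wins over the value in airflow.cfg (simplified sketch)."""
    env_name = f"AIRFLOW__{section.upper()}__{key.upper()}"
    if env_name in environ:
        return environ[env_name]
    return parser.get(section, key)


parser = configparser.ConfigParser()
parser.read_string(AIRFLOW_CFG)

print(get_option(parser, "core", "executor", environ={}))  # CeleryExecutor
print(get_option(parser, "core", "executor",
                 environ={"AIRFLOW__CORE__EXECUTOR": "LocalExecutor"}))  # LocalExecutor
```

Environment overrides are convenient in containerized deployments, where the same image runs with different settings.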

**QUESTION 10**

**What are the different types of task dependencies in Airflow?**

Dependencies define the execution order of tasks. Common types include:

Sequential: `task2` runs only after `task1` succeeds.

`task1 >> task2`

Parallel (fan-out): `task2` and `task3` both depend on `task1` and can run in parallel once it succeeds.

`task1 >> [task2, task3]`

Cross dependencies (fan-in): `task4` waits for both `task2` and `task3`.

`task1 >> task2 >> task4`

`task3 >> task4`

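Dynamic dependencies: Set programmatically, e.g., in a loop over a list of tasks. Airflow 2.x also provides the `chain()` and `cross_downstream()` helpers in `airflow.models.baseoperator`.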
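The `>>` syntax works through Python operator overloading. The sketch below uses a hypothetical `Task` class (not Airflow's `BaseOperator`) to show how `task >> [a, b]`, `[a, b] >> task`, and a loop can record edges; Airflow's operators overload `>>` in a similar way.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator, showing how >> builds edges."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()

    def __rshift__(self, other):
        # task >> other  or  task >> [a, b]
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.add(target.task_id)
        return other  # returning the right-hand side allows chaining a >> b >> c

    def __rrshift__(self, other):
        # [a, b] >> task: lists have no __rshift__, so Python calls task.__rrshift__([a, b])
        for upstream in other:
            upstream >> self
        return self


t1, t2, t3, t4 = (Task(f"task{i}") for i in range(1, 5))
t1 >> [t2, t3]   # fan-out
[t2, t3] >> t4   # fan-in

# Dynamic dependencies: wire a chain in a loop
steps = [Task(f"step_{i}") for i in range(3)]
for upstream, downstream in zip(steps, steps[1:]):
    upstream >> downstream

print(sorted(t1.downstream))  # ['task2', 'task3']
print(steps[0].downstream)    # {'step_1'}
```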


#### INTERMEDIATE QUESTIONS

**QUESTION 1**

**How would you schedule a DAG to run at specific intervals? Explain cron syntax in the context of Airflow.**

In Airflow, a DAG's schedule is set with the `schedule` parameter (called `schedule_interval` before Airflow 2.4). You can specify:

Predefined intervals (presets):

- `@daily`: Runs once a day at midnight.
- `@hourly`: Runs at the start of every hour.
- `@once`: Schedules a single run and no further runs.
- `@weekly`, `@monthly`: Other predefined options.

Cron expressions: Provide precise scheduling flexibility using cron syntax:

- Format: `minute hour day_of_month month day_of_week`
- Example: `0 12 * * *` runs the DAG every day at 12:00 PM (UTC unless another timezone is configured).

Example DAG using cron:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="cron_example",
    schedule="30 9 * * 1-5",  # 9:30 AM, Monday to Friday
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    start = EmptyOperator(task_id="start")
```
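To make the five cron fields concrete, here is a simplified matcher that checks whether a given minute is selected by an expression. It is a sketch only: it supports `*`, single numbers, ranges, and comma lists, but not step values (`*/15`), names (`MON`), or cron's special day-of-month/day-of-week rules. Airflow itself uses a full cron parser.

```python
from datetime import datetime


def field_matches(field, value):
    """Match one cron field supporting '*', single numbers, ranges 'a-b', and lists 'a,b'."""
    for part in field.split(","):
        if part == "*":
            return True
        if "-" in part:
            low, high = map(int, part.split("-"))
            if low <= value <= high:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expression, moment):
    """Return True if `moment` falls on a minute selected by a 5-field cron expression."""
    minute, hour, day_of_month, month, day_of_week = expression.split()
    cron_weekday = moment.isoweekday() % 7  # cron counts Sunday as 0, Monday as 1
    return (field_matches(minute, moment.minute)
            and field_matches(hour, moment.hour)
            and field_matches(day_of_month, moment.day)
            and field_matches(month, moment.month)
            and field_matches(day_of_week, cron_weekday))


schedule = "30 9 * * 1-5"
print(cron_matches(schedule, datetime(2024, 1, 1, 9, 30)))  # Monday: True
print(cron_matches(schedule, datetime(2024, 1, 6, 9, 30)))  # Saturday: False
```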


**QUESTION 2**

**What is the role of the Airflow Scheduler? How does it differ from the Worker?**

Scheduler:

The Scheduler is responsible for monitoring DAGs, creating DAG runs at the correct time, and managing task dependencies.

- It decides which tasks are ready to run and sends them to the executor for execution.

- It does not execute task code itself.

Worker:

- The Worker executes the tasks handed to it by the executor.

- Depending on the executor type, tasks run as local processes (LocalExecutor) or on distributed worker nodes or pods (CeleryExecutor, KubernetesExecutor).
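The division of labor can be modeled with standard-library threads: the scheduler loop only decides which tasks are ready and puts them on a queue, while workers only execute what they receive. This is a toy sketch (the queue stands in for the executor or a Celery broker, and the task names are hypothetical), built on the `graphlib.TopologicalSorter` ready/done protocol.

```python
import queue
import threading
from graphlib import TopologicalSorter


def run_pipeline(dependencies, num_workers=2):
    """Toy scheduler/worker split over a mapping of task -> upstream tasks."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    task_queue = queue.Queue()  # stands in for the executor's queue / message broker
    done_queue = queue.Queue()  # workers report finished tasks back
    finished = []

    def worker():
        while True:
            task_id = task_queue.get()
            if task_id is None:  # shutdown signal
                break
            # A real worker would run the operator's code here.
            done_queue.put(task_id)

    workers = [threading.Thread(target=worker) for _ in range(num_workers)]
    for w in workers:
        w.start()

    # Scheduler loop: queue every task whose upstream tasks have all finished.
    while sorter.is_active():
        for task_id in sorter.get_ready():
            task_queue.put(task_id)
        task_id = done_queue.get()
        finished.append(task_id)
        sorter.done(task_id)

    for _ in workers:
        task_queue.put(None)
    for w in workers:
        w.join()
    return finished


deps = {"extract_a": set(), "extract_b": set(),
        "transform": {"extract_a", "extract_b"}, "load": {"transform"}}
order = run_pipeline(deps)
print(order)  # the two extract tasks may finish in either order
```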

**QUESTION 3**

**What are XComs in Airflow? How are they used?**

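XComs ("cross-communications") allow tasks in Airflow to share small pieces of data with one another. They are:

- Key-value-based: Data is stored under a key and retrieved using the same key. A `PythonOperator`'s return value is pushed automatically under the key `return_value`.
- Stored in the metadata database, so they are meant for small values (IDs, file paths, counts), not large datasets.
- Scoped to a DAG run: by default, `xcom_pull` reads values pushed in the same DAG run.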

Use cases:

- Passing data from one task to another.
- Storing intermediate results.

Example: Task 1 pushes data:

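```python
from airflow.operators.python import PythonOperator

def push_data(**kwargs):
    # Airflow 2.x passes the task context automatically (no provide_context needed);
    # 'ti' is the current TaskInstance
    kwargs["ti"].xcom_push(key="example_key", value="hello_world")

# Define inside a `with DAG(...)` block
task1 = PythonOperator(task_id="push_task", python_callable=push_data)
```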


Example: Task 2 pulls data:

```python
def pull_data(**kwargs):
    # Read the value pushed by push_task in the same DAG run
    value = kwargs["ti"].xcom_pull(task_ids="push_task", key="example_key")
    print(f"Value: {value}")

task2 = PythonOperator(task_id="pull_task", python_callable=pull_data)
task1 >> task2  # pull_task must run after push_task
```
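Conceptually, each XCom is a row keyed by DAG, run, task, and key. The hypothetical `XComStore` below models that lookup in memory to show two details: values returned from a callable land under the key `return_value`, and a pull only sees values from the same DAG run. It is not Airflow's implementation, which persists rows in the metadata database.

```python
class XComStore:
    """Toy model of XCom rows: values keyed by (dag_id, run_id, task_id, key)."""

    RETURN_KEY = "return_value"  # key Airflow uses for a callable's return value

    def __init__(self):
        self._rows = {}

    def push(self, dag_id, run_id, task_id, value, key=RETURN_KEY):
        self._rows[(dag_id, run_id, task_id, key)] = value

    def pull(self, dag_id, run_id, task_id, key=RETURN_KEY):
        # Missing entries return None, as xcom_pull does when nothing was pushed.
        return self._rows.get((dag_id, run_id, task_id, key))


store = XComStore()
store.push("xcom_demo", "run_2024_01_01", "push_task", "hello_world", key="example_key")
store.push("xcom_demo", "run_2024_01_01", "extract", {"rows": 42})  # like returning a value

print(store.pull("xcom_demo", "run_2024_01_01", "push_task", key="example_key"))  # hello_world
print(store.pull("xcom_demo", "run_2024_01_01", "extract"))                       # {'rows': 42}
print(store.pull("xcom_demo", "run_2024_01_02", "push_task", key="example_key"))  # None (other run)
```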

**QUESTION 4**

**How do you handle task retries and failure in Airflow?**

Airflow provides configurable options for handling task retries and failures:

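Retries: Specify the number of retry attempts for a task using the `retries` parameter.

Retry delay: Set a delay between retries using the `retry_delay` parameter; `retry_exponential_backoff=True` makes the delay grow after each attempt.

Custom failure behavior: Use `on_failure_callback` to run custom logic (e.g., send an alert) once a task has failed after its final retry; `on_retry_callback` runs on each retry instead.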

Example:

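```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

def my_task():
    raise Exception("Simulating failure")

def notify_failure(context):
    # Called once the task has failed after its final retry;
    # replace print with a real alert (e.g., Slack or email)
    ti = context["task_instance"]
    print(f"Task {ti.task_id} failed in DAG {ti.dag_id}")

with DAG(
    dag_id="retry_example",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    retry_task = PythonOperator(
        task_id="retry_task",
        python_callable=my_task,
        retries=3,  # Retry 3 times
        retry_delay=timedelta(minutes=5),  # Wait 5 minutes between retries
        on_failure_callback=notify_failure,  # Custom failure handling
    )
```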
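The retry behavior can be sketched in plain Python. `run_with_retries` is a hypothetical helper that mirrors the meaning of `retries` and `retry_delay`, with an optional doubling delay as a simplified stand-in for `retry_exponential_backoff` (Airflow's real backoff also adds jitter and respects `max_retry_delay`). Sleeping is a no-op by default so the example runs instantly.

```python
def run_with_retries(func, retries=3, retry_delay=300, exponential_backoff=False,
                     sleep=lambda seconds: None):
    """Call func(); on exception, retry up to `retries` more times.

    Returns (result, delays_waited). Pass time.sleep as `sleep` to really wait.
    """
    delays = []
    for attempt in range(retries + 1):  # first try + `retries` retries
        try:
            return func(), delays
        except Exception:
            if attempt == retries:
                raise  # out of retries: the task would be marked failed
            delay = retry_delay * (2 ** attempt if exponential_backoff else 1)
            delays.append(delay)
            sleep(delay)


calls = {"n": 0}


def flaky():
    """Fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient error")
    return "ok"


result, delays = run_with_retries(flaky, retries=3, retry_delay=300, exponential_backoff=True)
print(result, delays)  # ok [300, 600]
```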


**QUESTION 5**

**What is the difference between @daily, @hourly, and @once schedule intervals?**

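`@daily`: Runs the DAG once a day at midnight (UTC by default).

`@hourly`: Runs the DAG at the start of every hour.

`@once`: Schedules exactly one DAG run (once the DAG is unpaused and its `start_date` has passed) and never schedules another.

In Airflow 2.x, the run for a given interval starts when that interval ends: the `@daily` run covering 2024-01-01 starts at midnight on 2024-01-02.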

Example Comparison:

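```python
from datetime import datetime

from airflow import DAG

daily = DAG(dag_id="daily_example", schedule="@daily", start_date=datetime(2024, 1, 1))
hourly = DAG(dag_id="hourly_example", schedule="@hourly", start_date=datetime(2024, 1, 1))
once = DAG(dag_id="once_example", schedule="@once", start_date=datetime(2024, 1, 1))
```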
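The presets are shorthands for cron expressions. The sketch below lists those equivalents and, assuming Airflow 2.x's default behavior where a run starts only after its data interval ends, computes when the first `@daily` runs would start. `data_intervals` is a hypothetical helper, not an Airflow API.

```python
from datetime import datetime, timedelta

# Cron equivalents of Airflow's preset schedules (@once has no cron form).
PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def data_intervals(start, every, count):
    """First `count` (begin, end) data intervals for a fixed-frequency schedule.

    The run covering [begin, end) starts at `end`.
    """
    return [(start + i * every, start + (i + 1) * every) for i in range(count)]


for begin, end in data_intervals(datetime(2024, 1, 1), timedelta(days=1), 2):
    print(f"@daily run for {begin:%Y-%m-%d} starts at {end:%Y-%m-%d %H:%M}")
# @daily run for 2024-01-01 starts at 2024-01-02 00:00
# @daily run for 2024-01-02 starts at 2024-01-03 00:00
```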

**QUESTION 6**

**What is the purpose of the Airflow CLI? Can you give examples of common commands?**

The Airflow CLI allows users to manage DAGs, tasks, and configurations directly from the command line.

Common commands:

List all DAGs:

`airflow dags list`

Trigger a DAG manually:

`airflow dags trigger my_dag_id`

Pause a DAG:

`airflow dags pause my_dag_id`

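Run a single task instance for testing (its log is printed to the console and no state is recorded in the metadata database):

`airflow tasks test my_dag_id my_task_id 2024-01-01`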

**QUESTION 7**

**How do you set up Airflow connections for external systems like databases or APIs?**

Airflow uses Connections to securely store credentials and configuration for external systems. These can be managed via the Airflow UI or CLI.

Steps to create a connection in the UI:

1. Navigate to **Admin > Connections** in the Airflow UI.
2. Add a new connection (the **+** button).
3. Fill in the details:
    - Connection ID: Unique name for the connection.
    - Connection Type: (e.g., Postgres, Amazon Web Services, HTTP).
    - Host, Port, Login, Password, Schema: As applicable.

**Example: Setting up an HTTP connection with the CLI (Airflow 2.x syntax, where the connection ID is a positional argument):**

```bash
airflow connections add my_http_conn \
    --conn-type http \
    --conn-host https://api.example.com \
    --conn-login my_username \
    --conn-password my_password
```
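Connections can also be supplied as URIs, for example through an environment variable named `AIRFLOW_CONN_{CONN_ID}` with the connection ID in upper case. The helpers below are a hypothetical sketch that builds such a URI from its parts, percent-encoding each component so characters like `@` or `/` in a password do not break parsing; the Postgres host and credentials are placeholders.

```python
from urllib.parse import quote, urlencode


def connection_uri(conn_type, host, login=None, password=None, port=None,
                   schema=None, extra=None):
    """Build a connection URI of the form
    conn_type://login:password@host:port/schema?extra_key=value
    with each component percent-encoded."""
    auth = ""
    if login is not None:
        auth = quote(login, safe="")
        if password is not None:
            auth += ":" + quote(password, safe="")
        auth += "@"
    uri = f"{conn_type}://{auth}{quote(host, safe='')}"
    if port is not None:
        uri += f":{port}"
    if schema:
        uri += "/" + quote(schema, safe="")
    if extra:
        uri += "?" + urlencode(extra)
    return uri


def env_var_name(conn_id):
    """Environment variable name Airflow checks for a connection ID."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


uri = connection_uri("postgres", "db.example.com", login="etl_user",
                     password="p@ss/word", port=5432, schema="analytics")
print(env_var_name("my_postgres_conn"))  # AIRFLOW_CONN_MY_POSTGRES_CONN
print(uri)  # postgres://etl_user:p%40ss%2Fword@db.example.com:5432/analytics
```

In production, a secrets backend is usually preferable to plain environment variables for credentials.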


**QUESTION 8**

**What is a SubDagOperator? When would you use it?**

The SubDagOperator embeds a smaller DAG within a parent DAG. It was used to modularize workflows with repetitive patterns.

**Limitations:**

SubDAGs run as separate DAGs with their own scheduling, which adds overhead and can cause deadlocks (for example, when they occupy all available worker slots).

SubDagOperator has been deprecated since Airflow 2.0 in favor of Task Groups and was removed in Airflow 3.

Example:

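```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator  # deprecated in 2.x, removed in Airflow 3

def subdag(parent_dag_id, child_dag_id, args):
    # The child dag_id must be "<parent_dag_id>.<task_id of the SubDagOperator>"
    with DAG(
        dag_id=f"{parent_dag_id}.{child_dag_id}",
        default_args=args,
        schedule="@daily",  # keep the same schedule as the parent
    ) as sub_dag:
        task1 = EmptyOperator(task_id="task1")
        task2 = EmptyOperator(task_id="task2")
        task1 >> task2
    return sub_dag

args = {"start_date": datetime(2024, 1, 1)}

with DAG(dag_id="parent_dag", default_args=args, schedule="@daily", catchup=False) as dag:
    subdag_task = SubDagOperator(
        task_id="subdag_task",
        subdag=subdag("parent_dag", "subdag_task", args),
    )
```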


**QUESTION 9**

**Explain how Airflow handles logging and where logs are stored.**

Airflow writes a log for each task instance attempt, which you can view in the Web UI or on the file system.

1. Logging configuration: Defined in `airflow.cfg` under the `[logging]` section.
    - Local logs: Stored under `base_log_folder` (by default, `logs/` inside `AIRFLOW_HOME`).
    - Remote logs: Can be shipped to cloud storage (e.g., S3, GCS) by enabling `remote_logging`.

2. Log per task instance: Each task attempt writes its own log file, organized by DAG, run, and task.

Example logging configuration:

```ini
[logging]
base_log_folder = /path/to/airflow/logs
remote_logging = True
remote_log_conn_id = my_s3_conn
remote_base_log_folder = s3://my-airflow-logs
```
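Each attempt of each task instance gets its own file. The sketch below builds a path modeled on the default `log_filename_template` in recent Airflow 2.x releases (`dag_id=.../run_id=.../task_id=.../attempt=N.log`); the exact template varies by version, so treat `task_log_path` as a hypothetical helper and check the `[logging]` settings of your installation.

```python
from pathlib import PurePosixPath


def task_log_path(base_log_folder, dag_id, run_id, task_id, try_number, map_index=-1):
    """Build a per-attempt log path modeled on Airflow 2.x's default layout."""
    parts = [f"dag_id={dag_id}", f"run_id={run_id}", f"task_id={task_id}"]
    if map_index >= 0:  # only mapped task instances get a map_index segment
        parts.append(f"map_index={map_index}")
    parts.append(f"attempt={try_number}.log")
    return PurePosixPath(base_log_folder, *parts)


print(task_log_path("/path/to/airflow/logs", "retry_example",
                    "scheduled__2024-01-01T00:00:00+00:00", "retry_task", 2))
```

With remote logging enabled, the same relative layout is typically used under `remote_base_log_folder`.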

**QUESTION 10**

**What are the pros and cons of running Airflow in standalone mode vs. distributed mode?**

| Aspect | Standalone (single machine) | Distributed |
|---|---|---|
| Use case | Development, testing, and small workflows | Production and large workflows |
| Setup complexity | Easy to set up on a single machine | More complex: multiple nodes, an external metadata database, and (for Celery) a message broker |
| Scalability | Limited to a single machine's resources | Scales horizontally with the Celery or Kubernetes executors |
| Performance | Handles only a limited number of concurrent tasks | Handles high workloads by distributing execution across workers |

#### ADVANCED QUESTIONS

**QUESTION 1**

**How do you scale Airflow for high availability and large workflows?**

To scale Airflow for high availability (HA) and large workflows, focus on redundancy, resource allocation, and distributed execution:

1. Use a Distributed Executor:

- CeleryExecutor: Distributes tasks across multiple worker nodes using a message broker like RabbitMQ or Redis.
- KubernetesExecutor: Dynamically scales by running tasks in isolated Kubernetes pods.
- Advantages: Tasks are distributed, making it easier to handle heavy workflows.

2. Multiple Schedulers:

- Running multiple schedulers (supported since Airflow 2.0) means that if one fails, the others keep scheduling, reducing downtime.
- The schedulers coordinate through row-level locks in the metadata database, so it must be PostgreSQL or MySQL 8+.

3. Metadata Database:

- Use a high-availability database like PostgreSQL or MySQL with replication.
- Optimize database performance by enabling connection pooling (e.g., using pgbouncer for PostgreSQL).

4. Load Balancing:

- Use a load balancer in front of the web server to distribute UI traffic.

5. Storage:

- Centralize log storage in a shared location, such as S3, GCS, or an NFS mount, to make logs accessible to all nodes.

6. Monitoring:

- Use tools like Prometheus and Grafana to monitor system performance and alert on failures.

**Example Architecture:**

- Airflow Web Server: Multiple instances behind a load balancer.
- Scheduler: Two or more instances for redundancy.
- Workers: Multiple worker nodes (e.g., in Celery or Kubernetes mode).
- Message Broker: Redis or RabbitMQ for task queuing.
- Metadata DB: Highly available PostgreSQL cluster.

**QUESTION 2**

**What are Task Groups in Airflow 2.x? How do they differ from SubDAGs?**

Task Groups were introduced in Airflow 2.0 as a way to visually group tasks in the UI without the overhead of creating SubDAGs.

- Task Groups:
    - Lightweight and simple.
    - Used for grouping tasks logically, improving readability in the UI.
    - Tasks remain part of the parent DAG.

- SubDAGs:
    - Executed as separate DAGs, which can cause performance issues.
    - Requires a SubDagOperator.
    - Deprecated since Airflow 2.0 in favor of Task Groups, and removed in Airflow 3.

Example Task Group:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="task_group_example", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    start = EmptyOperator(task_id="start")

    # Task IDs inside the group are prefixed: "group1.task1", "group1.task2"
    with TaskGroup(group_id="group1") as group1:
        task1 = EmptyOperator(task_id="task1")
        task2 = EmptyOperator(task_id="task2")
        task1 >> task2

    end = EmptyOperator(task_id="end")

    start >> group1 >> end
```


**QUESTION 3**

**How would you secure an Airflow deployment in a production environment?**

Securing Airflow in production involves addressing access control, data security, and network security:

- Authentication and Authorization:
    - Use role-based access control (RBAC), e.g., the built-in Admin, Op, User, and Viewer roles in Airflow 2.x, to grant users only the permissions they need.
    - Use OAuth, LDAP, or other authentication backends.

- Secure Connections:
    - Use HTTPS for the web server.
    - Store sensitive connection details (e.g., passwords, keys) in a secrets manager like AWS Secrets Manager, HashiCorp Vault, or Azure Key Vault.

- Database Security:
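    - Restrict database access to the Airflow service account.
    - Set a `fernet_key` so Airflow encrypts connection passwords and Variables stored in the metadata database, and use TLS and encryption at rest for the database itself.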

- Network Security:
    - Deploy Airflow in a private network or behind a VPN.
    - Limit access to the web server and API endpoints using a firewall.

- Environment Isolation:
    - Use containerized deployments (e.g., Docker, Kubernetes) to isolate Airflow processes.

- Audit Logging:
    - Enable logging for all API requests and user actions.

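**QUESTION 4**

**How does Airflow integrate with cloud services like AWS, GCP, or Azure?**

Airflow provides pre-built hooks and operators for cloud services through provider packages (e.g., `apache-airflow-providers-amazon`, `apache-airflow-providers-google`, `apache-airflow-providers-microsoft-azure`).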

**AWS:**
- Hooks: `S3Hook`, `RedshiftHook`
- Operators: `S3ToRedshiftOperator`, `S3FileTransformOperator`

Example: Creating an object in S3:

```python
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

task = S3CreateObjectOperator(
    task_id="upload_to_s3",
    aws_conn_id="my_aws_conn",
    s3_bucket="my-bucket",
    s3_key="path/to/file",
    data="Sample Data",  # string content written to the object
)
```


**GCP:**

- Hooks: `BigQueryHook`, `GCSHook`
- Operators: `BigQueryInsertJobOperator`, `GCSToBigQueryOperator`

Example: Running a query job in BigQuery

```python
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

task = BigQueryInsertJobOperator(
    task_id="load_bq",
    gcp_conn_id="my_gcp_conn",
    configuration={
        "query": {"query": "SELECT * FROM dataset.table", "useLegacySql": False}
    },
)
```


**Azure:**

- Hooks: `WasbHook` (Azure Blob Storage), `AzureDataFactoryHook`
- Operators: `AzureDataFactoryRunPipelineOperator`, `LocalFilesystemToWasbOperator`, `WasbDeleteBlobOperator`

Example: Uploading a local file to Azure Blob Storage

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator

# Define the DAG
with DAG(
    dag_id="upload_to_azure_blob",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    # Upload a local file to Azure Blob Storage
    upload_to_blob = LocalFilesystemToWasbOperator(
        task_id="upload_to_blob",
        file_path="/tmp/file.csv",       # Local file to upload (replace with your path)
        container_name="my-container",   # Replace with your container name
        blob_name="data/file.csv",       # Path to store the file in the container
        wasb_conn_id="azure_blob_conn",  # Azure Blob Storage connection ID
    )
```


**QUESTION 5**

**What is the role of the Celery Executor? How does it compare to the Local Executor and Kubernetes Executor?**

The CeleryExecutor lets Airflow run tasks in a distributed manner: the Scheduler sends tasks to a message broker, and a pool of Celery worker nodes picks them up.

| Aspect | CeleryExecutor | LocalExecutor | KubernetesExecutor |
|---|---|---|---|
| Execution mode | Distributes tasks across multiple worker nodes | Runs tasks as parallel processes on the local machine | Runs each task in its own Kubernetes pod |
| Scalability | Scales by adding worker nodes | Limited to the local machine's capacity | Scales dynamically with the Kubernetes cluster |
| Setup complexity | Requires a message broker (Redis or RabbitMQ) and workers | Minimal setup | Requires a Kubernetes cluster |
| Typical use case | Medium to large workflows | Small to medium workflows | Cloud-native, highly dynamic workflows |

**QUESTION 6**

**How would you debug a DAG that is stuck in a running state?**

__Steps to debug:__

1. Check Task Logs:
    - Inspect the logs for each task instance to identify errors or bottlenecks.

2. Monitor Worker and Scheduler:

    - Ensure that the Scheduler is running and that workers are up and connected (for the CeleryExecutor, to the message broker).
    - Check for resource exhaustion (e.g., CPU, memory).

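3. Verify Dependencies and Limits:

    - Look for upstream tasks that never finish and for `trigger_rule` settings that can never be satisfied (cyclic dependencies are already rejected when Airflow parses the DAG).
    - Check whether tasks are waiting for pool slots or concurrency limits (e.g., `max_active_tasks`, `max_active_runs`, `parallelism`).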

4. Database Check:

    - Query the metadata database to check the status of the stuck tasks:

```sql
SELECT dag_id, task_id, state FROM task_instance WHERE state = 'running';
```

5. Restart Components:

    - Restart the Scheduler or Worker processes if they appear unresponsive.
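The stuck-task query can be extended to flag only tasks that have been running for too long. The sketch below uses an in-memory SQLite table as a simplified stand-in for `task_instance` (the real table has many more columns), with made-up rows; against a real metadata database you would run the equivalent query with read-only access.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

# Simplified stand-in for the task_instance table.
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE task_instance (
    dag_id TEXT, task_id TEXT, run_id TEXT, state TEXT, start_date TEXT)""")

now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
rows = [
    ("etl", "extract", "run_1", "success", (now - timedelta(hours=5)).isoformat()),
    ("etl", "transform", "run_1", "running", (now - timedelta(hours=4)).isoformat()),
    ("etl", "load", "run_2", "running", (now - timedelta(minutes=10)).isoformat()),
]
conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)", rows)

# Running tasks that started more than an hour ago are candidates for "stuck".
cutoff = (now - timedelta(hours=1)).isoformat()
stuck = conn.execute(
    "SELECT dag_id, task_id, run_id, start_date FROM task_instance "
    "WHERE state = 'running' AND start_date < ? ORDER BY start_date",
    (cutoff,),
).fetchall()
print(stuck)
```

ISO-8601 strings with the same UTC offset sort chronologically, which is why the string comparison works in this toy table.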

**QUESTION 7**

**What is the purpose of the trigger_rule parameter? Can you give an example?**

The trigger_rule parameter defines how a task is triggered based on the state of its upstream tasks. Common values:

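- all_success (default): Task triggers if all upstream tasks succeed.
- all_failed: Task triggers if all upstream tasks fail.
- one_success: Task triggers as soon as at least one upstream task succeeds.
- all_done: Task triggers once all upstream tasks have finished, whatever their outcome.
- none_failed: Task triggers if no upstream task has failed (each succeeded or was skipped).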

Example:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="trigger_rule_example", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    # Runs as soon as either upstream task succeeds
    task3 = EmptyOperator(task_id="task3", trigger_rule="one_success")

    task1 >> task3
    task2 >> task3
```


In this case, task3 will execute as soon as either task1 or task2 succeeds.
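The rules can be summarized as predicates over the upstream tasks' states. `should_trigger` below is a hypothetical, simplified model: real trigger rules also decide when to mark a task `skipped` or `upstream_failed` instead of waiting, which this sketch ignores.

```python
FINISHED = {"success", "failed", "skipped", "upstream_failed"}
FAILED = {"failed", "upstream_failed"}


def should_trigger(rule, upstream_states):
    """Decide whether a task may start, given its upstream tasks' current states."""
    states = list(upstream_states)
    if rule == "all_success":
        return all(s == "success" for s in states)
    if rule == "all_failed":
        return all(s in FAILED for s in states)
    if rule == "one_success":
        return any(s == "success" for s in states)  # fires without waiting for the rest
    if rule == "all_done":
        return all(s in FINISHED for s in states)
    if rule == "none_failed":
        return all(s in FINISHED and s not in FAILED for s in states)
    raise ValueError(f"unsupported trigger rule: {rule}")


print(should_trigger("one_success", ["success", "running"]))  # True
print(should_trigger("all_success", ["success", "failed"]))   # False
print(should_trigger("all_done", ["success", "failed"]))      # True
```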

**QUESTION 8**

**Explain the difference between depends_on_past and wait_for_downstream parameters.**

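**depends_on_past:**

- A task instance runs only if the same task succeeded (or was skipped) in the previous DAG run. The first run is allowed to start.
- Use case: Sequential processing of daily data, where each day builds on the previous day's result.

**wait_for_downstream:**

- A task instance also waits until the tasks *immediately downstream* of the same task in the previous DAG run have succeeded (or been skipped). Setting it to `True` forces `depends_on_past=True`.
- Use case: Avoid overlapping runs, e.g., so today's extract does not overwrite a staging table that yesterday's load is still reading.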
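The difference is easiest to see as two checks against the previous DAG run. `can_start` is a hypothetical helper (not Airflow code) that models the behavior described above: `depends_on_past` looks at the same task in the previous run, and `wait_for_downstream` also looks at that task's immediate downstream tasks.

```python
OK_STATES = {"success", "skipped"}


def can_start(task_id, previous_run, downstream_of,
              depends_on_past=False, wait_for_downstream=False):
    """Check the cross-run conditions for one task instance.

    previous_run: dict task_id -> state for the previous DAG run (None for the first run).
    downstream_of: dict task_id -> list of immediate downstream task_ids.
    """
    if previous_run is None:  # first run: nothing to wait for
        return True
    if wait_for_downstream:
        depends_on_past = True  # wait_for_downstream implies depends_on_past
    if depends_on_past and previous_run.get(task_id) not in OK_STATES:
        return False
    if wait_for_downstream:
        return all(previous_run.get(d) in OK_STATES for d in downstream_of.get(task_id, []))
    return True


downstream = {"extract": ["load"], "load": []}
yesterday = {"extract": "success", "load": "running"}

print(can_start("extract", yesterday, downstream, depends_on_past=True))      # True
print(can_start("extract", yesterday, downstream, wait_for_downstream=True))  # False: load still running
```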

**QUESTION 9**

**What is the purpose of Airflow's REST API? How would you use it?**

The REST API allows programmatic interaction with Airflow to:
- Trigger DAGs.
- Retrieve DAG and task statuses.
- Manage connections and variables.

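Example: Triggering a DAG through the Airflow 2.x stable API (`/api/v1`), assuming the basic-auth API backend is enabled:

```bash
curl -X POST \
  "http://localhost:8080/api/v1/dags/my_dag_id/dagRuns" \
  --user "username:password" \
  -H "Content-Type: application/json" \
  -d '{"conf": {}, "dag_run_id": "manual_run_1"}'
```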
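The same call can be made from Python. The sketch below only builds the request with the standard library's `urllib.request`, so it runs without a server; it assumes the Airflow 2.x `/api/v1` endpoint and the basic-auth API backend, and `build_trigger_request` and the credentials are placeholders.

```python
import base64
import json
import urllib.request


def build_trigger_request(base_url, dag_id, run_id, conf=None, username=None, password=None):
    """Prepare (but do not send) a POST to the dagRuns endpoint."""
    body = json.dumps({"dag_run_id": run_id, "conf": conf or {}}).encode("utf-8")
    request = urllib.request.Request(
        f"{base_url}/api/v1/dags/{dag_id}/dagRuns",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    if username is not None:
        token = base64.b64encode(f"{username}:{password}".encode()).decode()
        request.add_header("Authorization", f"Basic {token}")
    return request


req = build_trigger_request("http://localhost:8080", "my_dag_id", "manual_run_1",
                            conf={"source": "api"}, username="admin", password="admin")
print(req.get_method(), req.full_url)
# Sending it with urllib.request.urlopen(req) requires a running Airflow API server.
```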


**QUESTION 10**

**How do you implement dynamic task generation in Airflow? Provide a code example.**

Dynamic task generation creates tasks programmatically, when the DAG file is parsed, from inputs such as a list of files or database records.

Example:

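```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def process_file(file_name):
    print(f"Processing file: {file_name}")

file_list = ["file1.csv", "file2.csv", "file3.csv"]

with DAG(dag_id="dynamic_task_example", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    for file in file_list:
        # One task per file; dots are replaced so IDs read like "process_file1_csv"
        PythonOperator(
            task_id=f"process_{file.replace('.', '_')}",
            python_callable=process_file,
            op_args=[file],
        )
```

Since Airflow 2.3, dynamic task mapping (`.partial(...).expand(...)`) can instead create one task instance per input at runtime, which suits inputs that are only known when the DAG runs.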
