
---

## 🧠 Introduction to Apache Airflow 

### 🟩 1. Introduction
Apache Airflow is a platform designed to **create, schedule, and monitor workflows** programmatically. It is widely used in data engineering for orchestrating complex workflows and automating data pipelines.

Instructor: **Mike Metzger**, a Data Engineer.

---

### 🟩 2. Understanding Data Engineering

- **Data Engineering** is taking any action involving data and turning it into a process that is reliable, repeatable, and maintainable.
- It includes designing systems that move, clean, and organize data.
- The goal is to **automate and optimize** all processes involving data handling and transformation.

---

### 🟩 3. What is a Workflow?

- A **workflow** is a series of steps to complete a specific data task.
- These steps can include:
  - Downloading files
  - Copying or moving data
  - Filtering or cleaning data
  - Writing data to a database
- Workflows can be simple (2–3 steps) or very complex (hundreds of steps).
- In data engineering, a workflow describes the **flow of data** and the sequence of operations applied to it.
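To make the idea concrete, here is a plain-Python sketch (no Airflow involved) of a three-step workflow: parse raw lines, filter out bad rows, and write the result to an in-memory SQLite database. The data and function names are hypothetical; Airflow's role would be to schedule and monitor steps like these rather than chain them in a single script.

```python
import sqlite3

# Hypothetical raw input standing in for a downloaded file
RAW_ROWS = [
    "2024-01-10,widget,3",
    "2024-01-10,gadget,-1",   # invalid quantity, filtered out below
    "2024-01-11,widget,5",
]

def parse(lines):
    """Step 1: turn raw CSV-like lines into (day, product, qty) tuples."""
    rows = []
    for line in lines:
        day, product, qty = line.split(",")
        rows.append((day, product, int(qty)))
    return rows

def clean(rows):
    """Step 2: drop rows with a non-positive quantity."""
    return [row for row in rows if row[2] > 0]

def load(rows, conn):
    """Step 3: write the cleaned rows to a database table."""
    conn.execute("CREATE TABLE IF NOT EXISTS sales (day TEXT, product TEXT, qty INTEGER)")
    conn.executemany("INSERT INTO sales VALUES (?, ?, ?)", rows)
    conn.commit()

def run_workflow(lines, conn):
    # Each step consumes the previous step's output, in a fixed order
    load(clean(parse(lines)), conn)

conn = sqlite3.connect(":memory:")
run_workflow(RAW_ROWS, conn)
print(conn.execute("SELECT COUNT(*), SUM(qty) FROM sales").fetchone())  # (2, 8)
```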

---

### 🟩 4. What is Apache Airflow?

- **Apache Airflow** is an open-source platform that allows users to:
  - Program workflows using **Python**
  - **Schedule** them to run automatically
  - **Monitor** them through a web interface, CLI, or REST API
- Airflow enables visibility and control over data processes, ensuring they are **maintainable and observable**.

---

### 🟩 5. Core Concept: DAG (Directed Acyclic Graph)

- A **DAG** is the fundamental structure used in Airflow to define a workflow.
- In simple terms:
  - **Directed**: The tasks follow a defined direction (one task flows into another).
  - **Acyclic**: There are no circular dependencies.
- A DAG is a **graph** where:
  - **Nodes = Tasks**
  - **Edges = Dependencies between tasks**
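The "directed" and "acyclic" properties are what guarantee a valid run order exists. A minimal sketch using Python's standard `graphlib` module (Python 3.9+, not Airflow) makes this visible: tasks map to the set of tasks they depend on, and a cycle makes ordering impossible. The task names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks)
etl = {
    "download_file": set(),
    "clean_data": {"download_file"},
    "load_to_db": {"clean_data"},
}

# Directed + acyclic: a valid execution order exists
order = list(TopologicalSorter(etl).static_order())
print(order)  # ['download_file', 'clean_data', 'load_to_db']

# Making download_file depend on load_to_db closes a loop, so no order exists
cyclic = dict(etl, download_file={"load_to_db"})
cycle_found = False
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError as err:
    cycle_found = True
    print("cycle detected:", err.args[1])
```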

#### DAG Components:
- `dag_id`: Unique identifier for the DAG
- `start_date`: When the DAG is allowed to start running
- `default_args`: Dictionary of default arguments applied to every task in the DAG (e.g., `retries`, email alerts)
- `owner` and email notification settings are also commonly set in `default_args`
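Airflow passes `default_args` to every operator in the DAG, and an argument given directly to an operator takes precedence over the default. The dictionary merge below is a toy model of that precedence rule, not Airflow's actual implementation; the task names and settings are hypothetical.

```python
# Toy model (not Airflow code): every task starts from the DAG's
# default_args, and any argument passed to the task itself wins.
default_args = {
    "owner": "jdoe",
    "retries": 2,
    "email_on_failure": True,
}

def task_settings(default_args, **task_kwargs):
    """Return the effective settings for one task."""
    return {**default_args, **task_kwargs}

download = task_settings(default_args, task_id="download_file")
load = task_settings(default_args, task_id="load_to_db", retries=5)

print(download["retries"])  # 2, inherited from default_args
print(load["retries"])      # 5, overridden by the task
```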

#### Example:
```python
from airflow import DAG
from datetime import datetime

default_args = {
    'start_date': datetime(2024, 1, 10),
}

dag = DAG(dag_id='etl_pipeline', default_args=default_args)
```

> In the shell (CLI), you refer to the DAG using `dag_id` (e.g., `etl_pipeline`), not the variable name.

---

### 🟩 6. Running Workflows (Tasks) in Airflow

- A **task** is a single unit of work in a DAG.
- Tasks can be tested or run using the CLI.

#### Command:
```bash
airflow tasks test <dag_id> <task_id> <execution_date>
```

#### Example:
```bash
airflow tasks test example-etl download-file 2024-01-10
```

- This command runs the `download-file` task of the `example-etl` DAG for January 10, 2024, without checking its dependencies or recording its state in the metadata database.
- It is useful for testing individual components of a DAG before scheduling the whole pipeline.

---

### 🟩 7. Airflow Interfaces

Airflow can be interacted with in several ways:

- **Code (Python)** – to define and configure DAGs and tasks.
- **Command-Line Interface (CLI)** – to trigger tasks and manage workflows manually.
- **Web UI** – to visualize DAGs, monitor runs, logs, and status of tasks.
- **REST API** – to programmatically manage DAGs and tasks.

---

### 🟩 8. Comparison with Other Tools

While Airflow is powerful, it’s not the only tool for managing workflows:

| Tool         | Description                                  |
|--------------|----------------------------------------------|
| **Luigi**    | Created by Spotify, focuses on dependency resolution. |
| **SSIS**     | Microsoft SQL Server Integration Services, for ETL in MS environments. |
| **Bash Scripting** | Lightweight automation; used for simple tasks. |

> Airflow integrates well with Bash, and scripts can be part of DAG tasks.

---



---

## 🔷 Airflow DAGs

---

### 🟦 1. Introduction to DAGs in Airflow

- DAG stands for **Directed Acyclic Graph**.
- DAGs are the **primary building blocks** in Airflow workflows.
- They define the **order and dependency** of tasks in a workflow.

---

### 🟦 2. What is a DAG?

A DAG has three important properties:

1. **Directed**:  
   - Tasks have a defined order of execution.  
   - The direction ensures dependencies are followed properly (e.g., Task A runs before Task B).

2. **Acyclic**:  
   - There are **no loops**; tasks do not repeat within the same run.  
   - However, the **entire DAG** can be rerun.

3. **Graph**:  
   - The structure represents tasks (nodes) and dependencies (edges).  
   - It provides a **clear and logical representation** of a workflow.

> 🧠 Note: The DAG concept is common in data engineering tools like **Apache Spark**, **dbt**, and others—not just in Airflow.

---

### 🟦 3. DAGs in Airflow

- In Airflow, DAGs are defined using **Python code**.
- The DAG describes how tasks are related and when they should be executed.
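- Although DAGs are defined in Python, their tasks can run many kinds of work, for example:
  - **Python functions**
  - **Bash scripts**
  - **Spark jobs**
  - Logic packaged in **custom operators**
- Each task in a DAG is an instance of an **operator**:
  - **Action operators** provide pre-built functionality (e.g., BashOperator, PythonOperator)
  - **Sensors** are a special kind of operator that waits for an external event or state
- Dependencies between tasks are usually declared **explicitly** (e.g., with `>>`), but they can also be **inferred**, for example when the output of one TaskFlow (`@task`) function is passed into another.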

> Example of a dependency:  
> `task_a >> task_b` means task_b depends on task_a.
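Airflow operators overload Python's `>>` and `<<` operators to record dependencies. The class below is a toy model of that idea in plain Python, written for illustration only; Airflow's real operators do considerably more.

```python
class Task:
    """Toy stand-in for an Airflow operator (not Airflow's real class)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # task_ids that must finish before this task
        self.downstream = set()  # task_ids that run after this task

    def __rshift__(self, other):
        # self >> other: other depends on self
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)
        return other  # returning `other` lets a >> b >> c chain left to right

    def __lshift__(self, other):
        # self << other: self depends on other
        other >> self
        return other

task_a, task_b, task_c = Task("task_a"), Task("task_b"), Task("task_c")
task_a >> task_b >> task_c

# The reverse arrow reads right to left: report depends on extract
report, extract = Task("report"), Task("extract")
report << extract

print(task_b.upstream)    # {'task_a'}
print(task_b.downstream)  # {'task_c'}
print(report.upstream)    # {'extract'}
```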

---

### 🟦 4. How to Define a DAG in Python

#### Step-by-step DAG setup:

1. **Import DAG class**:
```python
from airflow import DAG
```

2. **Define default arguments** (optional but recommended):
```python
from datetime import datetime

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'start_date': datetime(2024, 1, 1)
}
```

3. **Create the DAG object with a context manager** (the recommended style):
```python
from datetime import datetime

with DAG(
    dag_id='etl_workflow',
    default_args=default_args,
    schedule_interval='@daily',  # Optional; Airflow 2.4+ prefers `schedule`
    catchup=False                # Optional; don't backfill runs missed since start_date
) as etl_dag:
    # Task definitions go here; they are attached to etl_dag automatically
    pass
```

4. **Alternative: assign the DAG to a variable** (common in older code):
```python
etl_dag = DAG('etl_workflow', default_args=default_args)
# Tasks must then be attached explicitly, e.g. BashOperator(..., dag=etl_dag)
```

> ✅ Both methods are functionally valid. The `with` statement is more readable and avoids passing `dag=` to every task.

> ⚠️ `DAG` is case-sensitive in Python. Use it exactly as defined.

---

### 🟦 5. DAGs on the Command Line

Airflow provides a robust **command-line interface (CLI)** for interacting with DAGs:

- General help:
  ```bash
  airflow -h
  ```

- List all registered DAGs:
  ```bash
  airflow dags list
  ```

- Other useful CLI commands:
  - `airflow dags trigger <dag_id>` – manually trigger a DAG run
  - `airflow dags pause <dag_id>` – pause a DAG
  - `airflow dags unpause <dag_id>` – resume a paused DAG
  - `airflow dags show <dag_id>` – visualize the DAG structure

---

### 🟦 6. When to Use CLI vs Python

| Task                                  | Use CLI                     | Use Python                  |
|---------------------------------------|-----------------------------|-----------------------------|
| Starting Airflow processes            | ✅                          | ❌                          |
| Triggering DAGs or tasks manually     | ✅                          | ❌                          |
| Viewing logs or task states           | ✅                          | ❌                          |
| Creating DAGs and defining workflows  | ❌                          | ✅                          |
| Writing business logic or data steps  | ❌                          | ✅                          |

> 🛠 CLI is operational, Python is developmental.

---



### **Defining a simple DAG**
You've spent some time reviewing the Airflow components and are interested in testing out your own workflows. To start, you decide to define the default arguments and create a DAG object for your workflow.

```python
from datetime import datetime

# Import the DAG object
from airflow import DAG

# Define the default_args dictionary
default_args = {
    'owner': 'dsmith',
    'start_date': datetime(2023, 1, 14),
    'retries': 2
}

# Instantiate the DAG object
with DAG('example_etl', default_args=default_args) as etl_dag:
    pass
```



## 🔷 Airflow Web Interface – Detailed Notes

---

### 🟦 1. Introduction to the Airflow Web UI

- The **Airflow Web Interface (UI)** is a graphical tool that helps in:
  - Monitoring DAGs
  - Triggering or pausing workflows
  - Viewing task status and logs
  - Debugging workflows

- The web UI complements the CLI but is often more **intuitive and visual**.

---

### 🟦 2. DAGs View (Main Page)

This is the **landing page** of the Airflow UI. It gives an **overview of all available DAGs**.

#### Key components visible in the DAGs View:

| Section             | Description |
|---------------------|-------------|
| **DAG Name**         | A list of all DAGs/workflows present in the system |
| **Owner**            | Who created or owns the DAG (from `default_args`) |
| **Schedule**         | Cron string or time-based trigger for DAG execution |
| **Last Run**         | Timestamp of the most recent DAG execution |
| **Next Run**         | Scheduled time for the next DAG run |
| **Recent Tasks**     | Small icons or indicators showing status of last few task runs (success, failed, etc.) |
| **Actions**          | Options to trigger DAG, pause it, delete it, or view details |

> 🔗 Clicking on a DAG name (e.g., `example_dag`) takes you to its **Detail Page**.

---

### 🟦 3. DAG Detail View

This view provides **in-depth access to one specific DAG**. It includes several subviews:

#### Key components:

1. **Grid View** (Default; replaced the older Tree View in Airflow 2.3):
   - Displays a grid of recent DAG runs and their task instances, colored by state.
   - Shows each task's name and its status in each run.
   - Useful for checking which tasks succeeded, failed, or are running.

2. **Graph View**:
   - Presents a **graphical flowchart** of tasks and their dependencies.
   - Task nodes display type (e.g., BashOperator).
   - Useful to understand the logical structure and flow of the DAG.

3. **Code View**:
   - Shows the actual **Python code** used to define the DAG.
   - This is **read-only** – edits must be done in the script file.
   - Helps quickly inspect what a DAG does (e.g., `echo $RANDOM` in BashOperator).

4. **Other Functional Tabs**:
   - **Task Duration**: How long each task took to run.
   - **Task Tries**: Number of attempts made for each task.
   - **Gantt Chart**: Visual timeline showing when tasks started and finished.
   - **Run History**: Past executions of the DAG.

#### Actions available:
- **Trigger DAG** – manually start a DAG run.
- **Refresh** – update the UI with current info.
- **Delete** – remove the DAG's metadata (such as run history) from Airflow; the DAG reappears if its file is still in the DAGs folder.

---

### 🟦 4. Audit Logs (under Browse Menu)

- Logs provide **system-level insight and user activity tracking**.
- Tracks events such as:
  - Starting/stopping the webserver
  - Viewing DAG graph/code
  - Creating users
  - Triggering DAGs
- **Event Type** column helps filter logs (e.g., `cli`, `scheduler`, `webserver`, `graph`, etc.)

> 🔍 Frequent log inspection helps understand system behavior and debug problems.

---

### 🟦 5. Web UI vs Command Line Tool

| Aspect                      | Web UI                              | Command Line Tool (CLI)       |
|-----------------------------|-------------------------------------|-------------------------------|
| User Experience             | Intuitive, visual                   | Fast, scriptable              |
| Setup/Access                | May require browser access          | SSH or local terminal         |
| Triggering DAGs             | ✅ Easy to click                     | ✅ Via `airflow dags trigger` |
| Viewing DAGs/Logs           | ✅ Visual detail                     | ✅ Text-based output          |
| Editing DAG code            | ❌ (Read-only)                      | ❌ (Requires file editor)     |
| Suitable for                | Monitoring and administration       | Advanced users and automation |

> 🧩 Most developers use **both interfaces** based on what they are trying to accomplish.

---


## 🔷 Airflow CLI Commands

---

### 1. **Help**
```bash
airflow -h
```
📘 *Displays help info and lists all available commands.*

---

### 2. **List DAGs**
```bash
airflow dags list
```
🔍 *Lists all DAGs available in the Airflow environment.*

---

### 3. **Trigger a DAG**
```bash
airflow dags trigger <dag_id>
```
🚀 *Manually triggers a DAG run.*

- `dag_id`: ID of the DAG to run

---

### 4. **Start Webserver**
```bash
airflow webserver
```
🌐 *Starts the Airflow web UI (default at `localhost:8080`).*

**Common flags:**
- `-p <port>`: Change the port (default is 8080)

---

### 5. **Start Scheduler**
```bash
airflow scheduler
```
📅 *Starts the Airflow scheduler to execute scheduled DAGs.*

---

### 6. **Test a Task**
```bash
airflow tasks test <dag_id> <task_id> <execution_date>
```
🧪 *Runs a single task instance without checking its dependencies or recording its state in the database.*

- `dag_id`: DAG containing the task  
- `task_id`: Task to test  
- `execution_date`: Date for task run (`YYYY-MM-DD` format)

---

### 7. **View Task Logs**
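```bash
ls "$AIRFLOW_HOME/logs/"
```
📄 *Airflow 2's CLI has no dedicated log-viewing command. Task logs are written under `$AIRFLOW_HOME/logs/` by default and are easiest to read in the web UI (select a task instance, then **Log**); `airflow tasks test` also prints the task's log to the terminal.*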

---

### 8. **Pause a DAG**
```bash
airflow dags pause <dag_id>
```
⏸️ *Prevents a DAG from being scheduled automatically.*

---

### 9. **Unpause a DAG**
```bash
airflow dags unpause <dag_id>
```
▶️ *Resumes automatic scheduling of a paused DAG.*

---

### 10. **List Tasks in a DAG**
```bash
airflow tasks list <dag_id>
```
📋 *Shows all tasks within a specified DAG.*

---




### 🔍 **How to Examine the DAG**
1. **Go to the Airflow Web UI** (typically at `http://localhost:8080`).
2. In the **DAGs list**, locate and click on the DAG named `update_state`.
3. On the DAG detail page, explore the following views:
   - **Graph View**: Shows tasks and their dependencies with operator types.
   - **Grid View** (called **Tree View** before Airflow 2.3): Displays task run statuses over time.
   - **Code View**: Lets you view the raw Python code defining the DAG.

---

### ✅ **Determine Operators in Use**
- In the **Graph View**, each task usually shows the operator used (e.g., `BashOperator`, `PythonOperator`, etc.).
- Alternatively, in the **Code View**, look for lines like:
  ```python
  PythonOperator(...)
  BashOperator(...)
  DummyOperator(...)
  ```

---

### ❓ **Question Task**
Your task is to identify **which operator is NOT used** in `update_state`.

To answer:
- Open **Code View**.
- Search for operator classes used.
- Compare against a list of common operators (e.g., `PythonOperator`, `BashOperator`, `DummyOperator`, `EmailOperator`).

> 🔎 **Example**: If you see `PythonOperator` and `DummyOperator`, but **no reference to `BashOperator`**, then **BashOperator is NOT used**.
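If the DAG file is long, you can also search its source programmatically. The sketch below uses Python's standard `ast` module to collect every called name ending in `Operator`, then compares that set with a list of candidates. The sample source and the `operators_used` helper are hypothetical, and the name-suffix check is a simplification (it would miss sensors and operators imported under an alias).

```python
import ast

# Hypothetical DAG source, standing in for what Code View displays
DAG_SOURCE = '''
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator

start = EmptyOperator(task_id="start")
update = PythonOperator(task_id="update", python_callable=print)
'''

def operators_used(source):
    """Return the names of called classes ending in 'Operator'."""
    found = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Call):
            func = node.func
            # Handles both Operator(...) and module.Operator(...)
            name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
            if name and name.endswith("Operator"):
                found.add(name)
    return found

candidates = {"PythonOperator", "BashOperator", "EmptyOperator", "EmailOperator"}
used = operators_used(DAG_SOURCE)
print(sorted(candidates - used))  # ['BashOperator', 'EmailOperator']
```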




---

## 🚀 **Airflow Operators Overview**
**Operators** in Airflow define **what** each task does. Each operator represents a **single task** in the DAG.

- Operators are **self-contained** – they typically don’t share data with other tasks.
- Common operator categories:
  - Bash-related (e.g., `BashOperator`)
  - Python-based (e.g., `PythonOperator`)
  - Dummy/Placeholder (e.g., `EmptyOperator`)
  - Email notifications, Docker tasks, sensors, etc.

---

## 🔧 **Key Airflow Operators with Description & Arguments**

### 1. **BashOperator**
Runs any valid Bash command.

```python
from airflow.operators.bash import BashOperator

BashOperator(
    task_id='print_hello',
    bash_command='echo "Hello World!"'
)
```

**Common Arguments:**
- `task_id`: Unique ID of the task.
- `bash_command`: The actual bash command or script.
- `env`: (optional) Dictionary of environment variables. If set, it replaces the inherited environment rather than adding to it.
- `cwd`: (optional) Working directory to run command in.
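BashOperator's documentation describes a non-`None` `env` as being used *instead of* the inherited environment (newer Airflow 2 releases add an `append_env` flag to merge instead). Python's standard `subprocess` module behaves the same way when given an `env` mapping, so it makes a handy analogy. The sketch below is plain Python, not Airflow code, and the variable names are hypothetical.

```python
import os
import subprocess
import sys

# A hypothetical variable that exists only in this (parent) process's environment
os.environ["PARENT_ONLY"] = "yes"

# The child prints the variable we pass explicitly and the one it might inherit
CHILD = "import os; print(os.environ.get('MY_ENV_VAR'), os.environ.get('PARENT_ONLY'))"

def run_child(env):
    """Run a child Python process with exactly `env` as its environment."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD],
        env=env, capture_output=True, text=True, check=True,
    )
    return result.stdout.split()

replaced = run_child({"MY_ENV_VAR": "123"})              # env given alone: nothing inherited
merged = run_child({**os.environ, "MY_ENV_VAR": "123"})  # parent environment plus the new variable

print(replaced)  # ['123', 'None']
print(merged)    # ['123', 'yes']
```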

---

### 2. **PythonOperator**
Executes a Python function.

```python
from airflow.operators.python import PythonOperator

def my_function():
    print("Running my function")

PythonOperator(
    task_id='run_my_func',
    python_callable=my_function
)
```

**Common Arguments:**
- `task_id`
- `python_callable`: The function to call.
- `op_args`: (optional) Positional args for the function.
- `op_kwargs`: (optional) Keyword args for the function.
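Conceptually, PythonOperator ends up calling `python_callable(*op_args, **op_kwargs)` when the task runs (Airflow may also pass context variables, which this ignores). The plain-Python sketch below models that call and adds a signature check with `inspect.signature(...).bind(...)`, so a misspelled keyword fails before the function runs. `my_function`, its parameters, and `call_like_python_operator` are hypothetical, not Airflow APIs.

```python
import inspect

def my_function(table, date, dry_run=False):
    """Hypothetical task logic standing in for real work."""
    mode = "dry run" if dry_run else "live"
    return f"{mode}: loading {table} for {date}"

def call_like_python_operator(func, op_args=(), op_kwargs=None):
    """Toy model: validate the arguments, then call func(*op_args, **op_kwargs)."""
    op_kwargs = op_kwargs or {}
    # bind() checks the arguments against the signature without calling func
    inspect.signature(func).bind(*op_args, **op_kwargs)
    return func(*op_args, **op_kwargs)

result = call_like_python_operator(
    my_function, op_args=["sales", "2024-01-10"], op_kwargs={"dry_run": True}
)
print(result)  # dry run: loading sales for 2024-01-10

# A misspelled keyword is rejected before the function body ever runs
error_message = None
try:
    call_like_python_operator(
        my_function, op_args=["sales", "2024-01-10"], op_kwargs={"dryrun": True}
    )
except TypeError as err:
    error_message = str(err)
print(error_message)  # got an unexpected keyword argument 'dryrun'
```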

---

### 3. **EmptyOperator** *(Previously DummyOperator)*
Used as a placeholder or for DAG structure/visualization.

```python
from airflow.operators.empty import EmptyOperator

EmptyOperator(
    task_id='start_pipeline'
)
```

**Common Arguments:**
- Only `task_id` is usually needed.

---

### 4. **EmailOperator**
Sends an email notification.

```python
from airflow.operators.email import EmailOperator

EmailOperator(
    task_id='send_email',
    to='user@example.com',
    subject='Airflow Alert',
    html_content='<p>DAG completed!</p>'
)
```

**Common Arguments:**
- `to`: Recipient email.
- `subject`: Email subject.
- `html_content`: Body content.

---

### ⚠️ **Operator Gotchas**
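- Tasks are **not guaranteed to run in the same location or environment** (for example, on the same worker), so don't rely on files or state left behind by another task.
- Pass configuration through environment variables, e.g. the BashOperator `env` argument.
- Tasks that need **elevated privileges** or access to restricted directories can be difficult to run.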

---



Here’s a complete **Airflow DAG** that brings together the operators we discussed — `BashOperator`, `PythonOperator`, `EmptyOperator`, and `EmailOperator`. It includes detailed comments to help you understand each component:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

# Define a simple Python function for PythonOperator
def greet():
    print("Hello from Python!")

# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'email': ['alerts@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}

# Define the DAG
with DAG(
    dag_id='example_combined_operator_dag',
    description='A DAG demonstrating Bash, Python, Empty, and Email operators',
    schedule_interval=None,  # manual trigger only
    default_args=default_args,
    catchup=False,
    tags=['example', 'operators']
) as dag:

    # Start placeholder task (EmptyOperator)
    start = EmptyOperator(
        task_id='start'
    )

    # BashOperator task to print a message
    bash_task = BashOperator(
        task_id='bash_echo',
        bash_command='echo "This is a BashOperator task."',
        env={'MY_ENV_VAR': '123'}  # Example variable; replaces the inherited environment for this command
    )

    # PythonOperator task to run a Python function
    python_task = PythonOperator(
        task_id='python_greeting',
        python_callable=greet
    )

    # Another BashOperator to simulate a script run
    clean_task = BashOperator(
        task_id='run_data_cleaning',
        bash_command='cat /etc/passwd | awk -F ":" \'{print $1}\''
    )

    # EmailOperator task to notify completion
    notify = EmailOperator(
        task_id='send_email',
        to='team@example.com',
        subject='DAG Completed Successfully',
        html_content='<p>The DAG has finished running.</p>'
    )

    # End placeholder task
    end = EmptyOperator(
        task_id='end'
    )

    # Set task dependencies
    start >> bash_task >> python_task >> clean_task >> notify >> end
```

---

### 🧠 What this DAG does:
- **`start`**: Begins the pipeline (dummy task).
- **`bash_task`**: Echoes a simple message.
- **`python_task`**: Executes a Python function (`greet`).
- **`clean_task`**: Runs a stand-in for a data-cleaning script (it prints the first field, the username, of each line in `/etc/passwd`).
- **`notify`**: Sends an email when done.
- **`end`**: Marks the end of the workflow.



---
### **Airflow Tasks & Dependencies**

1. **Airflow Operators**:
   - **Definition**: Operators represent individual tasks in a workflow. Common types include `BashOperator`, `PythonOperator`, etc.
   - **CLI**: There is no command that lists every operator; `airflow providers list` shows the installed provider packages, which supply most operators beyond the core ones.

2. **Tasks**:
   - **Definition**: Tasks are instantiated operators, usually assigned to variables such as `task1` or `task2`.
   - **CLI**: `airflow tasks list <dag_id>` lists all tasks in a specific DAG.

3. **Task Dependencies**:
   - **Definition**: Task dependencies define the order of execution (upstream and downstream). They are set with bitshift operators: `a >> b` makes `b` downstream of `a`, and `a << b` makes `b` upstream of `a`.
   - **CLI**: `airflow dags show <dag_id>` prints the DAG's task graph, including its dependencies.

4. **Upstream vs Downstream**:
   - **Definition**: An **upstream** task must complete before the current task runs; a **downstream** task runs after it.
   - **CLI**: `airflow tasks clear <dag_id>` clears task states so they can run again; the `--upstream` and `--downstream` flags extend the clear to related tasks.

5. **Simple Task Dependency**:
   - **Definition**: Bitshift operators chain tasks: `task1 >> task2` means task1 runs before task2.
   - **CLI**: `airflow dags trigger <dag_id>` manually triggers a DAG run.

6. **Task Dependencies in the Airflow UI**:
   - **Definition**: In the **Graph View**, dependencies are drawn as arrows showing which task runs before or after another.
   - **CLI**: `airflow dags list` lists all DAGs; use `airflow dags show <dag_id>` to inspect one DAG's structure.

7. **Multiple Dependencies**:
   - **Definition**: Bitshift operators can be combined, including with lists of tasks (e.g., `task1 >> [task2, task3]`), to build more complex workflows.
   - **CLI**: `airflow dags show <dag_id> --save <file>` renders the dependency graph to a file (requires Graphviz).
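With several dependencies, it helps to know everything upstream or downstream of a task (this is the set that options such as `airflow tasks clear --upstream`/`--downstream` pull in). The sketch below walks a hypothetical edge list in plain Python; it models the concept and is not Airflow's implementation.

```python
from collections import defaultdict

# Edges as a bitshift chain would declare them, e.g.
# start >> [extract_a, extract_b] >> merge >> load  (task names are hypothetical)
EDGES = [
    ("start", "extract_a"),
    ("start", "extract_b"),
    ("extract_a", "merge"),
    ("extract_b", "merge"),
    ("merge", "load"),
]

def related(task, edges, direction):
    """Return every task reachable upstream or downstream of `task`."""
    graph = defaultdict(set)
    for up, down in edges:
        if direction == "downstream":
            graph[up].add(down)
        else:
            graph[down].add(up)
    # Depth-first walk that collects each reachable task once
    seen, stack = set(), [task]
    while stack:
        for nxt in graph[stack.pop()]:
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen

print(sorted(related("merge", EDGES, "upstream")))        # ['extract_a', 'extract_b', 'start']
print(sorted(related("extract_a", EDGES, "downstream")))  # ['load', 'merge']
```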

---




---

### ✅ **Airflow DAG Example: Task Creation + Dependencies**

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime

# Default DAG arguments
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

# Define the DAG context
with DAG(
    dag_id='task_dependency_example_dag',
    description='DAG to demonstrate task instantiation and dependencies',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    tags=['example', 'task-dependencies']
) as dag:

    # Start task using EmptyOperator (acts as a placeholder)
    start = EmptyOperator(
        task_id='start'
    )

    # Task 1: Run bash command
    task1 = BashOperator(
        task_id='first_task',
        bash_command='echo "This is the first task."'
    )

    # Task 2: Run another bash command
    task2 = BashOperator(
        task_id='second_task',
        bash_command='echo "This is the second task."'
    )

    # Task 3: Yet another bash task
    task3 = BashOperator(
        task_id='third_task',
        bash_command='echo "This is the third task."'
    )

    # Task 4: Final task
    task4 = BashOperator(
        task_id='final_task',
        bash_command='echo "Workflow complete!"'
    )

    # Define task dependencies using bitshift operators
    start >> task1 >> task2 >> task3 >> task4  # Linear dependency chain

    # Alternatively (equivalent way):
    # start >> task1
    # task1 >> task2
    # task2 >> task3
    # task3 >> task4
```

---

### 🧠 **Concepts Highlighted**
| Concept                    | Description                                                                 |
|----------------------------|-----------------------------------------------------------------------------|
| **Tasks = Operators**      | Each operator instance assigned to a variable (e.g., `task1`) is a task, identified by its `task_id`. |
| **Dependencies**           | `task1 >> task2` means task1 must finish before task2 starts (task1 is upstream of task2). |
| **UI Visualization**       | Dependencies appear in the **Graph View** as arrows between tasks. |
| **`start` bookend**        | An `EmptyOperator` at the start (and often at the end) of a pipeline is a common way to make the flow clearer visually. |



### **Define order of BashOperators**
Now that you've learned about the bitshift operators, it's time to modify your workflow to include a pull step and to set the task ordering. You currently have three defined components: `cleanup`, `consolidate`, and `push_data`.

The full code, including the `analytics_dag` definition and the `BashOperator` import, is shown below.

```python
# Import necessary modules
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Define the DAG; tasks created inside the `with` block are attached to it automatically
with DAG('analytics_dag', start_date=datetime(2023, 4, 1), schedule_interval='@daily') as dag:

    # Define the pull_sales task (first step)
    pull_sales = BashOperator(
        task_id='pull_sales',
        bash_command='wget "https://salestracking/latestinfo?json"'
    )

    # Define the cleanup task (second step)
    cleanup = BashOperator(
        task_id='cleanup',
        bash_command='rm -rf /tmp/sales_data/*'  # Example cleanup command
    )

    # Define the consolidate task (third step)
    consolidate = BashOperator(
        task_id='consolidate',
        bash_command='python /scripts/consolidate_data.py'  # Example consolidate command
    )

    # Define the push_data task (fourth and last step)
    push_data = BashOperator(
        task_id='push_data',
        bash_command='python /scripts/push_to_database.py'  # Example push data command
    )

    # Set pull_sales to run prior to cleanup
    pull_sales >> cleanup

    # Configure consolidate to run after cleanup
    consolidate << cleanup

    # Set push_data to run last
    consolidate >> push_data
```




### 1. **Check for Import Errors with `list-import-errors`:**
To check for any import issues related to your DAGs, run the following command:

```bash
airflow dags list-import-errors
```

This command lists every DAG file that Airflow failed to load, together with the recorded error and traceback. That output usually points to the cause: a missing module, a wrong file path, or another import-related problem.

### 2. **Decipher the Error Message:**
The output will include the error type and location. For example, you might see an error like:

```
ImportError: cannot import name 'MyCustomOperator' from 'my_custom_module'
```

This indicates that Airflow is unable to import `MyCustomOperator` from the specified `my_custom_module`, likely because the module is missing, the file path is incorrect, or the class isn't defined properly in the module.
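To reproduce the two most common failure modes outside Airflow, you can attempt the same import in plain Python. The `check_import` helper below is a hypothetical sketch built on the standard `importlib` module; it distinguishes a missing module from a module that exists but lacks the requested name.

```python
import importlib

def check_import(module_name, attr_name):
    """Try the equivalent of `from module_name import attr_name` and report the outcome."""
    try:
        module = importlib.import_module(module_name)
    except ModuleNotFoundError as err:
        # err.name is the module that could not be found (possibly a dependency)
        return f"module missing: {err.name}"
    if not hasattr(module, attr_name):
        return f"cannot import name {attr_name!r} from {module_name!r}"
    return "ok"

print(check_import("json", "dumps"))                         # ok
print(check_import("json", "MyCustomOperator"))              # cannot import name 'MyCustomOperator' from 'json'
print(check_import("my_custom_module", "MyCustomOperator"))  # module missing: my_custom_module
```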

### 3. **View the Python Code:**
After identifying the import error, you can view the Python code to understand the issue better. Run the following command to check the DAG's code:

```bash
cat workspace/dags/codependent.py
```

In the file, look at the import statements and ensure they are correct. Verify if the module paths are accurate and that all the required libraries or custom modules are available.

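For example, Airflow 1.x-style import paths such as the one below are deprecated in Airflow 2:

```python
from airflow.operators.bash_operator import BashOperator
```

Replace them with the current path, `from airflow.operators.bash import BashOperator`.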

If you're using custom operators or modules, ensure they're correctly imported and available to Airflow. Double-check the paths to avoid any typos or mistakes.

### 4. **Fix the Import Error:**
Once you've identified the problem, make the necessary corrections:
- Ensure all custom modules are properly installed or available in the right directory.
- Install any missing dependencies with `pip`.
- Double-check the import paths for accuracy.

For example, if a custom module is missing, you can install it using:

```bash
pip install <missing_module>
```

### 5. **Re-run the DAG:**
Once the import issues are resolved, you can trigger the DAG again to ensure everything is working as expected:

```bash
airflow dags trigger <dag_id>
```

### Summary of Commands for Troubleshooting Import Errors:
- **List Import Errors**: `airflow dags list-import-errors`
- **View the DAG Code**: `cat workspace/dags/codependent.py`
- **Fix Import Issues**: Correct the import paths, install missing dependencies, and ensure custom modules are available.
- **Re-run the DAG**: `airflow dags trigger <dag_id>`



### Summary of Key Points: Additional Operators in Airflow

1. **PythonOperator**  
   The `PythonOperator` is used to execute a Python function or other callable as part of an Airflow task. It requires the `task_id` and `python_callable` arguments (plus `dag`, unless the task is defined inside a `with DAG(...)` block), with `python_callable` being the function you wish to run. Arguments can be passed to the function using `op_args` (for positional arguments) or `op_kwargs` (for keyword arguments).

   - **Key Arguments**:
     - `task_id`: Unique identifier for the task.
     - `dag`: The DAG this task belongs to (not needed inside a `with DAG(...)` block).
     - `python_callable`: The Python function to execute.
     - `op_args`: Positional arguments passed to the function.
     - `op_kwargs`: Keyword arguments passed to the function.

2. **PythonOperator Example**  
   An example shows how to use the `PythonOperator` to run a simple function `printme`, which writes a message to the task logs.

   ```python
   from airflow.operators.python import PythonOperator

   def printme():
       print("Hello from PythonOperator!")

   task = PythonOperator(
       task_id='python_task',
       python_callable=printme,
       dag=dag
   )
   ```

3. **Arguments in PythonOperator**  
   You can pass arguments to the function via `op_args` and `op_kwargs`. For example, using `op_kwargs`, you can pass named arguments to the function.

4. **op_kwargs Example**  
   The example defines a `sleep` function that accepts a `length_of_time` argument and then pauses for that amount of time. The `op_kwargs` dictionary is used to pass the `length_of_time` argument to the function.

   ```python
   from airflow.operators.python import PythonOperator
   import time

   def sleep(length_of_time):
       time.sleep(length_of_time)

   task = PythonOperator(
       task_id='sleep_task',
       python_callable=sleep,
       op_kwargs={'length_of_time': 5},
       dag=dag
   )
   ```

   - **Key Notes**: The dictionary keys must match the function's argument names. Incorrect keys can result in errors like `unexpected keyword argument`.

5. **EmailOperator**  
   The `EmailOperator` is used to send emails from within an Airflow task. This is useful for sending notifications, reports, etc. It requires the `task_id`, `to`, `subject`, and `html_content` arguments. Airflow must be configured with SMTP server details (the `[smtp]` section of `airflow.cfg`) for this to work.

   - **Key Arguments**:
     - `task_id`: Unique identifier for the task.
     - `to`: Recipient's email address.
     - `subject`: Subject of the email.
     - `html_content`: HTML content of the email body.
     - `files`: Attachments (optional).

6. **EmailOperator Example**  
   In this example, the `EmailOperator` task sends an email with the latest sales report attached when it runs.

   ```python
   from airflow.operators.email import EmailOperator

   task = EmailOperator(
       task_id='send_email',
       to='recipient@example.com',
       subject='Sales Report',
       html_content='Please find the sales report attached.',
       files=['/path/to/latest_sales.xlsx'],
       dag=dag
   )
   ```
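Under the hood, Airflow's email utilities assemble a MIME message from `to`, `subject`, `html_content`, and `files`, then send it through the configured SMTP server. To see roughly what such a message contains, the sketch below builds (but does not send) a comparable message with Python's standard `email` package. It is an illustration, not Airflow's own code; `build_report_email` is hypothetical and the attachment bytes are placeholders.

```python
from email.message import EmailMessage

def build_report_email(to, subject, html_content, attachment_name, attachment_bytes):
    """Assemble (but do not send) an HTML email with one attachment."""
    msg = EmailMessage()
    msg["To"] = to
    msg["Subject"] = subject
    msg.set_content("This report is best viewed as HTML.")  # plain-text fallback
    msg.add_alternative(html_content, subtype="html")
    msg.add_attachment(
        attachment_bytes,
        maintype="application",
        subtype="vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        filename=attachment_name,
    )
    return msg

msg = build_report_email(
    "recipient@example.com",
    "Sales Report",
    "<p>Please find the sales report attached.</p>",
    "latest_sales.xlsx",
    b"placeholder bytes",  # stands in for the real file contents
)
print(msg.get_content_type())                                  # multipart/mixed
print([part.get_filename() for part in msg.iter_attachments()])  # ['latest_sales.xlsx']
```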

---

### Common Command-Line Operations for These Operators

- **List all DAGs**: `airflow dags list`
- **Trigger a DAG**: `airflow dags trigger <dag_id>`
- **List the tasks in a DAG**: `airflow tasks list <dag_id>`
- **Check task status**: `airflow tasks state <dag_id> <task_id> <execution_date>`
- **View task logs**: no dedicated CLI command in Airflow 2; use the web UI or the log files under `$AIRFLOW_HOME/logs/` (`airflow tasks test` also prints a task's log to the terminal)


### **Using the PythonOperator**
You've implemented several Airflow tasks using the BashOperator but realize that a couple of specific tasks would be better implemented using Python. You'll implement a task to download and save a file to the system within Airflow.

The `requests` library is imported for you, and the DAG `process_sales_dag` is already defined.

```python
import requests

# Define the method
def pull_file(URL, savepath):
    # Download the file and write the raw bytes to savepath
    r = requests.get(URL)
    with open(savepath, 'wb') as f:
        f.write(r.content)
    # Use the print method for logging
    print(f"File pulled from {URL} and saved to {savepath}")
```

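```python
from airflow.operators.python import PythonOperator

# Create the task
pull_file_task = PythonOperator(
    task_id='pull_file',
    # Add the callable
    python_callable=pull_file,
    # Define the arguments (keys must match pull_file's parameter names)
    op_kwargs={'URL': 'http://dataserver/sales.json', 'savepath': 'latestsales.json'},
    # Attach the task to the DAG (redundant inside a `with process_sales_dag:` block)
    dag=process_sales_dag
)
```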
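
`op_kwargs` reaches the callable as keyword arguments, so each key must match a parameter name exactly, including case. This plain-Python sketch (no Airflow involved) unpacks the same dictionary with `**` into a hypothetical stand-in, `pull_file_stub`, that has the same signature as `pull_file` but downloads nothing. The exact error Airflow reports for a bad key may differ between versions, but the cause is the same.

```python
def pull_file_stub(URL, savepath):
    """Hypothetical stand-in with pull_file's signature; it downloads nothing."""
    return f"File pulled from {URL} and saved to {savepath}"


# Keys match the parameter names, so ** unpacking fills URL and savepath
good_kwargs = {"URL": "http://dataserver/sales.json", "savepath": "latestsales.json"}
print(pull_file_stub(**good_kwargs))

# 'url' is not 'URL': keyword names are case-sensitive, so the call fails
bad_kwargs = {"url": "http://dataserver/sales.json", "savepath": "latestsales.json"}
error = None
try:
    pull_file_stub(**bad_kwargs)
except TypeError as exc:
    error = exc
print(f"{type(error).__name__}: {error}")
```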

### **EmailOperator and dependencies**
Now that you've defined the PythonOperators for your workflow, your manager would like to receive a copy of the parsed JSON file by email when the workflow completes. The previous tasks are still defined, and the DAG `process_sales_dag` is configured. Note that this task uses the older DAG definition method, passing the DAG explicitly with `dag=process_sales_dag`; that argument is already included for you.

```python
# Import the Operator
from airflow.operators.email import EmailOperator

# Define the task
email_manager_task = EmailOperator(
    task_id='email_manager',
    to='manager@datacamp.com',
    subject='Latest sales JSON',
    html_content='Attached is the latest sales JSON file as requested.',
    # files expects a list of paths, even for a single attachment
    files=['parsedfile.json'],
    dag=process_sales_dag
)

# Set the order of tasks: pull, then parse, then email
pull_file_task >> parse_file_task >> email_manager_task
```
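
The `>>` in the last line sets dependencies: `a >> b` makes `b` run after `a`. Airflow implements this with Python operator overloading, and `>>` returns its right-hand task so that chains work. The sketch below reproduces the idea with a hypothetical `Task` class (not Airflow's `BaseOperator`) and uses `graphlib` (Python 3.9+) to derive a valid run order.

```python
from graphlib import TopologicalSorter


class Task:
    """Hypothetical stand-in for an Airflow operator; only tracks dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # self >> other: other runs after self; return other so a >> b >> c chains
        self.downstream.append(other)
        return other

    def __lshift__(self, other):
        # self << other: self runs after other; return other for chaining
        other.downstream.append(self)
        return other


pull_file_task = Task("pull_file")
parse_file_task = Task("parse_file")
email_manager_task = Task("email_manager")

pull_file_task >> parse_file_task >> email_manager_task

# Map each task_id to the set of task_ids it depends on (its upstream tasks)
tasks = [pull_file_task, parse_file_task, email_manager_task]
upstream = {t.task_id: set() for t in tasks}
for t in tasks:
    for child in t.downstream:
        upstream[child.task_id].add(t.task_id)

run_order = list(TopologicalSorter(upstream).static_order())
print(run_order)  # ['pull_file', 'parse_file', 'email_manager']
```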

### Summary of Key Points: Airflow Scheduling

1. **DAG Runs**  
   A DAG run is a specific execution instance of a workflow in Airflow. It can be triggered manually or by the schedule interval defined for the DAG. Each DAG run has its own state (running, failed, or success), and each task within the run has its own state as well (queued, skipped, etc.).

2. **DAG Runs View**  
   In the Airflow UI, the **Browse > DAG Runs** menu option lists all DAG runs, along with the status and details of each one.

3. **DAG Run State**  
   Each DAG run's state is displayed, showing whether it succeeded, failed, or is still running. This is useful for monitoring and troubleshooting DAGs.

4. **Schedule Details**  
   When scheduling a DAG, several parameters are important:
   - **Start Date**: The first possible time for the DAG to run (typically a Python `datetime` object).
   - **End Date**: The last possible time for the DAG to run.
   - **Max Tries**: The maximum number of attempts before a run is marked as failed. In practice this is controlled per task through the `retries` argument (for example, `'retries': 3` in `default_args`).
   - **Schedule Interval**: How often the DAG runs.

5. **Schedule Interval**  
   The schedule interval determines how often the DAG runs. It defines time windows rather than exact start times: a run is triggered after its window ends, not at the moment the window opens. The interval can be set using cron syntax, a preset, or a `timedelta` object.

6. **Cron Syntax**  
   Airflow supports cron-style syntax to define the schedule interval. This syntax includes five fields:
   - Minute (0-59)
   - Hour (0-23)
   - Day of the month (1-31)
   - Month (1-12)
   - Day of the week (0-6, where 0 = Sunday)

   An asterisk (`*`) matches every value in its field (e.g., `*` in the minute field means every minute). You can also list specific values separated by commas (e.g., `0,15,30,45` in the minute field runs every 15 minutes).

7. **Cron Examples**  
   - `0 12 * * *`: Runs daily at Noon (12:00).
   - `* * 25 2 *`: Runs once per minute on February 25th.
   - `0,15,30,45 * * * *`: Runs every 15 minutes.

8. **Airflow Scheduler Presets**  
   Airflow provides preset options for common time intervals:
   - `@hourly`: Equivalent to `0 * * * *` in cron, runs once an hour at the start of the hour.
   - `@daily`: Runs once a day at midnight.
   - `@weekly`: Runs once a week on Sunday at midnight.
   - `@monthly`: Runs once a month on the first day at midnight.
   - `@yearly`: Runs once a year on January 1st at midnight.

9. **Special Presets**  
   Airflow includes two special presets:
   - `None`: Means the DAG will not be scheduled, typically used for manually triggered workflows.
   - `@once`: Schedules the DAG to run only once.

10. **Schedule Interval Nuances**  
    A key scheduling nuance: Airflow does not trigger the first run until at least one full schedule interval has passed after the **start_date**. For example, with a start date of February 25, 2020 and a schedule interval of `@daily`, the first DAG run covers February 25 (its logical date) but is triggered at midnight on February 26, 2020.
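
The start-date nuance is interval arithmetic: each run is labelled with the start of its interval (its logical date) but is only triggered once that interval has ended. A minimal sketch, assuming a fixed interval whose first window begins exactly at a midnight `start_date` (so `@daily` and `timedelta(days=1)` agree); the helper `first_runs` is hypothetical and ignores time zones, catchup, and other scheduler settings.

```python
from datetime import datetime, timedelta


def first_runs(start_date, interval, count):
    """Return (logical_date, trigger_time) pairs for the first `count` runs.

    Hypothetical helper: assumes the first interval begins exactly at
    start_date and that each run fires when its interval ends.
    """
    runs = []
    for i in range(count):
        interval_start = start_date + i * interval
        runs.append((interval_start, interval_start + interval))
    return runs


for logical_date, trigger_time in first_runs(datetime(2020, 2, 25), timedelta(days=1), 3):
    print(f"run for {logical_date:%Y-%m-%d} is triggered at {trigger_time:%Y-%m-%d %H:%M}")
# run for 2020-02-25 is triggered at 2020-02-26 00:00
# run for 2020-02-26 is triggered at 2020-02-27 00:00
# run for 2020-02-27 is triggered at 2020-02-28 00:00
```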

---

### Common Airflow Scheduling Commands

- **List all DAGs**: `airflow dags list`
- **Trigger a DAG**: `airflow dags trigger <dag_id>`
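- **List DAG runs**: `airflow dags list-runs -d <dag_id>`
- **Check task state**: `airflow tasks state <dag_id> <task_id> <execution_date>`
- **View task logs**: open the task instance's **Log** view in the web UI, or read the files under `$AIRFLOW_HOME/logs` (the default log folder). To run a single task and print its log to the console, use `airflow tasks test <dag_id> <task_id> <execution_date>`.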

A summary table of the key points related to **Airflow Scheduling**:

| **Topic**                     | **Details**                                                                                                            |
|-------------------------------|------------------------------------------------------------------------------------------------------------------------|
| **DAG Runs**                   | A DAG run refers to a specific execution instance of a workflow in Airflow. Can be triggered manually or via a schedule.  |
| **DAG Runs View**              | In the Airflow UI, under **Browse > DAG Runs**, you can view details and states of all DAG runs in the instance.        |
| **DAG Run State**              | Displays the state of each DAG run: running, failed, success, etc. Useful for monitoring and troubleshooting.           |
| **Schedule Details**           | Key parameters: <br> - **Start Date**: First possible time for DAG to run. <br> - **End Date**: Last possible time. <br> - **Max Tries**: Max retries before failure. <br> - **Schedule Interval**: Frequency of DAG runs. |
| **Schedule Interval**          | Defines how frequently the DAG runs. Can be set with cron syntax or predefined presets.                               |
| **Cron Syntax**                | Standard cron format: `minute hour day_of_month month day_of_week`. Asterisks (`*`) mean every possible value.          |
| **Cron Examples**              | - `0 12 * * *`: Run daily at noon.<br> - `* * 25 2 *`: Run every minute on February 25th.<br> - `0,15,30,45 * * * *`: Run every 15 minutes. |
| **Airflow Scheduler Presets**  | Predefined scheduling shortcuts:<br> - `@hourly`: Runs once every hour.<br> - `@daily`: Runs once per day.<br> - `@weekly`: Runs once per week.<br> - `@monthly`: Runs once per month.<br> - `@yearly`: Runs once a year. |
| **Special Presets**            | - `None`: No scheduling, used for manual execution.<br> - `@once`: Schedules the DAG to run only once.                   |
| **Schedule Interval Nuances**  | Airflow won’t schedule the first run until one schedule interval has passed after the **start_date**.                   |

### Common Airflow Scheduling Commands

| **Command**                           | **Description**                                                   |
|---------------------------------------|-------------------------------------------------------------------|
| `airflow dags list`                   | Lists all available DAGs in the Airflow instance.                 |
| `airflow dags trigger <dag_id>`       | Manually triggers a DAG run.                                      |
| `airflow dags list-runs -d <dag_id>`  | Lists the runs of a specific DAG.                                 |
| `airflow tasks state <dag_id> <task_id> <execution_date>` | Checks the state of a task for a specific DAG run.                |
| `airflow tasks test <dag_id> <task_id> <execution_date>` | Runs a single task without recording its state and prints its log to the console. Stored task logs are viewed in the web UI or under `$AIRFLOW_HOME/logs`. |

### **Schedule a DAG via Python**
You've learned quite a bit about creating DAGs, but now you would like to schedule a specific DAG on a specific day of the week at a certain time. You'd like the code to include this information in case a colleague needs to reinstall the DAG on a different server.

The Airflow DAG object and the appropriate datetime methods have been imported for you.

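```python
from datetime import datetime, timedelta

from airflow import DAG

# Update the scheduling arguments as defined
default_args = {
  'owner': 'Engineering',
  'start_date': datetime(2023, 11, 1),
  'email': ['airflowresults@datacamp.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 3,
  'retry_delay': timedelta(minutes=20)
}

# '30 12 * * 3': minute 30, hour 12, any day of month, any month, Wednesday (3)
# (Airflow 2.4+ prefers the equivalent `schedule=` keyword over `schedule_interval=`)
dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3')
```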
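
To confirm that `'30 12 * * 3'` means 12:30 every Wednesday, the sketch below checks candidate times against the five cron fields. It is a simplified, hypothetical matcher (only `*`, single numbers, and comma lists; no ranges or steps), not the parser Airflow uses.

```python
from datetime import datetime, timedelta


def field_matches(field, value):
    """Return True if a cron field ('*', '5', or '0,15,30') allows value."""
    return field == "*" or value in {int(v) for v in field.split(",")}


def cron_matches(expr, dt):
    """Check one datetime against a five-field cron expression (simplified)."""
    minute, hour, dom, month, dow = expr.split()
    cron_weekday = (dt.weekday() + 1) % 7  # Python: Monday=0; cron: Sunday=0
    if not (field_matches(minute, dt.minute)
            and field_matches(hour, dt.hour)
            and field_matches(month, dt.month)):
        return False
    if dom != "*" and dow != "*":
        # Standard cron: when both day fields are restricted, either one may match
        return field_matches(dom, dt.day) or field_matches(dow, cron_weekday)
    return field_matches(dom, dt.day) and field_matches(dow, cron_weekday)


def next_matches(expr, start, count):
    """Scan minute by minute from start, collecting the first `count` matches."""
    dt = start.replace(second=0, microsecond=0)
    if dt < start:
        dt += timedelta(minutes=1)  # never return a time before start
    found = []
    while len(found) < count:
        if cron_matches(expr, dt):
            found.append(dt)
        dt += timedelta(minutes=1)
    return found


for run in next_matches("30 12 * * 3", datetime(2023, 11, 1), 3):
    print(run.strftime("%a %Y-%m-%d %H:%M"))
# Wed 2023-11-01 12:30
# Wed 2023-11-08 12:30
# Wed 2023-11-15 12:30
```

Under the start-date nuance from the scheduling summary, Airflow 2's cron scheduling would treat 12:30 on November 1 as the start of the first interval, so the first run is expected at 12:30 on November 8.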

### **Deciphering Airflow schedules**
Given the various options for Airflow's `schedule_interval`, you'd like to verify that you understand exactly how intervals relate to each other, whether they are expressed in cron format, as a `timedelta` object, or as a preset.

The following table shows how the `*` (asterisk) is used across Airflow's scheduling formats: cron syntax, `timedelta` objects, and presets.

### **Airflow Schedule Formats: The Role of `*` (Asterisk)**

| **Schedule Format** | **Description**                                           | **Example Usage**                     | **Explanation**                                                        |
|---------------------|-----------------------------------------------------------|---------------------------------------|------------------------------------------------------------------------|
| **Cron Syntax**      | The `*` in Cron syntax means "every possible value" for that field. | `* * * * *`                           | Run **every minute** of every hour, day, month, and week.              |
| **Minute Field**     | The first field (0-59) specifies the minute of the hour.     | `* 12 * * *`                          | Run **every minute from 12:00 to 12:59** each day.                     |
| **Hour Field**       | The second field (0-23) specifies the hour of the day.       | `0 * * * *`                           | Run **at the start of every hour** each day.                           |
| **Day of Month**     | The third field (1-31) specifies the day of the month.       | `0 0 * * *`                           | Run **at midnight** every day of the month.                            |
| **Month Field**      | The fourth field (1-12) specifies the month of the year.    | `0 0 * 5 *`                           | Run **at midnight** every day in **May**.                              |
| **Day of Week**      | The fifth field (0-6, 0=Sunday) specifies the day of the week. | `0 0 * * 0`                         | Run **at midnight every Sunday**.                                      |
| **Timedelta (days)** | `timedelta` objects do not use `*`; you specify the amount of time between runs. | `timedelta(days=1)`                  | Run **every day**. Similar to `@daily`, but intervals are measured from the `start_date` rather than aligned to midnight. |
| **Timedelta (hours)**| Specifies the number of hours between runs.                 | `timedelta(hours=1)`                  | Run **every hour**.                                                    |
| **Presets**          | Presets contain no `*`; each is shorthand for a fixed cron expression. | `@hourly`, `@daily`, `@weekly`, etc. | Airflow expands the preset to the equivalent cron expression internally. |
| **@hourly**          | Run once **every hour**                                    | `@hourly`                             | Equivalent to `0 * * * *` (Run every hour at the start of the hour).   |
| **@daily**           | Run once **every day**                                     | `@daily`                              | Equivalent to `0 0 * * *` (Run at midnight every day).                |
| **@weekly**          | Run once **every week**                                    | `@weekly`                             | Equivalent to `0 0 * * 0` (Run at midnight on Sunday).                |
| **@monthly**         | Run once **every month**                                   | `@monthly`                            | Equivalent to `0 0 1 * *` (Run at midnight on the first of every month). |
| **@yearly**          | Run once **every year**                                    | `@yearly`                             | Equivalent to `0 0 1 1 *` (Run at midnight on January 1st).            |
| **None**             | No schedule, manual trigger only.                          | `None`                                | The DAG will **never** run automatically.                              |
| **@once**            | Run the DAG **once**, then stop scheduling.                | `@once`                               | The DAG will run only one time, not on a recurring schedule.           |
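
`timedelta(days=1)` and `@daily` both run once a day, but they anchor differently: a `timedelta` schedule steps forward from the `start_date`, while `@daily` snaps to midnight. The sketch below models that difference with plain `datetime` arithmetic; both helpers are hypothetical, and Airflow's real timetables also handle time zones, catchup, and more.

```python
from datetime import datetime, timedelta


def delta_interval_starts(start_date, delta, count):
    """timedelta schedule: intervals start at start_date, start_date + delta, ..."""
    return [start_date + i * delta for i in range(count)]


def daily_cron_interval_starts(start_date, count):
    """@daily (0 0 * * *): intervals start at the first midnight at or after start_date."""
    first = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
    if first < start_date:
        first += timedelta(days=1)  # start_date falls mid-day, so wait for the next midnight
    return [first + timedelta(days=i) for i in range(count)]


start = datetime(2024, 1, 1, 6, 0)  # a start_date that is not at midnight
print([d.isoformat() for d in delta_interval_starts(start, timedelta(days=1), 2)])
# ['2024-01-01T06:00:00', '2024-01-02T06:00:00']
print([d.isoformat() for d in daily_cron_interval_starts(start, 2)])
# ['2024-01-02T00:00:00', '2024-01-03T00:00:00']
```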

### **Explanation of `*` in Cron Syntax**

- **Minute Field**: `*` means "run every minute," the finest granularity cron supports.
- **Hour Field**: `*` means "every hour of the day." Combined with a fixed minute (e.g., `0 * * * *`), it runs the task once an hour.
- **Day of Month**: `*` lets the task run on every day of the month, which suits daily tasks that run at the same time each day.
- **Month Field**: `*` lets the task run in every month. Replacing it with a number restricts the month: for example, `* * * 5 *` runs the task every minute of every day in May.
- **Day of Week**: `*` indicates the task can run on any day of the week, while a specific number (e.g., `0` for Sunday) restricts execution to that weekday.
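
The field-by-field behaviour of `*` can be made explicit by expanding an expression into the values each field allows. The hypothetical helper `expand_cron` below handles `*`, single numbers, and comma lists only (real cron also accepts ranges and steps such as `1-5` or `*/15`).

```python
FIELD_RANGES = [
    ("minute", 0, 59),
    ("hour", 0, 23),
    ("day_of_month", 1, 31),
    ("month", 1, 12),
    ("day_of_week", 0, 6),  # 0 = Sunday
]


def expand_cron(expr):
    """Map each cron field name to the sorted values it allows ('*' = whole range)."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 fields, got {len(fields)}: {expr!r}")
    expanded = {}
    for text, (name, low, high) in zip(fields, FIELD_RANGES):
        if text == "*":
            values = list(range(low, high + 1))  # '*' means every value in the range
        else:
            values = sorted(int(v) for v in text.split(","))
            if any(v < low or v > high for v in values):
                raise ValueError(f"{name} out of range {low}-{high}: {text!r}")
        expanded[name] = values
    return expanded


schedule = expand_cron("0 0 * 5 *")  # midnight, every day in May
print(schedule["minute"], schedule["hour"], schedule["month"])  # [0] [0] [5]
print(len(schedule["day_of_month"]), len(schedule["day_of_week"]))  # 31 7
```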

### **Special Considerations**
- **Use of `*` in Presets**: Presets such as `@hourly`, `@daily`, and `@weekly` are shorthand for fixed cron expressions (for example, `@daily` is `0 0 * * *`), so you never write the `*` fields yourself.
- **Manual Scheduling with `None`**: If `schedule_interval` is set to `None`, the DAG will not have any automatic schedule. This is ideal for workflows that should only be triggered manually.

---
