📘 1. Environment Setup
<details> <summary>🐚 Install Airflow & Dependencies</summary>

In [1]:
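# Note: each `!` command runs in its own subshell, so `source .../activate`
# does not carry over to later commands or cells.
!python3 -m venv airflow_env
!source airflow_env/bin/activate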

Error: Command '['/content/airflow_env/bin/python3', '-m', 'ensurepip', '--upgrade', '--default-pip']' returned non-zero exit status 1.
/bin/bash: line 1: airflow_env/bin/activate: No such file or directory


In [1]:
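# 1. Install Airflow 2.x (2.8.1 or later). The DAGs below use `schedule_interval`,
#    which Airflow 3 no longer accepts, so stay below 3.0.
!pip install --upgrade "apache-airflow>=2.8.1,<3"
# Optionally append a constraints file that matches your Airflow and Python versions, e.g.:
#   --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt"

# 2. Extras: yfinance, ML stack
!pip install yfinance torch transformers datasets pandas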




</details> <details> <summary>⚙️ Initialize Airflow</summary>

In [7]:
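import os

# Each `!` command runs in its own subshell, so `!export` would not persist.
# Set environment variables through os.environ instead; child shells inherit them.
os.environ["PATH"] += os.pathsep + os.path.expanduser("~/.local/bin")
os.environ["AIRFLOW_HOME"] = "/content/airflow"

# Initialize the metadata DB (`db migrate` replaces the deprecated `db init` from Airflow 2.7 on)
!airflow db migrate

# Create DAG folder
!mkdir -p /content/airflow/dags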


/bin/bash: line 1: airflow: command not found


# What Is Apache Airflow?

Apache Airflow is an **open-source workflow orchestration platform** designed to programmatically author, schedule, and monitor complex data pipelines. At its core, Airflow lets you define your workflows as **Directed Acyclic Graphs (DAGs)** in Python—so you get the full power of a general-purpose language to parameterize, modularize, and test your pipelines.

---

## Core Components & Architecture

- **DAGs**  
  Your pipelines, defined as Python scripts, where each node is a task (e.g., SQL query, Python function, Bash script) and edges express dependencies.

- **Scheduler**  
  Reads your DAG definitions and figures out *when* each task should run.

- **Executor**  
  Determines *how* to run tasks. Options include the LocalExecutor, CeleryExecutor, KubernetesExecutor, etc.

- **Metadata Database**  
  A SQL store (e.g., Postgres, MySQL) that tracks every DAG run, task instance, log location, and state transition.

- **Workers / Pods**  
  The actual machines or containers that execute your tasks.

---

## How It Works (DAG Lifecycle)

1. **Write a DAG file** in your `$AIRFLOW_HOME/dags` folder.  
2. **Scheduler** continually parses these files, writing scheduled runs into the metadata database.  
3. **Executor** picks up ready tasks and dispatches them to Workers (or Pods).  
4. **Workers** execute task code, update the database with success/failure, push logs back to storage.  
5. **Web UI** and CLI let you monitor, backfill historical data, trigger ad-hoc runs, and inspect logs.  
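
Steps 2 to 4 can be pictured with a toy loop in plain Python. This is only a sketch of the idea (find tasks whose upstream dependencies are done, run them, record the state), not Airflow's actual scheduler; the task names and the `run_pipeline` helper are made up for illustration.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

def run_pipeline(deps):
    """Run tasks in dependency order and record a state for each one."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    order, states = [], {}
    while sorter.is_active():
        for task in sorter.get_ready():   # tasks whose upstream tasks all finished
            states[task] = "success"      # a real worker would execute task code here
            order.append(task)
            sorter.done(task)             # unblocks the downstream tasks
    return order, states

order, states = run_pipeline(dependencies)
print(order)
print(states)
```

In Airflow the recorded states live in the metadata database, which is what the Web UI and CLI read from.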

---

## Real-World Use Case: Dynamic Data Acquisition

Imagine an IoT scenario where devices stream data into AWS Kinesis. You need to:

1. **Ingest** raw events into S3  
2. **Clean** and normalize streaming batches  
3. **Load** into a data warehouse (e.g., Redshift)  
4. **Trigger** ML jobs to detect anomalies  

Airflow can orchestrate this multi-stage flow, automatically retrying failures, tracking lineage, and scaling workers up/down on Kubernetes.
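
To make "automatically retrying failures" concrete, here is a minimal plain-Python sketch of retry behaviour. In Airflow you would set the `retries` and `retry_delay` task arguments rather than write this loop yourself; `run_with_retries` and `flaky_ingest` are hypothetical names used only for this illustration.

```python
import time

def run_with_retries(task, retries=2, retry_delay=0.0):
    """Call `task` until it succeeds or `retries` extra attempts are used up."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return task(), attempt
        except Exception:
            if attempt > retries:
                raise                  # out of retries: the task is marked failed
            time.sleep(retry_delay)    # wait before the next attempt

# Hypothetical flaky task: fails twice with a transient error, then succeeds.
calls = {"n": 0}

def flaky_ingest():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient error")
    return "ingested"

result, attempts = run_with_retries(flaky_ingest, retries=2)
print(result, attempts)
```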

---

## Scaling & Executors

- **CeleryExecutor + Redis/RabbitMQ**  
  Distribute tasks across many worker nodes—great for parallel ETL jobs.

- **KubernetesExecutor**  
  Spin up one pod per task, ensuring full isolation and horizontal scaling.

- **LocalExecutor**  
  Good for single-machine setups or development.
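
One way to choose an executor is through an environment variable: Airflow reads configuration overrides from variables named `AIRFLOW__{SECTION}__{KEY}`, and the executor is set in the `[core]` section. The helper below is a hypothetical convenience for building that name. Note that a default SQLite metadata database may restrict you to the sequential executor, so treat the value shown as an example rather than a recommendation for this notebook.

```python
import os

def airflow_env_var(section, key):
    """Build the environment-variable name Airflow checks for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"

name = airflow_env_var("core", "executor")
os.environ[name] = "LocalExecutor"   # must be set before Airflow processes start
print(name, "=", os.environ[name])
```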

---

## Hello-World DAG

Below is a minimal, **manually-triggered** DAG that prints the date and then “Hello, Airflow (Manual)!”. When you’re ready to run it:

```bash
airflow dags trigger hello_manual
```

🏷️ 2. Simple “Hello World” DAG (Manual Trigger)
<details> <summary>📝 Explanation</summary>
Purpose: Show the bare minimum DAG with two Bash tasks.

Manual Trigger: We set schedule_interval=None, so it never runs on its own.

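How to run: After saving the file, run `airflow dags list` to confirm it appears, then trigger it:

```bash
airflow dags trigger hello_manual
```

To run a single task on its own, pass the task ID and the run's execution date (or run ID):

```bash
airflow tasks run hello_manual print_date <EXEC_DATE>
```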

</details>

In [None]:
%%writefile /content/airflow/dags/hello_manual.py
# File: /content/airflow/dags/hello_manual.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 6, 16),
    'email_on_failure': False,
    'retries': 0,
}

with DAG(
    dag_id='hello_manual',
    default_args=default_args,
    description='A manually-triggered Hello World DAG',
    schedule_interval=None,    # <-- no automatic scheduling
    catchup=False,
) as dag:

    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hello, Airflow (Manual)!"',
    )

    t1 >> t2


# Breaking Down the Hello-World Airflow DAG

Below is a **section-by-section explanation** of the simple, manually-triggered DAG. We’ll walk through each part so you can see exactly how Airflow turns Python code into a runnable workflow.

---

## 1. Imports & Prerequisites

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
```

- **`datetime`, `timedelta`** (from Python’s standard library): `datetime` sets the DAG’s `start_date`; `timedelta` is the usual way to express retry delays, although this minimal DAG does not use it.  
- **`DAG`** is the class you use to define a workflow.  
- **`BashOperator`** is a built-in Airflow task that runs a shell command.

---

## 2. Defining Default Arguments

```python
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 6, 16),
    'email_on_failure': False,
    'retries': 0,
}
```

These settings apply to every task in the DAG:

- **`owner`**: Who “owns” these tasks (for logging/notifications).  
- **`depends_on_past=False`**: Don’t require previous runs of the same task to succeed before running.  
- **`start_date`**: The earliest date for which Airflow will create scheduled runs; nothing is scheduled before it.  
- **`email_on_failure=False`**: Skip email alerts on failure (you could turn this on).  
- **`retries=0`**: Don’t retry a failed task automatically.
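
The relationship between `default_args` and per-task arguments can be sketched as a dictionary merge: values passed explicitly to a task take precedence over the DAG-level defaults. This is a simplified picture, not Airflow's internal code; `effective_args` and the `flaky_step` task are hypothetical.

```python
from datetime import datetime

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2025, 6, 16),
    "email_on_failure": False,
    "retries": 0,
}

def effective_args(defaults, **task_kwargs):
    """Explicit task arguments override DAG-level defaults."""
    return {**defaults, **task_kwargs}

print_date_args = effective_args(default_args, task_id="print_date")
flaky_args = effective_args(default_args, task_id="flaky_step", retries=3)

print(print_date_args["retries"])  # inherited from default_args
print(flaky_args["retries"])       # overridden for this one task
```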

---

## 3. Creating the DAG Context

```python
with DAG(
    dag_id='hello_manual',
    default_args=default_args,
    description='A manually-triggered Hello World DAG',
    schedule_interval=None,    # <-- no automatic scheduling
    catchup=False,
) as dag:
    ...
```

- **`dag_id`**: A unique identifier for your workflow.  
- **`default_args`**: The dict we just defined, applied to all tasks.  
- **`description`**: A human-readable summary.  
- **`schedule_interval=None`**: This DAG only runs when you manually trigger it—no cron schedule.  
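- **`catchup=False`**: Don’t create runs for past schedule intervals between `start_date` and now. With `schedule_interval=None` there are no intervals to catch up on, so here it is simply a safe default.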

Everything indented under this `with` block belongs to this DAG.
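
To see what `catchup` would change on a DAG that does have a schedule, the following simplified sketch lists the logical dates for which runs would be created. It assumes a hypothetical daily schedule and a made-up `runs_to_create` helper; Airflow's real timetable logic handles more cases than this.

```python
from datetime import datetime, timedelta

def runs_to_create(start_date, now, interval, catchup):
    """Return the logical dates of runs that are due (simplified model)."""
    due = []
    logical = start_date
    while logical + interval <= now:   # a run is due once its interval has ended
        due.append(logical)
        logical += interval
    return due if catchup else due[-1:]  # without catchup, only the latest interval

start = datetime(2025, 6, 16)
now = datetime(2025, 6, 20)
with_catchup = runs_to_create(start, now, timedelta(days=1), catchup=True)
without_catchup = runs_to_create(start, now, timedelta(days=1), catchup=False)

print(len(with_catchup))   # one run per elapsed day
print(without_catchup)     # just the most recent interval
```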

---

## 4. Defining Tasks

### Task 1: Print the Date

```python
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
)
```

- **`task_id`**: A unique name for this step.  
- **`bash_command='date'`**: Runs the UNIX `date` command, printing the current date/time.

### Task 2: Say Hello

```python
t2 = BashOperator(
    task_id='say_hello',
    bash_command='echo "Hello, Airflow (Manual)!"',
)
```

- **`bash_command='echo ...'`**: Prints our greeting to the logs.

---

## 5. Setting Task Order

```python
t1 >> t2
```

- The **`>>`** operator expresses a dependency: **`t2`** will only run after **`t1`** completes successfully.
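
The `>>` syntax works because Python lets a class define `__rshift__`. The toy class below is a sketch of that mechanism, not Airflow's operator implementation: it records the downstream task and returns the right-hand operand so that chains such as `a >> b >> c` work. The `Task` class and the third task are hypothetical.

```python
class Task:
    """Toy stand-in for an operator that only tracks dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        self.downstream.append(other)  # `other` must wait for `self`
        return other                   # returning the right side enables chaining

t1 = Task("print_date")
t2 = Task("say_hello")
t3 = Task("say_goodbye")  # hypothetical extra task to show chaining

t1 >> t2 >> t3

print([t.task_id for t in t1.downstream])
print([t.task_id for t in t2.downstream])
```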

---

## 6. Summary of Execution

1. **Trigger** the DAG manually (e.g., `airflow dags trigger hello_manual`).  
2. **Airflow Scheduler** marks the DAG run as “queued.”  
3. **Executor** dispatches **`t1`**, which prints the date.  
4. On success, **`t2`** runs, echoing “Hello, Airflow (Manual)!”  
5. **Logs** for each task appear in the Web UI and on disk.

---

You’ve now defined and understood your first Airflow DAG! From here, you can swap in other Operators (Python, SQL, Docker, etc.) and add more tasks to build real-world pipelines.


▶️ How to Trigger & Inspect

In [None]:
# 1. List available DAGs
!airflow dags list

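# 2. Unpause the DAG (new DAGs start paused by default), then trigger it by its ID.
#    A triggered run only executes while a scheduler is running.
!airflow dags unpause hello_manual
!airflow dags trigger hello_manual

# 3. Watch its progress in the UI (http://localhost:8080). To run the DAG
#    in-process instead and see the task logs in the cell output, use `dags test`:
!airflow dags test hello_manual 2025-06-16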


📈 3. Stock-LSTM Pipeline DAG (Manual Trigger)
<details> <summary>📝 Explanation</summary>
This DAG orchestrates a simple ML pipeline:

1. `fetch_history` → download 1 year of AAPL data
2. `train_lstm` → prepare 2-day windows, train an LSTM via HF Trainer
3. `fetch_recent` → grab the two most recent closing prices
4. `predict_next` → load the saved model and output “UP”/“DOWN”

Because `schedule_interval=None`, the DAG only runs when you trigger it, which suits demos and debugging.

To run:

```bash
airflow dags trigger stock_manual
```

</details>

In [None]:
%%writefile /content/airflow/dags/stock_manual.py
# File: /content/airflow/dags/stock_manual.py

import os
from datetime import datetime, timedelta

import yfinance as yf
import pandas as pd
import torch
import torch.nn as nn
from datasets import Dataset
from transformers import Trainer, TrainingArguments

from airflow import DAG
from airflow.operators.python import PythonOperator

# ─── Config ─────────────────────────────────────────────
SYMBOL    = "AAPL"
BASE_DIR  = "/content/stock_manual"
MODEL_DIR = os.path.join(BASE_DIR, "model")
os.makedirs(MODEL_DIR, exist_ok=True)

# ─── Task 1: Fetch History ───────────────────────────────
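def fetch_history(**kwargs):
    end   = datetime.now()
    start = end - timedelta(days=365)
    df    = yf.download(SYMBOL, start=start, end=end)
    # Recent yfinance releases may return (field, ticker) MultiIndex columns even
    # for a single symbol; keep only the field level so the CSV has one header row.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)
    df.to_csv(f"{BASE_DIR}/{SYMBOL}_history.csv")
    print("History rows:", len(df))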

# ─── Task 2: Train LSTM ──────────────────────────────────
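class LSTMClassifier(nn.Module):
    def __init__(self, hid=32):
        super().__init__()
        self.lstm = nn.LSTM(1, hid, batch_first=True)
        self.fc   = nn.Linear(hid, 2)

    def forward(self, input_values, labels=None):
        # Trainer calls model(**batch), so the argument names must match the
        # keys returned by the collator ("input_values" and "labels").
        _, (h, _) = self.lstm(input_values)
        logits = self.fc(h[-1])
        if labels is None:
            return logits  # plain inference, as in predict_next
        # With labels present, Trainer expects the loss in the output.
        loss = nn.functional.cross_entropy(logits, labels)
        return {"loss": loss, "logits": logits}

def train_lstm(**kwargs):
    df = pd.read_csv(f"{BASE_DIR}/{SYMBOL}_history.csv", parse_dates=["Date"])
    closes = df["Close"].values
    seqs, labels = [], []
    for i in range(len(closes)-2):
        seqs.append([[closes[i]], [closes[i+1]]])
        labels.append(int(closes[i+2] > closes[i+1]))
    ds = Dataset.from_dict({"sequence": seqs, "label": labels}).train_test_split(0.1)

    def collate(batch):
        X = torch.tensor([b["sequence"] for b in batch], dtype=torch.float)
        y = torch.tensor([b["label"]    for b in batch], dtype=torch.long)
        return {"input_values": X, "labels": y}

    model = LSTMClassifier()
    args = TrainingArguments(
        output_dir=MODEL_DIR,
        num_train_epochs=3,
        per_device_train_batch_size=16,
        eval_strategy="epoch",        # named `evaluation_strategy` in transformers < 4.41
        save_total_limit=1,
        remove_unused_columns=False,  # keep "sequence" so collate() can read it
        report_to="none",             # skip external loggers such as W&B
    )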
    Trainer(
        model=model,
        args=args,
        train_dataset=ds["train"],
        eval_dataset =ds["test"],
        data_collator=collate,
    ).train()
    model_path = os.path.join(MODEL_DIR, "pytorch_model.bin")
    torch.save(model.state_dict(), model_path)
    print("Saved model to", model_path)

# ─── Task 3: Fetch Recent ────────────────────────────────
def fetch_recent(**kwargs):
    end   = datetime.now()
    start = end - timedelta(days=5)
    df = yf.download(SYMBOL, start=start, end=end)["Close"].tail(2)
    df.to_csv(f"{BASE_DIR}/{SYMBOL}_recent.csv", index=False, header=False)
    print("Recent closes:", df.values.flatten())

# ─── Task 4: Predict Next ────────────────────────────────
def predict_next(**kwargs):
    recent = pd.read_csv(f"{BASE_DIR}/{SYMBOL}_recent.csv", header=None).values.flatten()
    seq = torch.tensor([[[recent[0]], [recent[1]]]], dtype=torch.float)

    model = LSTMClassifier()
    model.load_state_dict(torch.load(os.path.join(MODEL_DIR, "pytorch_model.bin")))
    model.eval()

    pred = torch.argmax(model(seq), dim=-1).item()
    out  = "UP" if pred==1 else "DOWN"
    print("Next-day movement →", out)

    with open(f"{BASE_DIR}/predictions.log", "a") as f:
        f.write(f"{datetime.now().date()} → {out}\n")

# ─── DAG Definition ─────────────────────────────────────
default_args = {
    "owner": "airflow",
    "start_date": datetime(2025, 6, 16),
    "retries": 0,
}

with DAG(
    dag_id="stock_manual",
    default_args=default_args,
    description="Manually-triggered Stock-LSTM pipeline",
    schedule_interval=None,  # <-- manual only
    catchup=False,
) as dag:

    t1 = PythonOperator(task_id="fetch_history", python_callable=fetch_history)
    t2 = PythonOperator(task_id="train_lstm",    python_callable=train_lstm)
    t3 = PythonOperator(task_id="fetch_recent",  python_callable=fetch_recent)
    t4 = PythonOperator(task_id="predict_next",  python_callable=predict_next)

    t1 >> t2 >> t3 >> t4


# Breaking Down the Stock-LSTM Pipeline DAG

Below is a **section-by-section explanation** of the `stock_manual.py` DAG. This manually-triggered pipeline fetches stock history, trains an LSTM model, and predicts next-day movement.

---

## 1. Imports & Dependencies

```python
import os
from datetime import datetime, timedelta

import yfinance as yf
import pandas as pd
import torch
import torch.nn as nn
from datasets import Dataset
from transformers import Trainer, TrainingArguments

from airflow import DAG
from airflow.operators.python import PythonOperator
```

- **`os` & `datetime`**: File paths and the date windows used for downloads.  
- **`yfinance`, `pandas`**: Download and manipulate stock data.  
- **`torch`, `nn`**: Define and train the LSTM model.  
- **Hugging Face `datasets` & `transformers`**: Create a dataset and streamline training.  
- **Airflow imports**: `DAG` for workflow, `PythonOperator` for running Python tasks.

---

## 2. Configuration

```python
SYMBOL    = "AAPL"
BASE_DIR  = "/content/stock_manual"
MODEL_DIR = os.path.join(BASE_DIR, "model")
os.makedirs(MODEL_DIR, exist_ok=True)
```

- **`SYMBOL`**: Ticker to analyze (e.g., AAPL).  
- **`BASE_DIR`**: Root folder for data, model, and logs.  
- **`MODEL_DIR`**: Subfolder for saving model files.  
- **`os.makedirs`**: Ensure the directory exists before writing.

---

## 3. Task 1: `fetch_history`

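```python
def fetch_history(**kwargs):
    end   = datetime.now()
    start = end - timedelta(days=365)
    df    = yf.download(SYMBOL, start=start, end=end)
    # Recent yfinance releases may return (field, ticker) MultiIndex columns even
    # for a single symbol; keep only the field level so the CSV has one header row.
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)
    df.to_csv(f"{BASE_DIR}/{SYMBOL}_history.csv")
    print("History rows:", len(df))
```

- **Purpose**: Download one year of daily price data (open, high, low, close, volume); later tasks use only the close.  
- **Window**: From today back 365 days.  
- **Output**: CSV saved at `{BASE_DIR}/{SYMBOL}_history.csv`, i.e. `/content/stock_manual/AAPL_history.csv`.  
- **Print**: Number of rows to verify download.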

---

## 4. Task 2: Define & Train LSTM

### 4.1 LSTM Model Definition

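```python
class LSTMClassifier(nn.Module):
    def __init__(self, hid=32):
        super().__init__()
        self.lstm = nn.LSTM(1, hid, batch_first=True)
        self.fc   = nn.Linear(hid, 2)

    def forward(self, input_values, labels=None):
        # Trainer calls model(**batch), so the argument names must match the
        # keys returned by the collator ("input_values" and "labels").
        _, (h, _) = self.lstm(input_values)
        logits = self.fc(h[-1])
        if labels is None:
            return logits  # plain inference, as in predict_next
        # With labels present, Trainer expects the loss in the output.
        loss = nn.functional.cross_entropy(logits, labels)
        return {"loss": loss, "logits": logits}
```

- **`hid`**: Hidden dimension size.  
- **Architecture**:  
  - LSTM layer reading one feature (the price) over `seq_len=2`.  
  - Fully-connected layer mapping the final hidden state to 2 classes (UP or DOWN).
- **`forward`**: Returns logits for plain inference, or a dict with the cross-entropy loss and logits when `labels` are passed, which is the form `Trainer` needs.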

### 4.2 Data Preparation & Training

```python
def train_lstm(**kwargs):
    df = pd.read_csv(f"{BASE_DIR}/{SYMBOL}_history.csv", parse_dates=["Date"])
    closes = df["Close"].values
    seqs, labels = [], []
    for i in range(len(closes)-2):
        seqs.append([[closes[i]], [closes[i+1]]])
        labels.append(int(closes[i+2] > closes[i+1]))
    ds = Dataset.from_dict({"sequence": seqs, "label": labels}).train_test_split(0.1)
```

- **Sequence & Label**:  
  - **Input**: Two-day window of closing prices.  
  - **Label**: 1 if the close on the day after the window is higher than the window’s last close, else 0.
- **`Dataset`**: Wrap sequences for Hugging Face training API.
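
To see the windowing on concrete numbers, here is the same loop pulled into a standalone helper and applied to a handful of made-up closing prices. The `make_windows` name and the sample values are for illustration only.

```python
def make_windows(closes):
    """Build 2-day input windows and up/down labels from a list of closes."""
    seqs, labels = [], []
    for i in range(len(closes) - 2):
        seqs.append([[closes[i]], [closes[i + 1]]])          # shape (2, 1) per sample
        labels.append(int(closes[i + 2] > closes[i + 1]))    # 1 = next close is higher
    return seqs, labels

closes = [100.0, 101.0, 99.5, 102.0, 102.0]  # made-up prices
seqs, labels = make_windows(closes)

for seq, label in zip(seqs, labels):
    print(seq, "->", label)
```

Five prices yield three samples, and an unchanged price counts as 0 because the comparison is strictly greater-than.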

```python
    def collate(batch):
        X = torch.tensor([b["sequence"] for b in batch], dtype=torch.float)
        y = torch.tensor([b["label"]    for b in batch], dtype=torch.long)
        return {"input_values": X, "labels": y}

    model = LSTMClassifier()
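    args = TrainingArguments(
        output_dir=MODEL_DIR,
        num_train_epochs=3,
        per_device_train_batch_size=16,
        eval_strategy="epoch",        # named `evaluation_strategy` in transformers < 4.41
        save_total_limit=1,
        remove_unused_columns=False,  # keep "sequence" so collate() can read it
        report_to="none",             # skip external loggers such as W&B
    )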
    Trainer(
        model=model,
        args=args,
        train_dataset=ds["train"],
        eval_dataset=ds["test"],
        data_collator=collate,
    ).train()
    torch.save(model.state_dict(), os.path.join(MODEL_DIR, "pytorch_model.bin"))
```

- **Collator**: Converts list of examples into tensors.  
- **`Trainer`**: Simplifies training loop, evaluation, and checkpointing.  
- **Save**: Model weights for inference later.

---

## 5. Task 3: `fetch_recent`

```python
def fetch_recent(**kwargs):
    end   = datetime.now()
    start = end - timedelta(days=5)
    df = yf.download(SYMBOL, start=start, end=end)["Close"].tail(2)
    df.to_csv(f"{BASE_DIR}/{SYMBOL}_recent.csv", index=False, header=False)
    print("Recent closes:", df.values.flatten())
```

- **Purpose**: Get the last two available closing prices for prediction.  
- **Window**: Last 5 days to account for weekends/holidays.  
- **Output**: Simple CSV with two values.
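
The choice of a 5-day lookback can be checked with a little date arithmetic: any five consecutive calendar dates contain at least three weekdays, so two trading days are available unless market holidays remove too many of them. The sketch below counts weekdays only and knows nothing about holidays; `weekdays_in_lookback` is a hypothetical helper.

```python
from datetime import date, timedelta

def weekdays_in_lookback(end, days=5):
    """Count Monday-Friday dates among the `days` calendar dates before `end`."""
    dates = [end - timedelta(days=d) for d in range(1, days + 1)]
    return sum(1 for d in dates if d.weekday() < 5)

# Try every weekday as the end date (2025-06-16 is a Monday).
counts = [weekdays_in_lookback(date(2025, 6, 16) + timedelta(days=k)) for k in range(7)]
print(counts)
print(min(counts))
```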

---

## 6. Task 4: `predict_next`

```python
def predict_next(**kwargs):
    recent = pd.read_csv(f"{BASE_DIR}/{SYMBOL}_recent.csv", header=None).values.flatten()
    seq = torch.tensor([[[recent[0]], [recent[1]]]], dtype=torch.float)

    model = LSTMClassifier()
    model.load_state_dict(torch.load(os.path.join(MODEL_DIR, "pytorch_model.bin")))
    model.eval()

    pred = torch.argmax(model(seq), dim=-1).item()
    out  = "UP" if pred==1 else "DOWN"
    print("Next-day movement →", out)

    with open(f"{BASE_DIR}/predictions.log", "a") as f:
        f.write(f"{datetime.now().date()} → {out}\n")
```

- **Load**: Recent prices and reshape for model input.  
- **Inference**: Model predicts UP or DOWN.  
- **Log**: Append prediction with date to `predictions.log`.

---

## 7. DAG Definition

```python
default_args = {
    "owner": "airflow",
    "start_date": datetime(2025, 6, 16),
    "retries": 0,
}

with DAG(
    dag_id="stock_manual",
    default_args=default_args,
    description="Manually-triggered Stock-LSTM pipeline",
    schedule_interval=None,  # Manual only
    catchup=False,
) as dag:

    t1 = PythonOperator(task_id="fetch_history", python_callable=fetch_history)
    t2 = PythonOperator(task_id="train_lstm",    python_callable=train_lstm)
    t3 = PythonOperator(task_id="fetch_recent",  python_callable=fetch_recent)
    t4 = PythonOperator(task_id="predict_next",  python_callable=predict_next)

    t1 >> t2 >> t3 >> t4
```

- **`dag_id`**: Unique name for this pipeline.  
- **Tasks**: Four Python callables executed in order.  
- **Manual Trigger**: `schedule_interval=None` means run only when triggered by user.  
- **Dependencies**: Ensures each stage completes before the next starts.

---

You now have a clear, step-by-step guide to how each section of the Stock-LSTM DAG works!

▶️ Trigger & Validate

In [None]:
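# List the DAGs, then run the stock pipeline in-process so the cell blocks until
# all four tasks finish (`dags trigger` only queues a run for a running scheduler)
!airflow dags list
!airflow dags test stock_manual 2025-06-16

# Tail the prediction log after it completes
!tail -n 5 /content/stock_manual/predictions.log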
