# Fixes in the Flow Definition

## BEFORE vs AFTER: Line-by-Line Comparison & Explanation

Here's a **line-by-line breakdown** of the changes made to the `main.py` (CrewAI Flow file), explaining **what was changed, line-by-line, and why** each change was necessary.

<br><br>
#### File Context

This script defines the CrewAI flow for an **automated email responder**, including:

* Checking for unread Gmail messages
* Passing them to a crew of agents
* Handling output
* Running on a timed loop

<br><br>
#### Imports

**Before:**

```python
import time
from typing import List
```

**After:**

```python
import time
import os
import json
from typing import List
```

**Why:**
* `os`: Needed to create the `output/` directory if it doesn’t exist.
* `json`: Used to serialize email data (`emails_json`) for structured agent input.


<br><br>
#### Function: `fetch_new_emails`

**❌ Before:**

```python
print("Kickoff the Email Filter Crew")
```

**✅ After:**

```python
print("🔄 Checking for new emails...")
```

**Why:**
More descriptive + clear for logs. Emoji makes log scanning easier in real-time or terminal playback.

---

**❌ Before:**

```python
self.state.emails = new_emails
self.state.checked_emails_ids = updated_checked_email_ids
```

**✅ After:**

```python
self.state.emails = new_emails
self.state.checked_emails_ids = updated_checked_email_ids
print(f"📥 Found {len(new_emails)} new email(s).")
return self.state
```

**Why:**

* Added logging of how many emails were found (helps understand batch size).
* `return self.state` is **essential in CrewAI flows** — it ensures the updated state is stored between steps. Missing this caused silent bugs.


<br><br>
#### Function: `generate_draft_responses`

**❌ Before:**

```python
print("Current email queue: ", len(self.state.emails))
```

**✅ After:**

```python
print("📨 Current email queue:", len(self.state.emails))
```

**Why:**
Improved clarity and terminal visibility. The emoji is for consistency and readability.

---

**❌ Before:**

```python
if len(self.state.emails) > 0:
    print("Writing New emails")
    emails = format_emails(self.state.emails)
    EmailFilterCrew().crew().kickoff(inputs={"emails": emails})
```

**✅ After:**

```python
if self.state.emails:
    print("✍️ Generating responses for new emails...")
    print(format_emails(self.state.emails))  # Optional: human-readable output

    try:
        emails_json = json.dumps(self.state.emails)
        result = EmailFilterCrew().crew().kickoff(inputs={"emails_json": emails_json})
```

**Why:**

* `if self.state.emails` is cleaner Python style.
* `format_emails()` call is retained but **only for console printing** (optional).
* **MAJOR FIX**: Converted emails to `emails_json` (serialized structured data).
* Passed `emails_json` to the Crew (critical for correct agent input parsing).

---

**✅ New:**

```python
print("✅ Crew output:")
print(result.raw)
```

**Why:**
To show full Crew-generated response text — whether it's categorization, drafted replies, or errors.

---

**✅ New Debug Block:**

```python
print("\n🔍 Debug Output Breakdown:\n")
try:
    for task_name, output in result.task_outputs.items():
        print(f"📌 Task '{task_name}' output:\n{output}\n")
except Exception as e:
    print("⚠️ Could not extract task-level output:", str(e))
```

**Why:**
Shows what each task agent produced (e.g., categorization list, action plan, drafts). This is invaluable for testing and debugging task-specific logic.

---

**✅ New: Save Crew output to file**

```python
os.makedirs("output", exist_ok=True)
with open("output/last_draft_output.txt", "w") as f:
    f.write(result.raw)
```

**Why:**
Creates a local snapshot of each run’s output. Helps with:

* Testing
* Debugging
* Reviewing what got generated
* Keeping history

---

**❌ Before:**

```python
self.state.emails = []
```

✅ Still present **after**, but now it's clearer it comes **after** the Crew output and saving step — logically placed after processing is complete.

---

<br><br>
#### Final Wait

**❌ Before:**

```python
print("Waiting for 180 seconds")
```

**✅ After:**

```python
print("🕒 Waiting for 180 seconds before next run...")
```

**Why:**
Same logging improvement rationale — better visual anchors in the terminal.


<br><br>
#### Function: `kickoff()`

**❌ Before:**

```python
email_auto_response_flow = EmailAutoResponderFlow()
email_auto_response_flow.kickoff()
```

**✅ After:**

Same, but prefixed by:

```python
print("🚀 Starting Email Auto-Responder Flow...")
```

**Why:**
Clarifies visually when the app launches. Helps distinguish between startup and cycle behavior in logs.


<br><br>
#### Summary of Improvements

* **Fixed key data mismatch bug**: Replaced text input with structured JSON for Crew task ingestion.
* **Improved logs**: Clearer step-by-step terminal output.
* **Added debugging**: Per-task insights.
* **Output persistence**: Saves last run to disk.
* **Better state handling**: Ensures proper return of state.
* **Emojis for readability**: Helpful for live demos, dev teams, and async debugging.

## About the 3 minute wait

If you run the application, you will see that the 3 minute wait is actually not working.

The reason is that the `generate_draft_responses` method is only **called once**, and then the script **terminates after sleeping**. To make the auto-responder truly **loop every 180 seconds**, you would need to explicitly **wrap the flow in a loop**.


#### Here's the modified version of `main.py` with the looping logic:

```python
#!/usr/bin/env python
import time
import os
import json
from typing import List

from crewai.flow.flow import Flow
from pydantic import BaseModel

from email_auto_responder_flow.types import Email
from email_auto_responder_flow.utils.emails import check_email, format_emails

from .crews.email_filter_crew.email_filter_crew import EmailFilterCrew


class AutoResponderState(BaseModel):
    emails: List[Email] = []
    checked_emails_ids: set[str] = set()


class EmailAutoResponderFlow(Flow[AutoResponderState]):
    initial_state = AutoResponderState

    def run_once(self):
        print("🔄 Checking for new emails...")
        new_emails, updated_checked_email_ids = check_email(
            checked_emails_ids=self.state.checked_emails_ids
        )

        self.state.emails = new_emails
        self.state.checked_emails_ids = updated_checked_email_ids
        print(f"📥 Found {len(new_emails)} new email(s).")

        print("📨 Current email queue:", len(self.state.emails))

        if self.state.emails:
            print("✍️ Generating responses for new emails...")
            print(format_emails(self.state.emails))  # Optional: human-readable output

            try:
                emails_json = json.dumps(self.state.emails)

                result = EmailFilterCrew().crew().kickoff(inputs={"emails_json": emails_json})

                print("✅ Crew output:")
                print(result.raw)

                print("\n🔍 Debug Output Breakdown:\n")
                try:
                    for task_name, output in result.task_outputs.items():
                        print(f"📌 Task '{task_name}' output:\n{output}\n")
                except Exception as e:
                    print("⚠️ Could not extract task-level output:", str(e))

                os.makedirs("output", exist_ok=True)
                with open("output/last_draft_output.txt", "w") as f:
                    f.write(result.raw)

            except Exception as e:
                print("❌ Error running the Email Filter Crew:", str(e))

            self.state.emails = []  # Clear processed emails
        else:
            print("⏳ No new emails to process.")


def kickoff():
    """Run the flow in an infinite loop with delays."""
    print("🚀 Starting Email Auto-Responder Flow...")
    flow = EmailAutoResponderFlow()

    while True:
        flow.run_once()
        print("🕒 Waiting for 180 seconds before next run...")
        time.sleep(180)


def plot_flow():
    """Plot the flow."""
    email_auto_response_flow = EmailAutoResponderFlow()
    email_auto_response_flow.plot()


if __name__ == "__main__":
    kickoff()
```

---

#### What has changed:

* ✅ Replaced `@start`/`@listen` with a single `run_once()` method for clarity and control.
* ✅ Wrapped the main execution in a `while True` loop inside `kickoff()`.
* ✅ Preserved state between loops so `checked_emails_ids` grows over time.
* ✅ Ensures **actual polling behavior** every 180 seconds.

## Let's explain this with more detail

Replacing `@start`/`@listen` with a custom `run_once()` method is a **practical and clean solution** *when you're managing a custom loop manually*, **but only if you're not using CrewAI's built-in flow execution system** like `flow.kickoff()` or `flow.plot()` to orchestrate steps automatically.


#### What `@start` and `@listen` Are For:

* These decorators define **flow graphs** declaratively for CrewAI.
* `@start` marks an entry point.
* `@listen(func)` wires a task to run **after** the decorated one completes.
* When you call `.kickoff()`, CrewAI runs the full pipeline **once** based on that flow graph.

#### Why It Doesn’t Loop:

The original script used `@start` and `@listen`, then called `flow.kickoff()` — which:

* **Runs the flow once**
* Then returns (hence why your terminal shows it's done)

CrewAI **does not manage repetition** or long-running polling — that's up to **you** to implement.

#### Why `run_once()` is a Good Idea Here:

You're building a **long-running, looping** system (like a background email watcher), so:

* You **don’t want to re-initialize** the flow graph each time.
* You need to **retain state** across runs (e.g., `checked_emails_ids`).
* You need precise control over **when to run**, **when to pause**, and **how to handle failures**.

By collapsing the flow into a single `run_once()` method and calling it in a loop, you:

* **Gain full control** over polling intervals and logic.
* Keep things **simple and testable**.
* Still fully use CrewAI (via `EmailFilterCrew().crew().kickoff(...)`) for agent task execution.

#### Recommendation for Production:

If you want to retain the **graph-structured benefits** of `@start`/`@listen`:

* Consider using `flow.kickoff()` **inside** the loop:

```python
flow = EmailAutoResponderFlow()
while True:
    flow.kickoff()  # executes start → listen chain
    print("Sleeping...")
    time.sleep(180)
```

However, this **rebuilds flow state each time**, which may be inefficient or error-prone for stateful flows like yours.

####  Conclusion:

> Replacing `@start`/`@listen` with `run_once()` is **appropriate and better** in a polling-based automation scenario like yours, especially for production-like robustness and repeatability.