<a href="https://colab.research.google.com/github/ChintPatel/CMPE258-HW7/blob/main/Crew_AI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install --quiet google-genai crewai pydantic

import os
from getpass import getpass

import google.genai as genai  # free-tier Gemini
from crewai.flow.flow import Flow, start, listen, router
from pydantic import BaseModel


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.8/42.8 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.3/67.3 kB[0m [31m827.7 kB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m48.2/48.2 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.0/310.0 kB[0m [31m19.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.7/7.7 MB[0m [31m85.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.3/135.3 kB[0m [31m12.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.9/18.9 MB[0m [31m77.9 MB/s[0m eta 

In [2]:
# —– set your Gemini API key (free tier) ——
if not os.environ.get("GEMINI_API_KEY"):
    os.environ["GEMINI_API_KEY"] = getpass("GEMINI_API_KEY: ")

client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
def call_gemini(prompt: str) -> str:
    resp = client.models.generate_content(
        model="gemini-2.0-flash",
        contents=prompt
    )
    return resp.text


GEMINI_API_KEY: ··········


▶️ Cell 3: Prompt-Chaining Flow

In [11]:
# Colab ▶️ Cell 3: Prompt-Chaining Flow (with kickoff(topic))

from pydantic import BaseModel
from crewai.flow.flow import Flow, start, listen, router

class JokeState(BaseModel):
    topic:    str = ""
    joke:     str = ""
    improved: str = ""

class JokeFlow(Flow[JokeState]):
    @start()
    def kickoff(self, topic: str):
        """Entry point: accepts topic as an argument."""
        self.state.topic = topic
        return self.gen_joke

    @listen(kickoff)
    def gen_joke(self):
        self.state.joke = call_gemini(f"Write a short joke about {self.state.topic}")
        return self.check_punchline

    @router(gen_joke)
    def check_punchline(self):
        # if it already has a punchline, end the flow
        if any(p in self.state.joke for p in ("?", "!")):
            return None
        # otherwise, go improve it
        return self.improve_joke

    @listen(check_punchline)
    def improve_joke(self):
        self.state.improved = call_gemini(f"Make this joke funnier: {self.state.joke}")
        # returning the improved joke causes kickoff() to return it
        return self.state.improved


▶️ Cell 4: Run the Prompt-Chaining Flow

In [13]:
# Colab ▶️ Cell 4: Run Prompt-Chaining Flow

flow1 = JokeFlow()
joke = flow1.kickoff("cats")
print("Joke:", joke)


Joke: <bound method JokeFlow.gen_joke of <__main__.JokeFlow object at 0x782962b57290>>


▶️ Cell 5: Parallelization Flow

In [20]:
# Colab ▶️ Cell 5: Parallelization Flow (fixed @listen)

from pydantic import BaseModel
from crewai.flow.flow import Flow, start, listen

class ParState(BaseModel):
    topic:    str = ""
    joke:     str = ""
    story:    str = ""
    poem:     str = ""
    combined: str = ""

class ParFlow(Flow[ParState]):
    @start()
    def kickoff(self, topic: str):
        """Accepts `topic` and fans out to three generators."""
        self.state.topic = topic
        return [self.make_joke, self.make_story, self.make_poem]

    @listen(kickoff)
    def make_joke(self):
        self.state.joke = call_gemini(f"Write a joke about {self.state.topic}")

    @listen(kickoff)
    def make_story(self):
        self.state.story = call_gemini(f"Write a short story about {self.state.topic}")

    @listen(kickoff)
    def make_poem(self):
        self.state.poem = call_gemini(f"Write a poem about {self.state.topic}")

    # Fan-in: listen to each of the three; only return once all are set
    @listen(make_joke)
    @listen(make_story)
    @listen(make_poem)
    def combine(self):
        if self.state.joke and self.state.story and self.state.poem:
            combined = (
                f"JOKE:\n{self.state.joke}\n\n"
                f"STORY:\n{self.state.story}\n\n"
                f"POEM:\n{self.state.poem}"
            )
            self.state.combined = combined
            return combined


▶️ Cell 6: Run the Parallelization Flow


In [21]:
# Colab ▶️ Cell 6: Run Parallelization Flow

flow2 = ParFlow()
combined_output = flow2.kickoff("cats")
print("Combined output:\n", combined_output)


Combined output:
 [<bound method ParFlow.make_joke of <__main__.ParFlow object at 0x782962b7cb10>>, <bound method ParFlow.make_story of <__main__.ParFlow object at 0x782962b7cb10>>, <bound method ParFlow.make_poem of <__main__.ParFlow object at 0x782962b7cb10>>]
