<a href="https://colab.research.google.com/github/LuniaKunal/CrewAI-Flow-Setup/blob/main/CrewAI_Flow_Setup.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install crewai



In [4]:
from google.colab import userdata


In [5]:
GROQ_API_KEY=userdata.get('GROQ_API_KEY')

In [6]:
from crewai.flow.flow import Flow, listen, start
from dotenv import load_dotenv
from litellm import completion
import nest_asyncio # Import nest_asyncio

nest_asyncio.apply() # Apply nest_asyncio patch

class ExampleFlow(Flow):
    model = "groq/llama-3.3-70b-versatile"

    @start()
    def generate_city(self):
        print("Starting flow")
        # Each flow state automatically gets a unique ID
        print(f"Flow State ID: {self.state['id']}")

        response = completion(
            model=self.model,
            api_key=GROQ_API_KEY,
            messages=[
                {
                    "role": "user",
                    "content": "Return the name of a random city in the world.",
                },
            ],
        )

        random_city = response["choices"][0]["message"]["content"]
        # Store the city in our state
        self.state["city"] = random_city
        print(f"Random City: {random_city}")

        return random_city

    @listen(generate_city)
    def generate_fun_fact(self, random_city):
        response = completion(
            model=self.model,
            api_key=GROQ_API_KEY,
            messages=[
                {
                    "role": "user",
                    "content": f"Tell me a fun fact about {random_city}",
                },
            ],
        )

        fun_fact = response["choices"][0]["message"]["content"]
        # Store the fun fact in our state
        self.state["fun_fact"] = fun_fact
        return fun_fact



flow = ExampleFlow()
result = flow.kickoff()

flow.plot("my_flow_plot")

print(f"Generated fun fact: {result}")

[1m[35m Flow started with ID: a33864e4-5f70-4c7e-97af-4950de3db073[00m


Starting flow
Flow State ID: a33864e4-5f70-4c7e-97af-4950de3db073
Random City: Skopje.


Plot saved as my_flow_plot.html
Generated fun fact: A fun fact about Skopje, the capital of North Macedonia, is that it has a plethora of statues throughout the city. In fact, Skopje is often referred to as the "City of Statues" due to its numerous and sometimes quirky monuments. The city has over 150 statues, including those of famous historical figures, mythical creatures, and even a giant statue of Alexander the Great. This makes Skopje a unique and fascinating place to explore, with a blend of history, culture, and whimsy around every corner.


In [7]:
# prompt: Show image of html file with path -> /content/my_flow_plot.html

from IPython.display import HTML

HTML(filename='/content/my_flow_plot.html')


In [8]:
from crewai.flow.flow import Flow, listen, start

class OutputExampleFlow(Flow):
    @start()
    def first_method(self):
        return "Output from first_method"

    @listen(first_method)
    def second_method(self, first_output):
        return f"Second method received: {first_output}"


flow = OutputExampleFlow()
final_output = flow.kickoff()

flow.plot()

print("---- Final Output ----")
print(final_output)

[1m[35m Flow started with ID: 9a6d06ad-b9cd-4f92-ab12-f3cbbde6f7d9[00m


Plot saved as crewai_flow.html
---- Final Output ----
Second method received: Output from first_method


In [9]:
# prompt: Show image of html file with path -> /content/my_flow_plot.html

from IPython.display import HTML

HTML(filename='/content/crewai_flow.html')


In [10]:
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    counter: int = 0
    message: str = ""

class StateExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        self.state.message = "Hello from first_method"
        self.state.counter += 1

    @listen(first_method)
    def second_method(self):
        self.state.message += " - updated by second_method"
        self.state.counter += 1
        return self.state.message

flow = StateExampleFlow()
final_output = flow.kickoff()

flow.plot()

print(f"Final Output: {final_output}")
print("Final State:")
print(flow.state)

[1m[35m Flow started with ID: d720f541-fb92-43c6-a505-b13c66bdcc64[00m


Plot saved as crewai_flow.html
Final Output: Hello from first_method - updated by second_method
Final State:
id='d720f541-fb92-43c6-a505-b13c66bdcc64' counter=2 message='Hello from first_method - updated by second_method'


In [11]:
# prompt: Show image of html file with path -> /content/my_flow_plot.html

from IPython.display import HTML

HTML(filename='/content/crewai_flow.html')


In [12]:
from crewai.flow.flow import Flow, listen, start

class UnstructuredExampleFlow(Flow):

    @start()
    def first_method(self):
        # The state automatically includes an 'id' field
        print(f"State ID: {self.state['id']}")
        self.state['counter'] = 0
        self.state['message'] = "Hello from structured flow"

    @listen(first_method)
    def second_method(self):
        self.state['counter'] += 1
        self.state['message'] += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state['counter'] += 1
        self.state['message'] += " - updated again"

        print(f"State after third_method: {self.state}")


flow = UnstructuredExampleFlow()
flow.kickoff()
flow.plot("Three_methods")

[1m[35m Flow started with ID: fe8cb324-d4a0-4f02-8551-a4161d09e95e[00m


State ID: fe8cb324-d4a0-4f02-8551-a4161d09e95e


State after third_method: {'id': 'fe8cb324-d4a0-4f02-8551-a4161d09e95e', 'counter': 2, 'message': 'Hello from structured flow - updated - updated again'}


Plot saved as Three_methods.html


In [13]:
# prompt: Show image of html file with path -> /content/my_flow_plot.html

from IPython.display import HTML

HTML(filename='/content/Three_methods.html')


In [14]:
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel


class ExampleState(BaseModel):
    # Note: 'id' field is automatically added to all states
    counter: int = 0
    message: str = ""


class StructuredExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        # Access the auto-generated ID if needed
        print(f"State ID: {self.state.id}")
        self.state.message = "Hello from structured flow"

    @listen(first_method)
    def second_method(self):
        self.state.counter += 1
        self.state.message += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state.counter += 1
        self.state.message += " - updated again"

        print(f"State after third_method: {self.state}")


flow = StructuredExampleFlow()
flow.kickoff()
flow.plot("Next_Three_methods")

[1m[35m Flow started with ID: 06457e44-8cf8-441e-bf4f-14393764486f[00m


State ID: 06457e44-8cf8-441e-bf4f-14393764486f


State after third_method: id='06457e44-8cf8-441e-bf4f-14393764486f' counter=2 message='Hello from structured flow - updated - updated again'


Plot saved as Next_Three_methods.html


In [15]:
# prompt: Show image of html file with path -> /content/my_flow_plot.html

from IPython.display import HTML

HTML(filename='/content/Next_Three_methods.html')


In [16]:
from crewai.flow.flow import Flow, listen, or_, start

class OrExampleFlow(Flow):

    @start()
    def start_method(self):
        return "Hello from the start method"

    @listen(start_method)
    def second_method(self):
        return "Hello from the second method"

    @listen(or_(start_method, second_method))
    def logger(self, result):
        print(f"Logger: {result}")



flow = OrExampleFlow()
flow.kickoff()
flow.plot("or_condition")

[1m[35m Flow started with ID: 302a80dd-c039-432a-a7c4-c5c62eae1d68[00m


Logger: Hello from the start method


Logger: Hello from the second method


Plot saved as or_condition.html


In [17]:
# prompt: Show image of html file with path -> /content/my_flow_plot.html

from IPython.display import HTML

HTML(filename='/content/or_condition.html')


In [18]:
from crewai.flow.flow import Flow, and_, listen, start

class AndExampleFlow(Flow):

    @start()
    def start_method(self):
        self.state["greeting"] = "Hello from the start method"

    @listen(start_method)
    def second_method(self):
        self.state["joke"] = "What do computers eat? Microchips."

    @listen(and_(start_method, second_method))
    def logger(self):
        print("---- Logger ----")
        print(self.state)

flow = AndExampleFlow()
flow.kickoff()
flow.plot("and_condition")

[1m[35m Flow started with ID: 830e50a5-d630-41f5-8e9b-f391e16270d9[00m


---- Logger ----
{'id': '830e50a5-d630-41f5-8e9b-f391e16270d9', 'greeting': 'Hello from the start method', 'joke': 'What do computers eat? Microchips.'}


Plot saved as and_condition.html


In [19]:
# prompt: Show image of html file with path -> /content/my_flow_plot.html

from IPython.display import HTML

HTML(filename='/content/and_condition.html')

In [20]:
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    success_flag: bool = False

class RouterFlow(Flow[ExampleState]):

    @start()
    def start_method(self):
        print("Starting the structured flow")
        random_boolean = random.choice([True, False])
        self.state.success_flag = random_boolean

    @router(start_method)
    def second_method(self):
        if self.state.success_flag:
            return "success"
        else:
            return "failed"

    @listen("success")
    def third_method(self):
        print("Third method running")

    @listen("failed")
    def fourth_method(self):
        print("Fourth method running")


flow = RouterFlow()
flow.kickoff()
flow.plot("router_flow")

[1m[35m Flow started with ID: ccd2348f-e2f3-47ca-b70e-25bc84e5bdfd[00m


Starting the structured flow


Fourth method running


Plot saved as router_flow.html


In [21]:
# prompt: Show image of html file with path -> /content/my_flow_plot.html

from IPython.display import HTML

HTML(filename='/content/router_flow.html')

In [22]:
!pip install crewai_tools



In [23]:
from google.colab import userdata

In [24]:
!pip install langchain-groq



In [28]:
print(ChatGroq( api_key=GROQ_API_KEY,
              model = "llama-3.3-70b-versatile"
              ))

client=<groq.resources.chat.completions.Completions object at 0x7f587d893bd0> async_client=<groq.resources.chat.completions.AsyncCompletions object at 0x7f587c497d10> model_name='llama-3.3-70b-versatile' model_kwargs={} groq_api_key=SecretStr('**********')


In [27]:
import asyncio
from typing import Any, Dict, List
from langchain_groq import ChatGroq
from crewai_tools import SerperDevTool
from pydantic import BaseModel, Field

from crewai.agent import Agent
from crewai.flow.flow import Flow, listen, start


# Define a structured output format
class MarketAnalysis(BaseModel):
    key_trends: List[str] = Field(description="List of identified market trends")
    market_size: str = Field(description="Estimated market size")
    competitors: List[str] = Field(description="Major competitors in the space")


# Define flow state
class MarketResearchState(BaseModel):
    product: str = ""
    analysis: MarketAnalysis | None = None


# Create a flow class
class MarketResearchFlow(Flow[MarketResearchState]):

    @start()
    def initialize_research(self) -> Dict[str, Any]:
        print(f"Starting market research for {self.state.product}")
        return {"product": self.state.product}

    @listen(initialize_research)
    async def analyze_market(self) -> Dict[str, Any]:
        # Create an Agent for market research
        GROQ_API_KEY=userdata.get('GROQ_API_KEY')
        analyst = Agent(
            llm = ChatGroq(
              api_key=GROQ_API_KEY,
              model = "llama-3.3-70b-versatile"
              ),
            role="Market Research Analyst",
            goal=f"Analyze the market for {self.state.product}",
            backstory="You are an experienced market analyst with expertise in "
            "identifying market trends and opportunities.",
            tools=[SerperDevTool()],
            verbose=True,
        )

        # Define the research query
        query = f"""
        Research the market for {self.state.product}. Include:
        1. Key market trends
        2. Market size
        3. Major competitors

        Format your response according to the specified structure.
        """

        # Execute the analysis with structured output format
        result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
        if result.pydantic:
            print("result", result.pydantic)
        else:
            print("result", result)

        # Return the analysis to update the state
        return {"analysis": result.pydantic}

    @listen(analyze_market)
    def present_results(self, analysis) -> None:
        print("\nMarket Analysis Results")
        print("=====================")

        if isinstance(analysis, dict):
            # If we got a dict with 'analysis' key, extract the actual analysis object
            market_analysis = analysis.get("analysis")
        else:
            market_analysis = analysis

        if market_analysis and isinstance(market_analysis, MarketAnalysis):
            print("\nKey Market Trends:")
            for trend in market_analysis.key_trends:
                print(f"- {trend}")

            print(f"\nMarket Size: {market_analysis.market_size}")

            print("\nMajor Competitors:")
            for competitor in market_analysis.competitors:
                print(f"- {competitor}")
        else:
            print("No structured analysis data available.")
            print("Raw analysis:", analysis)


# Usage example
async def run_flow():
    flow = MarketResearchFlow()
    result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
    flow.plot("Market_Research_plot")
    return result


# Run the flow
if __name__ == "__main__":
    asyncio.run(run_flow())

[1m[35m Flow started with ID: 7335683b-d7c0-4f95-99a2-ad10d36b6b52[00m


Starting market research for AI-powered chatbots




LiteLLM.Info: If you need to debug this error, use `litellm._turn_on_debug()'.


[1;31mProvider List: https://docs.litellm.ai/docs/providers[0m



ERROR:root:LiteLLM call failed: litellm.BadRequestError: GroqException - {"error":{"message":"Invalid API Key","type":"invalid_request_error","code":"invalid_api_key"}}



[91m Error during LLM call: litellm.BadRequestError: GroqException - {"error":{"message":"Invalid API Key","type":"invalid_request_error","code":"invalid_api_key"}}
[00m


[91m Agent failed to reach a final answer. This is likely a bug - please report it.[00m
[91m An unknown error occurred. Please check the details below.[00m
[91m Error details: litellm.BadRequestError: GroqException - {"error":{"message":"Invalid API Key","type":"invalid_request_error","code":"invalid_api_key"}}
[00m


[Flow._execute_single_listener] Error in method analyze_market: litellm.BadRequestError: GroqException - {"error":{"message":"Invalid API Key","type":"invalid_request_error","code":"invalid_api_key"}}



Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/litellm/llms/custom_httpx/llm_http_handler.py", line 128, in _make_common_sync_call
    response = sync_httpx_client.post(
               ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/litellm/llms/custom_httpx/http_handler.py", line 576, in post
    raise e
  File "/usr/local/lib/python3.11/dist-packages/litellm/llms/custom_httpx/http_handler.py", line 558, in post
    response.raise_for_status()
  File "/usr/local/lib/python3.11/dist-packages/httpx/_models.py", line 829, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '401 Unauthorized' for url 'https://api.groq.com/openai/v1/chat/completions'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/401

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/l

BadRequestError: litellm.BadRequestError: GroqException - {"error":{"message":"Invalid API Key","type":"invalid_request_error","code":"invalid_api_key"}}
