|
| 1 | +--- |
| 2 | +type: docs |
| 3 | +title: "CrewAI Workflows" |
| 4 | +linkTitle: "CrewAI Workflows" |
| 5 | +weight: 25 |
| 6 | +description: "How to run CrewAI agents with durable, fault-tolerant execution using Dapr Workflows" |
| 7 | +--- |
| 8 | + |
| 9 | +## Overview |
| 10 | + |
| 11 | +Dapr Workflows make it possible to run CrewAI agents **reliably**, **durably**, and **with built-in resiliency**. |
| 12 | +By orchestrating CrewAI tasks with the Dapr Workflow engine, developers can: |
| 13 | + |
| 14 | +- Ensure long-running CrewAI work survives crashes and restarts |
| 15 | +- Get automatic checkpoints, retries, and state recovery |
| 16 | +- Run each CrewAI task as a durable activity |
| 17 | +- Observe execution through tracing, metrics, and structured logs |
| 18 | + |
| 19 | +This guide walks through orchestrating multiple CrewAI tasks using Dapr Workflows, ensuring each step is run *exactly once* even if the process restarts. |
| 20 | + |
| 21 | +## Getting Started |
| 22 | + |
| 23 | +Initialize Dapr locally to set up a self-hosted environment for development. This process installs the Dapr sidecar binaries, provisions the workflow engine, and prepares a default components directory. For full details, see the official [guide on initializing Dapr locally]({{% ref install-dapr-selfhost.md %}}). |
| 24 | + |
| 25 | +Initialize Dapr: |
| 26 | + |
| 27 | +```bash |
| 28 | +dapr init |
| 29 | +``` |
| 30 | + |
| 31 | +Verify that daprio/dapr, openzipkin/zipkin, and redis are running: |
| 32 | + |
| 33 | +```bash |
| 34 | +docker ps |
| 35 | +``` |
| 36 | + |
| 37 | +### Install Python |
| 38 | + |
| 39 | +{{% alert title="Note" color="info" %}} |
| 40 | +Make sure you have Python already installed. `Python >=3.10`. For installation instructions, visit the official [Python installation guide](https://www.python.org/downloads/). |
| 41 | +{{% /alert %}} |
| 42 | + |
| 43 | +### Install Dependencies |
| 44 | + |
| 45 | +```bash |
| 46 | +pip install dapr dapr-ext-workflow crewai |
| 47 | +``` |
| 48 | + |
| 49 | +### Create a Python Virtual Environment (recommended) |
| 50 | + |
| 51 | +```bash |
| 52 | +python -m venv .venv |
| 53 | +source .venv/bin/activate # Windows: .venv\Scripts\activate |
| 54 | +``` |
| 55 | + |
| 56 | +### Create a Workflow to Run CrewAI Tasks |
| 57 | + |
| 58 | +Create a file named crewai_workflow.py and paste the following: |
| 59 | + |
| 60 | +```python |
| 61 | +from dapr.ext.workflow import ( |
| 62 | + WorkflowRuntime, |
| 63 | + DaprWorkflowContext, |
| 64 | + WorkflowActivityContext, |
| 65 | + DaprWorkflowClient, |
| 66 | +) |
| 67 | +from crewai import Agent, Task, Crew |
| 68 | +import time |
| 69 | + |
| 70 | +wfr = WorkflowRuntime() |
| 71 | + |
| 72 | +# ------------------------------------------------------------ |
| 73 | +# 1. Define Agent, Tasks, and Task Dictionary |
| 74 | +# ------------------------------------------------------------ |
| 75 | +agent = Agent( |
| 76 | + role="Research Analyst", |
| 77 | + goal="Research and summarize impactful technology updates.", |
| 78 | + backstory="A skilled analyst who specializes in researching and summarizing technology topics.", |
| 79 | +) |
| 80 | + |
| 81 | +tasks = { |
| 82 | + "latest_ai_news": Task( |
| 83 | + description="Find the latest news about artificial intelligence.", |
| 84 | + expected_output="A 3-paragraph summary of the top 3 stories.", |
| 85 | + agent=agent, |
| 86 | + ), |
| 87 | + "ai_startup_launches": Task( |
| 88 | + description="Summarize the most impactful AI startup launches in the last 6 months.", |
| 89 | + expected_output="A list summarizing 2 AI startups with links.", |
| 90 | + agent=agent, |
| 91 | + ), |
| 92 | + "ai_policy_updates": Task( |
| 93 | + description="Summarize the newest AI government policy and regulation updates.", |
| 94 | + expected_output="A bullet-point list summarizing the latest policy changes.", |
| 95 | + agent=agent, |
| 96 | + ), |
| 97 | +} |
| 98 | + |
| 99 | +# ------------------------------------------------------------ |
| 100 | +# 2. Activity — runs ONE task by name |
| 101 | +# ------------------------------------------------------------ |
| 102 | +@wfr.activity(name="run_task") |
| 103 | +def run_task_activity(ctx: WorkflowActivityContext, task_name: str): |
| 104 | + print(f"Running CrewAI task: {task_name}", flush=True) |
| 105 | + |
| 106 | + task = tasks[task_name] |
| 107 | + |
| 108 | + # Create a Crew for just this one task |
| 109 | + temp_crew = Crew(agents=[agent], tasks=[task]) |
| 110 | + |
| 111 | + # kickoff() works across CrewAI versions |
| 112 | + result = temp_crew.kickoff() |
| 113 | + |
| 114 | + return str(result) |
| 115 | + |
| 116 | +# ------------------------------------------------------------ |
| 117 | +# 3. Workflow — orchestrates tasks durably |
| 118 | +# ------------------------------------------------------------ |
| 119 | +@wfr.workflow(name="crewai_multi_task_workflow") |
| 120 | +def crewai_workflow(ctx: DaprWorkflowContext): |
| 121 | + print("Starting multi-task CrewAI workflow", flush=True) |
| 122 | + |
| 123 | + latest_news = yield ctx.call_activity(run_task_activity, input="latest_ai_news") |
| 124 | + startup_summary = yield ctx.call_activity(run_task_activity, input="ai_startup_launches") |
| 125 | + policy_updates = yield ctx.call_activity(run_task_activity, input="ai_policy_updates") |
| 126 | + |
| 127 | + return { |
| 128 | + "latest_news": latest_news, |
| 129 | + "startup_summary": startup_summary, |
| 130 | + "policy_updates": policy_updates, |
| 131 | + } |
| 132 | + |
| 133 | +# ------------------------------------------------------------ |
| 134 | +# 4. Runtime + Client (entry point) |
| 135 | +# ------------------------------------------------------------ |
| 136 | +if __name__ == "__main__": |
| 137 | + wfr.start() |
| 138 | + |
| 139 | + client = DaprWorkflowClient() |
| 140 | + instance_id = "crewai-multi-01" |
| 141 | + |
| 142 | + client.schedule_new_workflow( |
| 143 | + workflow=crewai_workflow, |
| 144 | + input=None, |
| 145 | + instance_id=instance_id |
| 146 | + ) |
| 147 | + |
| 148 | + state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60) |
| 149 | + print(state.serialized_output) |
| 150 | +``` |
| 151 | + |
| 152 | +### Create the Workflow Database Component |
| 153 | + |
| 154 | +Dapr Workflows persist durable state using any [Dapr state store]({{% ref supported-state-stores %}}) that supports workflows. |
| 155 | +Create a components directory, then create the file workflowstore.yaml: |
| 156 | + |
| 157 | +```yaml |
| 158 | +apiVersion: dapr.io/v1alpha1 |
| 159 | +kind: Component |
| 160 | +metadata: |
| 161 | + name: statestore |
| 162 | +spec: |
| 163 | + type: state.redis |
| 164 | + version: v1 |
| 165 | + metadata: |
| 166 | + - name: redisHost |
| 167 | + value: localhost:6379 |
| 168 | + - name: redisPassword |
| 169 | + value: "" |
| 170 | + - name: actorStateStore |
| 171 | + value: "true" |
| 172 | +``` |
| 173 | +
|
| 174 | +This component stores: |
| 175 | +
|
| 176 | +* Checkpoints |
| 177 | +* Execution history |
| 178 | +* Deterministic resumption state |
| 179 | +* Final output data |
| 180 | +
|
| 181 | +### Set a CrewAI LLM Provider |
| 182 | +
|
| 183 | +CrewAI needs an LLM configuration or token to run. See instructions [here](https://docs.crewai.com/en/concepts/llms#setting-up-your-llm). |
| 184 | +
|
| 185 | +### Run the Workflow |
| 186 | +
|
| 187 | +Launch the CrewAI workflow using the Dapr CLI: |
| 188 | +
|
| 189 | +```bash |
| 190 | +dapr run \ |
| 191 | + --app-id crewaiwf \ |
| 192 | + --dapr-grpc-port 50001 \ |
| 193 | + --resources-path ./components \ |
| 194 | + -- python3 ./crewai_workflow.py |
| 195 | +``` |
| 196 | + |
| 197 | +As the workflow runs, each CrewAI task is executed as a durable activity. |
| 198 | +If the process crashes, the workflow resumes exactly where it left off. |
| 199 | + |
| 200 | +Open Zipkin to view workflow traces: |
| 201 | + |
| 202 | +``` |
| 203 | +http://localhost:9411 |
| 204 | +``` |
0 commit comments