In [None]:
%pip install streamlit requests fastapi uvicorn crewai crewai[tools] openai termcolor yaspin

In [None]:
!wget https://raw.githubusercontent.com/NeetishPathak/dbops-agentic-crew/main/app/resources/pdfs/Redis.pdf -O Redis.pdf

In [22]:
# `os` is imported here (not only in the later tools cell) so that this cell
# works on a fresh kernel when the notebook is run top to bottom.
import os

from google.colab import userdata

# Copy the secret named OPENAI_API_KEY from Colab's user data into the process
# environment under the same name.
os.environ['OPENAI_API_KEY'] = userdata.get('OPENAI_API_KEY')

In [24]:
# Crew AI Tools

from crewai_tools import FileWriterTool, PDFSearchTool
from crewai.tools import BaseTool
from pydantic import BaseModel

from pathlib import Path
import os
import platform
import subprocess
import shutil
import urllib.request

# Document that the PDF search tool is built on
pdf_file = "Redis.pdf"

# PDF search tool bound to the document above
pdf_rag_search_tool = PDFSearchTool(pdf_file)

# File writer tool (no configuration needed)
file_writer_tool = FileWriterTool()


# Infra Tools

# Pinned kind release that KindInstaller downloads.
_KIND_RELEASE = "v0.29.0"

# Machine name reported by the OS -> architecture suffix used in kind's
# release asset names.
_KIND_ARCH_SUFFIX = {"x86_64": "amd64", "aarch64": "arm64", "arm64": "arm64"}


def _prepend_to_path(directory: str) -> None:
  """Put `directory` at the front of this process's PATH if it is not listed yet."""
  current = os.environ.get("PATH", "")
  entries = current.split(os.pathsep) if current else []
  if directory not in entries:
    os.environ["PATH"] = os.pathsep.join([directory, *entries])


class KindInstaller(BaseTool):
  """Tool that makes a `kind` executable available to the current process.

  The binary is downloaded into the current working directory and that
  directory is prepended to this process's PATH; nothing is installed
  system-wide. Every outcome, including failures, is reported as a
  human-readable string rather than an exception.
  """

  def __init__(self, name="KindInstaller", description="Installs Kind...", **kwargs):
    super().__init__(name=name, description=description, **kwargs)

  def _run(self, input_type: dict) -> str:
    """Ensure `kind` is callable and return a status message.

    Args:
      input_type: Accepted for the tool-call interface; not used.

    Returns:
      A message starting with "✅" on success or "❌" on failure.
    """
    try:
      if not shutil.which("docker"):
        return "❌ Docker is not installed. Please install Docker first."

      if shutil.which("kind"):
        return "✅ 'kind' is already installed."

      system = platform.system().lower()

      if system not in ["linux", "darwin"]:
        return f"❌ Unsupported OS: {system}. Manual install required."

      # platform.machine() reports the machine name without spawning `uname`.
      arch = platform.machine()
      arch_name = _KIND_ARCH_SUFFIX.get(arch)
      if not arch_name:
        return f"❌ Unsupported architecture: {arch} on {system}."

      install_dir = os.getcwd()
      kind_path = os.path.join(install_dir, "kind")

      # A usable binary may already sit in the working directory (e.g. from a
      # previous session whose PATH change was lost); reuse it.
      if os.path.isfile(kind_path) and os.access(kind_path, os.X_OK):
        _prepend_to_path(install_dir)
        return "✅ 'kind' is already installed."

      # Download the pinned kind release for this OS/architecture.
      url = f"https://kind.sigs.k8s.io/dl/{_KIND_RELEASE}/kind-{system}-{arch_name}"

      # Fetch into a side file first so a failed download never leaves a
      # truncated file named `kind` behind.
      partial_path = kind_path + ".download"
      try:
        urllib.request.urlretrieve(url, partial_path)
        # Make executable (adds the execute bits, like `chmod +x`).
        os.chmod(partial_path, os.stat(partial_path).st_mode | 0o111)
        os.replace(partial_path, kind_path)
      finally:
        if os.path.exists(partial_path):
          os.remove(partial_path)

      # Expose the download directory on PATH for this process only.
      _prepend_to_path(install_dir)

      return "✅ Kind has been installed successfully."

    except Exception as e:
      # Report the failure as text so the calling agent receives a result.
      return f"❌ Failed to install Kind: {e}"

  def _arun(self, input: str = "") -> str:
    """Async entry point; not supported by this tool."""
    raise NotImplementedError("Async version not implemented.")


# Tool instance with a task-specific description (overrides the class default).
kind_cluster_tool = KindInstaller(
  description="Installs a Kind Tool for Kubernetes deployments.",
)

In [25]:
# CrewAI Agents

from crewai import Agent

## Agents
# Architect agent: designs the cluster using the PDF search tool.
# Every backstory fragment ends with a space so the sentences do not run
# together when Python joins the adjacent string literals.
# NOTE(review): the literal braces in the JSON example are not an input
# placeholder; this assumes CrewAI's input interpolation leaves them
# untouched — confirm against the installed CrewAI version.
architect = Agent(
  role="{data_store} System Architect",
  goal="Design optimal {data_store} cluster configurations tailored to user workloads and environments.",
  backstory=("You are a principal system architect specializing in distributed data stores, particularly {data_store}. "
        "You are an expert in {data_store} internals and Kubernetes-ready deployments. "
        "Use the tool to get the guidance on redis configuration. Do not use external knowledge beyond what is contained in the pdf file {pdf_file}. "
        "Pass your question as a plain string to the 'query' parameter — for example: "
        "{ \"query\": \"optimal Redis config for 100 ops/sec workload\" }. "
        "Do not include 'description' or 'type' keys. Do not wrap the string inside another dictionary. "
        "Just say, your capacity is limited to certain use cases only at the moment if the query is outside the scope of the pdf."),
  verbose=False,
  tools=[pdf_rag_search_tool],
  allow_delegation=False,
)


# Writer agent: equipped with the file writer tool only; delegation is off.
manifest_writer_agent = Agent(
  tools=[file_writer_tool],
  allow_delegation=False,
  verbose=False,
  role="{data_store} manifest Writer",
  goal="Vets and writes yaml configurations to the target directory.",
  backstory=" ".join((
    "You are a dedicated writer agent specializing in {data_store} configurations.",
    "You leverage the File Writer Tool to create and update configuration files as needed.",
  )),
)


# Infra agent: equipped with the kind installer tool only; delegation is off.
infra_admin_agent = Agent(
  tools=[kind_cluster_tool],
  allow_delegation=False,
  verbose=False,
  role="Infra Admin managing kind cluster",
  goal="Manage the kind cluster setup and ensure the environment is ready for deployment.",
  backstory=" ".join((
    "You are an experienced infra admin responsible for setting up and managing the kind cluster.",
    "You ensure that the environment is properly configured and ready for deployment of {data_store} configurations.",
  )),
)

In [26]:
from crewai import Task

## Tasks

# ----------------------------------------------------
# Architect task: propose a cluster design in a fixed
# summary / key decisions / YAML response format
# ----------------------------------------------------
design_cluster = Task(
    agent=architect,
    expected_output="A summary, key decisions, and a valid Kubernetes YAML block.",
    # One list entry per line of the prompt; empty entries are blank lines.
    description="\n".join([
        "Design an optimal {data_store} architecture for this workload:",
        "{workload_description}",
        "",
        "Respond in this exact format:",
        "---",
        "Architecture Summary:",
        "<brief text>",
        "",
        "---",
        "Key Decisions:",
        "- <bullet1>",
        "- <bullet2>",
        "",
        "```yaml",
        "<valid Kubernetes YAML>",
        "```",
        "Do not add any extra commentary.",
    ]),
)

# ----------------------------------------------------
# Writer task: validate the designed YAML and write it
# to disk; receives the design task as context
# ----------------------------------------------------
write_manifest = Task(
    agent=manifest_writer_agent,
    context=[design_cluster],
    expected_output="Confirmation that the YAML was successfully written.",
    # One list entry per line of the prompt.
    description="\n".join([
        "Validate the Kubernetes YAML and write it to `{target_directory}/{data_store}.yaml`.",
        "Use the File Writer Tool.",
        "If the file exists, overwrite it.",
        "**Important**: The YAML must be valid and ready for `kubectl apply`.",
    ]),
)

# ----------------------------------------------------
# Infra task: prepare the kind environment for the
# deployment using the infra admin agent
# ----------------------------------------------------
infra_setup = Task(
    agent=infra_admin_agent,
    expected_output="Confirmation that the kind cluster is ready for deployment.",
    # One list entry per line of the prompt.
    description="\n".join([
        "Set up and validate a kind cluster for deploying {data_store}.",
        "Use the Kind Installer Tool.",
        "If already set up, confirm readiness.",
        "Provide access instructions if setup is successful.",
    ]),
)

In [32]:
from crewai import Crew, Process


## Crew

# Assemble the three agents and their tasks into a crew that uses the
# sequential process, with verbose output enabled.
crew = Crew(
  process=Process.sequential,
  verbose=True,
  agents=[
    architect,
    manifest_writer_agent,
    infra_admin_agent,
  ],
  tasks=[
    design_cluster,
    write_manifest,
    infra_setup,
  ],
)

In [None]:
import os
from pathlib import Path
import asyncio
from yaspin import yaspin
from termcolor import colored
import warnings

# Silence UserWarning messages raised from pydantic modules.
warnings.filterwarnings(action="ignore", category=UserWarning, module="pydantic")

# Values passed to the crew at kickoff; the keys match the {placeholders}
# used in the agent and task text.
inputs = dict(
  data_store="redis",
  workload_description="A low-volume website. It handles 100 ops/sec with 80% reads, 20% writes. ",
  target_directory="k8s_configs",
  target_file="redis.yaml",
  pdf_file="Redis.pdf",
)

async def run_crew():
    """
    Kick off the crew from async code without blocking the event loop.

    `crew.kickoff` is a blocking call, so it is run in a worker thread and
    awaited. Uses the module-level `crew` and `inputs`.

    Returns:
        Whatever `crew.kickoff(inputs=inputs)` returns.
    """
    return await asyncio.to_thread(crew.kickoff, inputs=inputs)

def run_sync_crew():
    """
    Kick off the crew synchronously with the module-level `inputs`.

    Blocks until `crew.kickoff` returns.

    Returns:
        Whatever `crew.kickoff(inputs=inputs)` returns, so the caller can
        inspect the run's output instead of losing it.
    """
    return crew.kickoff(inputs=inputs)


# Print a start banner, run the crew synchronously, then print a completion banner.
print(
    colored("🚀 Crew execution started...\n", color="yellow")
)

run_sync_crew()

print(
    colored("\n🧠 All Tasks Finished  ", color="green")
)