In [10]:
import getpass
import json
import os
import re
import subprocess
import sys
import tempfile

from langchain.agents import Tool
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI

# Set up Gemini API key.
# Resolution order: an already-exported GOOGLE_API_KEY, then the Colab secret
# named GEMINI_API_KEY, then an interactive prompt.
if not os.environ.get("GOOGLE_API_KEY"):
    try:
        from google.colab import userdata
        os.environ["GOOGLE_API_KEY"] = userdata.get('GEMINI_API_KEY')
    except Exception:
        # Catch Exception rather than using a bare `except:` so that
        # KeyboardInterrupt / SystemExit still stop the cell. Any ordinary
        # failure (import error, secret lookup error, a None value rejected
        # by os.environ) falls back to the prompt.
        os.environ["GOOGLE_API_KEY"] = getpass.getpass("Enter Google API Key: ")

class PlannerAgent:
    """Turns a free-form task description into a list of sub-tasks.

    The agent asks a Gemini chat model for a JSON plan, validates its shape and
    returns it. If the model call, JSON parsing or validation fails, a fixed
    four-step plan is returned instead.
    """

    def __init__(self):
        self.llm = ChatGoogleGenerativeAI(
            model="gemini-1.5-flash-latest",
            temperature=0.3,
            convert_system_message_to_human=True
        )
        self.system_prompt = """
        As an expert solution architect, decompose complex problems into executable sub-tasks.
        Use this JSON structure:
        {
            "subtasks": [
                {
                    "id": <unique integer>,
                    "desc": "<clear description>",
                    "dependencies": [<list of prerequisite task IDs>]
                }
            ]
        }
        """

    @staticmethod
    def _extract_json(text):
        """Return the JSON payload embedded in an LLM reply.

        Handles, in order: a fenced block (with or without a ``json`` tag),
        a JSON object surrounded by prose, and finally the raw text itself.
        """
        fenced = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL | re.IGNORECASE)
        if fenced:
            return fenced.group(1).strip()
        start, end = text.find("{"), text.rfind("}")
        if start != -1 and end > start:
            # Slice from the first "{" to the last "}" to drop surrounding chatter.
            return text[start:end + 1]
        return text.strip()

    @staticmethod
    def _parse_plan(text):
        """Parse and validate a plan from the raw model reply.

        Args:
            text: The reply text produced by the model.

        Returns:
            The plan dict. Every subtask is guaranteed to be a dict with ``id``
            and ``desc``; a missing ``dependencies`` key is filled with ``[]``.

        Raises:
            ValueError: If the JSON is malformed (``json.JSONDecodeError`` is a
                ``ValueError``) or the structure is not the expected one.
        """
        plan = json.loads(PlannerAgent._extract_json(text))
        if not isinstance(plan, dict) or not isinstance(plan.get("subtasks"), list):
            raise ValueError("Invalid response format: Missing 'subtasks' list")
        for subtask in plan["subtasks"]:
            if not isinstance(subtask, dict) or "id" not in subtask or "desc" not in subtask:
                raise ValueError("Subtask missing required fields: 'id' or 'desc'")
            # The prompt asks for this key, but the model may omit it.
            subtask.setdefault("dependencies", [])
        return plan

    def decompose(self, task):
        """Ask the model to split ``task`` into sub-tasks.

        Args:
            task: Description of the problem to decompose.

        Returns:
            A dict with a ``"subtasks"`` list. On any failure the error is
            printed and the result of ``_fallback_plan(task)`` is returned.
        """
        try:
            messages = [
                SystemMessage(content=self.system_prompt),
                HumanMessage(content=f"TASK: {task}")
            ]
            llm_response = self.llm.invoke(messages)
            return self._parse_plan(llm_response.content)
        except Exception as e:
            # Deliberate best-effort: planning must never abort the pipeline.
            print(f"Decomposition error: {str(e)}")
            return self._fallback_plan(task)

    def _fallback_plan(self, task):
        """Return a generic analyze/design/implement/test plan for ``task``."""
        return {
            "subtasks": [
                {"id": 1, "desc": f"Analyze requirements: {task}", "dependencies": []},
                {"id": 2, "desc": f"Design solution architecture for {task}", "dependencies": [1]},
                {"id": 3, "desc": f"Implement core functionality for {task}", "dependencies": [2]},
                {"id": 4, "desc": f"Test and validate solution for {task}", "dependencies": [3]}
            ]
        }

class ExecutorAgent:
    """Generates Python code with an LLM and smoke-tests it by running it."""

    def __init__(self):
        self.llm = ChatGoogleGenerativeAI(
            model="gemini-1.5-flash-latest",
            temperature=0.2,
            convert_system_message_to_human=True
        )

    def write_code(self, requirements: str) -> dict:
        """Ask the model for code that fulfils ``requirements`` and save it.

        Args:
            requirements: Description inserted into the prompt.

        Returns:
            ``{"code": <source text>, "files": [<path to main.py>]}``. The file
            lives in a fresh ``tempfile.mkdtemp()`` directory that this method
            does not remove; the caller owns its cleanup.
        """
        prompt = (
            f"You are an expert Python developer. "
            f"Write complete, self-contained Python code to {requirements}. "
            f"Return only the code block, wrapped in triple backticks for Python."
        )
        llm_response = self.llm.invoke(prompt).content
        # Accept ```python / ```py / bare ``` fences, tolerate trailing spaces
        # and CRLF after the opening fence.
        match = re.search(
            r"```(?:python|py)?[ \t]*\r?\n([\s\S]*?)```", llm_response, re.IGNORECASE
        )
        code = match.group(1) if match else llm_response
        tmp_dir = tempfile.mkdtemp()
        file_path = os.path.join(tmp_dir, "main.py")
        # Explicit UTF-8: generated code may contain non-ASCII characters that
        # the platform default encoding cannot represent.
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(code)
        return {"code": code, "files": [file_path]}

    def run_tests(self, code_info: dict) -> dict:
        """Execute the first generated file and report whether it exited cleanly.

        NOTE(security): this runs model-generated code directly in a child
        process with no sandboxing; only use it in a disposable environment.

        Args:
            code_info: Dict as returned by ``write_code``; only ``"files"`` is read.

        Returns:
            ``{"passed": bool, "errors": [str, ...]}``. ``passed`` is True only
            when the process exits with code 0 within 10 seconds.
        """
        files = code_info.get("files", [])
        if not files:
            return {"passed": False, "errors": ["No file to execute."]}
        file_to_run = files[0]
        try:
            result = subprocess.run(
                # Use the interpreter running this code instead of whatever
                # "python" happens to resolve to on PATH (if anything).
                [sys.executable or "python", file_to_run],
                capture_output=True, text=True, timeout=10
            )
        except Exception as e:
            # Covers subprocess.TimeoutExpired as well as launch failures.
            return {"passed": False, "errors": [str(e)]}
        if result.returncode == 0:
            return {"passed": True, "errors": []}
        # Never report an empty error: fall back to stdout, then the exit code.
        detail = (
            result.stderr.strip()
            or result.stdout.strip()
            or f"Process exited with code {result.returncode}"
        )
        return {"passed": False, "errors": [detail]}

class ReviewerAgent:
    """Asks an LLM to review generated code together with its test outcome."""

    def __init__(self):
        self.llm = ChatGoogleGenerativeAI(
            model="gemini-1.5-flash-latest",
            temperature=0.2,
            convert_system_message_to_human=True
        )

    def analyze(self, code_result: dict, test_result: dict) -> dict:
        """Review ``code_result["code"]`` in light of ``test_result``.

        Args:
            code_result: Dict with an optional ``"code"`` string.
            test_result: Dict with optional ``"errors"`` (list) and ``"passed"``
                (bool). When ``"passed"`` is absent, an empty error list is
                treated as success.

        Returns:
            ``{"feedback": ..., "critical_issues": [...]}``. If the reply has no
            parseable JSON object, the stripped raw reply becomes the feedback
            and ``critical_issues`` is empty. ``critical_issues`` is always a list.
        """
        code = code_result.get("code", "")
        # Drop blank entries so an empty stderr is not presented as an error.
        errors = [str(err) for err in (test_result.get("errors") or []) if str(err).strip()]
        passed = test_result.get("passed", not errors)
        if errors:
            tests_output = "\n".join(errors)
        elif passed:
            tests_output = "All tests passed."
        else:
            # A failed run without captured output must not be reported as green.
            tests_output = "Tests failed, but no error output was captured."
        prompt = (
            "You are a senior software engineer and code reviewer. "
            "Review the following Python code snippet and its test results. "
            "Provide a JSON object with two keys:\n"
            " - feedback: A concise summary of overall code quality.\n"
            " - critical_issues: A list of strings describing any bugs, logical errors, "
            "or style issues that should be fixed.\n\n"
            f"Code:\n```python\n{code}\n```\n\n"
            f"Test Results:\n{tests_output}\n\n"
            "Output JSON only."
        )
        llm_response = self.llm.invoke(prompt).content
        feedback = llm_response.strip()
        critical_issues = []
        # Greedy match from the first "{" to the last "}" in the reply.
        json_match = re.search(r"\{[\s\S]*\}", llm_response)
        if json_match:
            try:
                result = json.loads(json_match.group(0))
            except ValueError:
                # json.JSONDecodeError is a ValueError; keep the raw-text fallback.
                result = None
            if isinstance(result, dict):
                feedback = result.get("feedback", "")
                issues = result.get("critical_issues") or []
                if isinstance(issues, str):
                    issues = [issues]
                elif not isinstance(issues, list):
                    issues = [str(issues)]
                critical_issues = issues
        return {"feedback": feedback, "critical_issues": critical_issues}

class Orchestrator:
    """Runs the plan -> generate -> test -> review -> integrate pipeline."""

    def __init__(self):
        self.planner = PlannerAgent()
        self.executor = ExecutorAgent()
        self.reviewer = ReviewerAgent()
        # Code strings produced for the current run, in subtask order.
        self._generated = []
        self.integrator_llm = ChatGoogleGenerativeAI(
            model="gemini-1.5-flash-latest",
            temperature=0.1,
            convert_system_message_to_human=True
        )

    def integrate_code(self, code_snippets: list) -> str:
        """Ask the integrator LLM to merge ``code_snippets`` into one script.

        Args:
            code_snippets: Source strings to combine. Blank or non-string
                entries are ignored.

        Returns:
            The merged code with surrounding fences removed, or a placeholder
            comment when there is nothing to integrate.
        """
        # Skip empty snippets so no LLM call is made on content-free input.
        code_snippets = [s for s in (code_snippets or []) if isinstance(s, str) and s.strip()]
        if not code_snippets:
            return "# No code snippets to integrate."
        combined_snippets = "\n\n# --- SNIPPET ---\n\n".join(code_snippets)
        integration_prompt = (
            "You are an expert software integrator. You have been given several Python code snippets. "
            "Your task is to combine them into a single, cohesive, and functional Python script.\n\n"
            "Perform the following actions:\n"
            "1. Merge all imports at the top and remove duplicates.\n"
            "2. Logically combine classes and functions. If functions with the same name exist, merge them intelligently.\n"
            "3. Remove redundant code, placeholder comments, and example usage blocks.\n"
            "4. Ensure the final script is clean, correct, and executable.\n"
            "5. Return ONLY the final, integrated Python code, wrapped in a single triple-backtick block.\n\n"
            f"Here are the code snippets to integrate:\n\n{combined_snippets}"
        )
        messages = [
            SystemMessage(content="You are an expert Python code integrator."),
            HumanMessage(content=integration_prompt)
        ]
        llm_response = self.integrator_llm.invoke(messages).content
        match = re.search(r"```(?:python)?\n([\s\S]*?)```", llm_response)
        integrated_code = match.group(1).strip() if match else llm_response.strip()
        return integrated_code

    def execute_project(self, user_request):
        """Plan ``user_request``, generate and test code per subtask, then merge.

        Subtasks are processed in the order the planner returned them. A
        subtask whose run fails is reviewed and the feedback printed, but its
        code is still kept for integration.

        Args:
            user_request: Description of the project to build.

        Returns:
            The integrated script, or an explanatory message string when the
            plan is invalid or no code was generated.
        """
        plan = self.planner.decompose(user_request)
        if not plan or "subtasks" not in plan or not isinstance(plan["subtasks"], list):
            print("Planner returned an invalid plan.")
            return "Project execution failed: Invalid plan."
        self._generated = []
        for position, subtask in enumerate(plan["subtasks"], start=1):
            if not isinstance(subtask, dict) or "desc" not in subtask:
                print(f"Skipping invalid subtask: {subtask}")
                continue
            # Only "desc" is validated above, so "id" may be missing; fall back
            # to the 1-based position instead of raising KeyError.
            subtask_id = subtask.get("id", position)
            print(f"\n🔧 Processing subtask {subtask_id}: {subtask['desc']}")
            code_result = self.executor.write_code(subtask["desc"])
            if not code_result or "code" not in code_result:
                print(f"Executor failed to generate code for subtask: {subtask['desc']}")
                continue
            self._generated.append(code_result["code"])
            print("🧪 Running tests...")
            test_result = self.executor.run_tests(code_result)
            if not test_result or "passed" not in test_result:
                print(f"Executor failed to run tests for subtask: {subtask['desc']}")
                continue
            if not test_result["passed"]:
                print("🔍 Reviewing failed tests...")
                review = self.reviewer.analyze(code_result, test_result)
                print(f"📝 Review feedback: {review.get('feedback', 'No feedback')}")
                if review.get("critical_issues"):
                    print(f"❌ Critical issues: {review['critical_issues']}")
            else:
                print("✅ Tests passed")
        print("\n\n⚙️ Integrating all generated code snippets into a final script...")
        if not self._generated:
            return "No code generated for any subtasks"
        final_code = self.integrate_code(self._generated)
        print("✅ Integration complete.")
        return final_code

# Demo run: build the pipeline, hand it one request, and show the merged script.
if __name__ == "__main__":
    request = "Create a Flask app with a single endpoint that returns 'Hello, World!'."
    system = Orchestrator()
    final_script = system.execute_project(request)
    # Header and script are emitted on consecutive lines.
    print("\n--- FINAL INTEGRATED SCRIPT ---", final_script, sep="\n")





🔧 Processing subtask 1: Create a new Python virtual environment.




🧪 Running tests...
✅ Tests passed

🔧 Processing subtask 2: Install Flask within the virtual environment: `pip install Flask`




🧪 Running tests...
🔍 Reviewing failed tests...




📝 Review feedback: The code is functional but has some critical issues related to error handling, cross-platform compatibility, and security.
❌ Critical issues: ['**Improper error handling:** While the code attempts to catch exceptions, it relies too heavily on generic `except Exception` blocks. This masks potential underlying issues and makes debugging difficult.  More specific exception handling is needed to address different failure scenarios (e.g., permission errors, network issues during pip install).', "**Cross-platform activation:** The activation script path determination is brittle. It assumes a standard venv layout and doesn't handle variations in virtual environment creation or non-standard locations.  Using `venv.EnvBuilder`'s `activate()` method would be more robust.", "**Security risk:** Using `subprocess.run` with shell=False (implied by passing a list) is good, but the code constructs the activation command string by concatenation. This is vulnerable to shell injection 



🧪 Running tests...
🔍 Reviewing failed tests...




📝 Review feedback: The code is functional but has several areas for improvement in terms of scalability, data management, and error handling.
❌ Critical issues: ['The `products` list is used as an in-memory database. This is not suitable for production; a persistent database (e.g., SQLite, PostgreSQL) should be used.', 'The `update_product` function performs an in-place update of the `product` dictionary. This can lead to unexpected behavior and data corruption.  Create a copy of the product before updating.', 'The `delete_product` function uses a global variable. This is bad practice and makes the code harder to test and maintain.  A better approach would be to pass the `products` list as a parameter or use a class to encapsulate the data and methods.', 'Error handling is basic. More robust error handling and logging should be implemented.', 'The code lacks input validation beyond checking for the presence of keys.  More comprehensive validation (e.g., data types, ranges) is needed.',



🧪 Running tests...
🔍 Reviewing failed tests...




📝 Review feedback: The code is well-structured and handles errors appropriately.  The use of Flask's error handling mechanisms is good practice. However, the test failure indicates a deployment issue, not a code problem.
❌ Critical issues: ["The test failure 'Address already in use' is not a code defect.  Port 5000 is already in use by another process.  The developer should identify and stop the conflicting process or specify a different port using the `port` argument in `app.run()` (e.g., `app.run(debug=True, port=5001)`)."]

🔧 Processing subtask 5: Run the Flask development server: `flask run`




🧪 Running tests...
🔍 Reviewing failed tests...




📝 Review feedback: The code is well-structured and handles errors effectively, but it lacks a crucial feature for practical use.
❌ Critical issues: ["The function lacks the ability to specify the Flask application.  The error message from the test results shows that it's trying to run Flask without knowing which application to run.  It should accept an `app_module` or `app_path` parameter to specify the application.", 'The error handling is too generic. While it catches many exceptions, the final `except Exception as e` clause is too broad.  It should be narrowed down to catch specific, expected exceptions related to subprocess execution or Flask.', "The code assumes 'flask' is available on the system's PATH. While it checks for Flask's installation, it doesn't handle cases where the Flask executable isn't in the PATH.  Consider adding a check to ensure the 'flask' command is executable before attempting to run it."]

🔧 Processing subtask 6: Test the endpoint by accessing the applicati



🧪 Running tests...
✅ Tests passed

🔧 Processing subtask 7: Optional:  Create a `requirements.txt` file listing Flask as a dependency for easier reproducibility.




🧪 Running tests...
🔍 Reviewing failed tests...




📝 Review feedback: The code is well-structured and handles errors gracefully, but has a critical flaw and some minor style issues.
❌ Critical issues: ['The application uses an in-memory data structure (`data`). This is unsuitable for production; a persistent database is required.  The current implementation will lose all data on restart.', "The `max(item['id'] for item in data['items']) + 1` approach for generating new IDs is prone to race conditions in a multi-threaded or multi-process environment. A more robust ID generation strategy (e.g., using a database sequence or UUIDs) is needed.", 'Error handling in the `except Exception as e` blocks is too broad.  It reveals sensitive internal information (`str(e)`) to the client, which is a security risk.  Log the error for debugging purposes, but return a generic error message to the client.', "The code lacks input sanitization.  While it checks data types, it doesn't protect against things like SQL injection (though not relevant here sinc



🧪 Running tests...
🔍 Reviewing failed tests...




📝 Review feedback: The code is mostly well-structured and functional, but has a critical security vulnerability and a minor usability issue.
❌ Critical issues: ['The application uses `app.run(debug=True)`.  Debug mode should **never** be enabled in a production environment.  It exposes the application to significant security risks.', 'The `app.secret_key` is generated using `os.urandom(24)`, which is insufficient for production.  A cryptographically secure random number generator should be used, and the key should be stored securely (e.g., environment variable).', 'Error handling for file uploads could be improved.  The current implementation only provides a text message for invalid file types; more informative feedback to the user would enhance usability.', 'The application lacks input validation beyond checking file extensions.  This leaves it vulnerable to various attacks (e.g., directory traversal). More robust validation is needed.']


⚙️ Integrating all generated code snippets in



✅ Integration complete.

--- FINAL INTEGRATED SCRIPT ---
import json
import os
import subprocess
import sys
from flask import Flask, jsonify, request, render_template, redirect, url_for
from urllib.parse import urljoin
import uuid
from werkzeug.utils import secure_filename
import venv
import requests
import pytest


def create_virtualenv(venv_path, python_executable=None):
    if not os.path.isdir(venv_path):
        raise ValueError(f"Invalid venv_path: '{venv_path}' is not a directory.")

    if python_executable:
        if not os.path.exists(python_executable):
            raise FileNotFoundError(f"Python executable not found: {python_executable}")
        python_exe = python_executable
    else:
        python_exe = sys.executable

    try:
        subprocess.run([python_exe, "-m", "venv", venv_path], check=True, capture_output=True, text=True)
        print(f"Virtual environment created successfully at: {venv_path}")
        return venv_path
    except subprocess.CalledProcessErr

In [2]:
!pip install langchain-google-genai flask langchain langchain-core

Collecting langchain-google-genai
  Downloading langchain_google_genai-2.1.8-py3-none-any.whl.metadata (7.0 kB)
Collecting filetype<2.0.0,>=1.2.0 (from langchain-google-genai)
  Downloading filetype-1.2.0-py2.py3-none-any.whl.metadata (6.5 kB)
Collecting google-ai-generativelanguage<0.7.0,>=0.6.18 (from langchain-google-genai)
  Downloading google_ai_generativelanguage-0.6.18-py3-none-any.whl.metadata (9.8 kB)
Downloading langchain_google_genai-2.1.8-py3-none-any.whl (47 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m47.8/47.8 kB[0m [31m929.6 kB/s[0m eta [36m0:00:00[0m
[?25hDownloading filetype-1.2.0-py2.py3-none-any.whl (19 kB)
Downloading google_ai_generativelanguage-0.6.18-py3-none-any.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: filetype, google-ai-generativelanguage, langchain-google-genai
  Attempting uninstall: google-ai-generativelang