In [None]:
!pip install google-generativeai pandas numpy rich



In [None]:
from google.colab import userdata, files
import google.generativeai as genai
import pandas as pd
import numpy as np
import json
import time
from datetime import datetime, timezone
from typing import Dict, List, Any

In [None]:
GEMINI_API_KEY = userdata.get("GEMINI_API_KEY")

if GEMINI_API_KEY is None:
    raise ValueError("‚ùå ERROR: Add GEMINI_API_KEY in Colab ‚Üí Secrets")

genai.configure(api_key=GEMINI_API_KEY)

# IMPORTANT: Only use this model throughout
MODEL_NAME = "gemini-2.5-flash-lite"

print("‚úÖ Gemini API Connected Successfully")
print(f"üìå Using Model: {MODEL_NAME}")


‚úÖ Gemini API Connected Successfully
üìå Using Model: gemini-2.5-flash-lite


In [None]:
def utc_now():
    """Get current UTC timestamp"""
    return datetime.now(timezone.utc).isoformat()

def safe_api_call(model, prompt, max_retries=3):
    """Make API call with retry logic"""
    for attempt in range(max_retries):
        try:
            response = model.generate_content(prompt)
            return response.text
        except Exception as e:
            if "429" in str(e) or "quota" in str(e).lower():
                wait_time = (attempt + 1) * 2
                print(f"‚è≥ Rate limit hit. Waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise e
    raise Exception("Max retries exceeded")

def clean_json_response(text):
    """Extract and clean JSON from AI response"""
    text = text.strip()
    # Remove markdown code blocks
    text = text.replace('```json', '').replace('```', '')
    # Find JSON object boundaries
    start = text.find('{')
    end = text.rfind('}') + 1
    if start != -1 and end > start:
        text = text[start:end]
    return json.loads(text)

In [None]:
print("üìÇ Upload your CSV file:")
uploaded = files.upload()
csv_name = list(uploaded.keys())[0]
df = pd.read_csv(csv_name)


print(f"\n‚úÖ Loaded: {csv_name}")
print(f"üìä Shape: {df.shape}")
print(f"üìã Columns: {list(df.columns)}")
print("\nüîç First 3 rows:")
print(df.head(3))

üìÇ Upload your CSV file:


Saving shipping_data_0.csv to shipping_data_0.csv

‚úÖ Loaded: shipping_data_0.csv
üìä Shape: (110, 6)
üìã Columns: ['origin_warehouse', 'destination_store', 'product', 'on_time', 'product_quantity', 'driver_identifier']

üîç First 3 rows:
                       origin_warehouse                     destination_store  \
0  d5566b15-b071-4acf-8e8e-c98433083b2d  50d33715-4c77-4dd9-8b9d-ff1ca372a2a2   
1  c42f0de8-b4f0-4167-abd1-ae79e5e18eea  172eb8f3-1033-4fb6-b66b-d0df09df3161   
2  b145f396-de9b-42f1-9cc9-f5b52c3a941c  65e4544d-42ae-4751-9580-bdcb90e5fcda   

   product  on_time  product_quantity                     driver_identifier  
0   lotion     True                59  d8da0460-cf39-4f38-9fff-6c9b4e344d8a  
1  windows     True                28  293ccaec-6592-4f04-aae5-3e238fe62614  
2     skis     True                63  80988f09-91a3-4e1b-8e69-13551c53f318  


In [None]:
# Cell 6: Dataset Intelligence Agent
class DatasetIntelligenceAgent:
    """Understands what kind of data we're analyzing"""

    def __init__(self):
        self.model = genai.GenerativeModel(MODEL_NAME)
        self.analysis = None

    def analyze_dataset(self, df: pd.DataFrame) -> Dict:
        """Deep analysis of dataset structure and business context"""

        # Prepare dataset summary
        summary = {
            "columns": list(df.columns),
            "dtypes": df.dtypes.astype(str).to_dict(),
            "row_count": len(df),
            "sample_data": df.head(5).to_dict('records'),
            "null_counts": df.isnull().sum().to_dict(),
            "unique_counts": {col: df[col].nunique() for col in df.columns}
        }

        prompt = f"""You are analyzing a business dataset. Determine what domain this belongs to and how to analyze it.

DATASET INFORMATION:
- Columns: {summary['columns']}
- Data types: {summary['dtypes']}
- Row count: {summary['row_count']}
- Unique value counts: {summary['unique_counts']}
- Sample rows: {json.dumps(summary['sample_data'][:3], indent=2, default=str)}

TASK: Analyze this dataset and provide a structured understanding.

Return ONLY a JSON object with this structure:
{{
  "domain": "string (e.g., 'supply_chain', 'sales', 'inventory', 'logistics', 'retail')",
  "dataset_type": "string (e.g., 'shipment_tracking', 'order_fulfillment', 'warehouse_inventory')",
  "business_context": "string (brief description of what this data represents)",
  "key_entities": ["list", "of", "main", "entities"],
  "problem_indicators": {{
    "column_name": "what problem it indicates"
  }},
  "success_metrics": {{
    "column_name": "what success it measures"
  }},
  "potential_issues": ["list", "of", "issues", "to", "monitor"],
  "autonomous_actions": ["list", "of", "actions", "an", "agent", "could", "take"],
  "analysis_strategy": "how to approach analyzing this data"
}}

Be specific based on actual column names and data patterns."""

        response_text = safe_api_call(self.model, prompt)
        self.analysis = clean_json_response(response_text)

        print("\nü§ñ AI Dataset Analysis:")
        print(json.dumps(self.analysis, indent=2))

        return self.analysis

# Initialize and run dataset analysis
intelligence_agent = DatasetIntelligenceAgent()
dataset_context = intelligence_agent.analyze_dataset(df)



ü§ñ AI Dataset Analysis:
{
  "domain": "logistics",
  "dataset_type": "shipment_delivery_performance",
  "business_context": "This dataset tracks shipments from origin warehouses to destination stores, including information about the product, quantity, delivery timeliness, and the driver involved. It appears to be focused on the efficiency and performance of delivery operations.",
  "key_entities": [
    "warehouse",
    "store",
    "product",
    "driver",
    "shipment"
  ],
  "problem_indicators": {
    "on_time": "Indicates missed delivery deadlines. A high number of 'false' values suggests delivery delays, impacting customer satisfaction and operational efficiency.",
    "origin_warehouse": "High cardinality (110 unique values for 110 rows) suggests each row might represent a unique shipment or a very granular tracking of individual shipments. If warehouses are truly unique per row, it might indicate issues with tracking aggregation or data entry, or that the dataset is focused

In [None]:
# Cell 7: Problem Detection Agent
class ProblemDetectionAgent:
    """Scans data for issues requiring intervention"""

    def __init__(self, dataset_context: Dict):
        self.model = genai.GenerativeModel(MODEL_NAME)
        self.context = dataset_context
        self.problems_found = []

    def scan_dataset(self, df: pd.DataFrame) -> List[Dict]:
        """Analyze entire dataset for patterns and problems"""

        # Create aggregated view for pattern detection
        analysis_data = {
            "total_rows": len(df),
            "problem_indicators": self.context.get("problem_indicators", {}),
            "column_stats": {}
        }

        # Calculate relevant statistics
        for col in df.columns:
            if df[col].dtype in ['int64', 'float64']:
                analysis_data["column_stats"][col] = {
                    "mean": float(df[col].mean()),
                    "std": float(df[col].std()),
                    "min": float(df[col].min()),
                    "max": float(df[col].max())
                }
            elif df[col].dtype == 'bool':
                analysis_data["column_stats"][col] = {
                    "true_count": int(df[col].sum()),
                    "false_count": int((~df[col]).sum()),
                    "true_percentage": float(df[col].mean() * 100)
                }
            else:
                analysis_data["column_stats"][col] = {
                    "unique_count": int(df[col].nunique()),
                    "top_values": df[col].value_counts().head(5).to_dict()
                }

        prompt = f"""You are analyzing a {self.context['dataset_type']} dataset to find problems.

DATASET CONTEXT:
{json.dumps(self.context, indent=2)}

STATISTICAL ANALYSIS:
{json.dumps(analysis_data, indent=2, default=str)}

SAMPLE RECORDS:
{json.dumps(df.head(10).to_dict('records'), indent=2, default=str)}

TASK: Identify specific, actionable problems that require autonomous intervention.

Return ONLY a JSON object:
{{
  "problems": [
    {{
      "problem_id": "PROB-001",
      "severity": "CRITICAL|HIGH|MEDIUM|LOW",
      "category": "category name",
      "description": "specific problem description",
      "affected_records": "how many/which records",
      "business_impact": "what's the business impact",
      "requires_action": true/false,
      "suggested_actions": ["action1", "action2"]
    }}
  ],
  "summary": {{
    "total_problems": 0,
    "critical_count": 0,
    "high_count": 0,
    "requires_immediate_action": true/false
  }}
}}

Focus on REAL problems in the data, not hypothetical ones."""

        response_text = safe_api_call(self.model, prompt)
        result = clean_json_response(response_text)

        self.problems_found = result.get("problems", [])

        print(f"\n‚ö†Ô∏è  PROBLEM SCAN COMPLETE")
        print(f"   Total Problems Found: {result['summary']['total_problems']}")
        print(f"   Critical: {result['summary']['critical_count']}")
        print(f"   High: {result['summary']['high_count']}")

        return result

# Run problem detection
problem_detector = ProblemDetectionAgent(dataset_context)
problem_analysis = problem_detector.scan_dataset(df)



‚ö†Ô∏è  PROBLEM SCAN COMPLETE
   Total Problems Found: 3
   Critical: 0
   High: 2


In [None]:
# Cell 8: Autonomous Planning Agent
class AutonomousPlannerAgent:
    """Creates detailed action plans for each problem"""

    def __init__(self, dataset_context: Dict):
        self.model = genai.GenerativeModel(MODEL_NAME)
        self.context = dataset_context
        self.plans = []

    def create_plan(self, problem: Dict, df: pd.DataFrame) -> Dict:
        """Generate detailed autonomous action plan"""

        prompt = f"""You are an autonomous planning agent for {self.context['dataset_type']}.

PROBLEM TO SOLVE:
{json.dumps(problem, indent=2)}

BUSINESS CONTEXT:
- Domain: {self.context['domain']}
- Key entities: {self.context['key_entities']}
- Available actions: {self.context['autonomous_actions']}

TASK: Create a detailed, step-by-step execution plan.

Return ONLY a JSON object:
{{
  "plan_id": "PLAN-{problem['problem_id']}",
  "problem_reference": "{problem['problem_id']}",
  "objective": "clear statement of what we're trying to achieve",
  "execution_steps": [
    {{
      "step_number": 1,
      "action_type": "API_CALL|DATABASE_QUERY|NOTIFICATION|REROUTE|REORDER",
      "description": "what to do",
      "tool_needed": "specific tool/API",
      "parameters": {{}},
      "expected_outcome": "what success looks like",
      "rollback_plan": "what to do if this fails"
    }}
  ],
  "success_criteria": ["criterion1", "criterion2"],
  "estimated_impact": "quantifiable benefit",
  "estimated_time": "how long to execute",
  "dependencies": ["what needs to happen first"]
}}

Make the plan realistic and executable with specific details."""

        response_text = safe_api_call(self.model, prompt)
        plan = clean_json_response(response_text)

        plan["created_at"] = utc_now()
        self.plans.append(plan)

        return plan

# Create plans for each problem
planner_agent = AutonomousPlannerAgent(dataset_context)
execution_plans = []

print("\nüìã CREATING AUTONOMOUS PLANS...")
for i, problem in enumerate(problem_detector.problems_found):
    if problem.get("requires_action"):
        print(f"\n[{i+1}] Planning for: {problem['problem_id']}")
        plan = planner_agent.create_plan(problem, df)
        execution_plans.append(plan)
        print(f"   ‚úì Plan created: {len(plan['execution_steps'])} steps")
        time.sleep(1)  # Rate limiting



üìã CREATING AUTONOMOUS PLANS...

[1] Planning for: PROB-001
   ‚úì Plan created: 6 steps

[2] Planning for: PROB-002
   ‚úì Plan created: 5 steps


In [None]:
# Cell 9: Autonomous Execution Agent
class ExecutionAgent:
    """Simulates execution of autonomous plans"""

    def __init__(self, dataset_context: Dict):
        self.model = genai.GenerativeModel(MODEL_NAME)
        self.context = dataset_context
        self.execution_logs = []

    def execute_plan(self, plan: Dict) -> Dict:
        """Execute each step of the plan"""

        print(f"\nüîß EXECUTING: {plan['plan_id']}")
        print(f"   Objective: {plan['objective']}")

        execution_log = {
            "plan_id": plan['plan_id'],
            "started_at": utc_now(),
            "steps_executed": [],
            "status": "IN_PROGRESS"
        }

        for step in plan['execution_steps']:
            print(f"\n   Step {step['step_number']}: {step['description']}")

            # Simulate execution with AI
            prompt = f"""Simulate executing this autonomous action:

STEP: {json.dumps(step, indent=2)}

CONTEXT: {self.context['dataset_type']} system

Simulate realistic execution and return result in JSON:
{{
  "step_number": {step['step_number']},
  "status": "SUCCESS|FAILED|PARTIAL",
  "action_taken": "specific action performed",
  "result_data": {{}},
  "metrics": {{}},
  "notes": "important details",
  "timestamp": "{utc_now()}"
}}

Be realistic about what would actually happen."""

            response_text = safe_api_call(self.model, prompt)
            result = clean_json_response(response_text)

            execution_log["steps_executed"].append(result)

            status_icon = "‚úì" if result["status"] == "SUCCESS" else "‚úó"
            print(f"      {status_icon} {result['status']}: {result['action_taken']}")

            time.sleep(1)  # Rate limiting

        execution_log["completed_at"] = utc_now()
        execution_log["status"] = "COMPLETED" if all(
            s["status"] == "SUCCESS" for s in execution_log["steps_executed"]
        ) else "PARTIAL"

        self.execution_logs.append(execution_log)

        print(f"\n   ‚úÖ Plan {execution_log['status']}")

        return execution_log

# Execute all plans
executor_agent = ExecutionAgent(dataset_context)
execution_results = []

print("\n" + "="*60)
print("üöÄ AUTONOMOUS EXECUTION STARTED")
print("="*60)

for plan in execution_plans:
    result = executor_agent.execute_plan(plan)
    execution_results.append(result)
    time.sleep(2)  # Rate limiting between plans



üöÄ AUTONOMOUS EXECUTION STARTED

üîß EXECUTING: PLAN-PROB-001
   Objective: Resolve the data integrity issue where 'origin_warehouse', 'destination_store', and 'driver_identifier' columns have unique values per shipment, preventing meaningful performance analysis.

   Step 1: Initiate an investigation into the data generation process for shipment data to understand why 'origin_warehouse', 'destination_store', and 'driver_identifier' are unique per row.
      ‚úì SUCCESS: Called data_investigation_api with parameters: problem_id='PROB-001', investigation_focus=['data_source_origin', 'etl_processes', 'data_entry_procedures'], columns_of_concern=['origin_warehouse', 'destination_store', 'driver_identifier'].

   Step 2: Alert the data engineering and logistics operations teams about the data anomaly and the ongoing investigation.
      ‚úì SUCCESS: Sent high-severity alert via alerting_system to data_engineering_lead and logistics_operations_manager.

   Step 3: Based on the findings 

In [None]:
# Cell 10: Generate Final Report
final_report = {
    "analysis_metadata": {
        "dataset_name": csv_name,
        "analyzed_at": utc_now(),
        "total_rows": len(df),
        "model_used": MODEL_NAME
    },
    "dataset_intelligence": dataset_context,
    "problem_detection": problem_analysis,
    "autonomous_plans": execution_plans,
    "execution_results": execution_results,
    "summary": {
        "problems_detected": len(problem_detector.problems_found),
        "plans_created": len(execution_plans),
        "plans_executed": len(execution_results),
        "successful_executions": sum(1 for r in execution_results if r["status"] == "COMPLETED"),
        "total_steps_executed": sum(len(r["steps_executed"]) for r in execution_results)
    }
}

# Save to JSON
output_filename = f"autonomous_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

with open(output_filename, 'w') as f:
    json.dump(final_report, f, indent=2, default=str)

print("\n" + "="*60)
print("üìä FINAL REPORT")
print("="*60)
print(f"‚úÖ Analysis Complete!")
print(f"üìÅ Output File: {output_filename}")
print(f"\nüìà Summary:")
print(f"   ‚Ä¢ Problems Detected: {final_report['summary']['problems_detected']}")
print(f"   ‚Ä¢ Autonomous Plans: {final_report['summary']['plans_created']}")
print(f"   ‚Ä¢ Successfully Executed: {final_report['summary']['successful_executions']}")
print(f"   ‚Ä¢ Total Steps: {final_report['summary']['total_steps_executed']}")
print("="*60)

# Download the file
files.download(output_filename)

print("\n‚ú® All done! Check your downloads folder for the JSON report.")



üìä FINAL REPORT
‚úÖ Analysis Complete!
üìÅ Output File: autonomous_analysis_20251129_151841.json

üìà Summary:
   ‚Ä¢ Problems Detected: 3
   ‚Ä¢ Autonomous Plans: 2
   ‚Ä¢ Successfully Executed: 2
   ‚Ä¢ Total Steps: 11


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>


‚ú® All done! Check your downloads folder for the JSON report.


In [None]:
class DataPreprocessingAgent:
    """Responsible for identifying and handling missing values, correcting data types, and detecting outliers."""

    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.missing_values_report = {}
        self.datatype_corrections = {}
        self.outlier_report = {}

    def clean_missing_values(self) -> pd.DataFrame:
        """Identifies and handles missing values. Fills numerical NaNs with median, categorical with mode."""
        print("\nüîç Cleaning missing values...")
        for col in self.df.columns:
            if self.df[col].isnull().any():
                missing_count = self.df[col].isnull().sum()
                missing_percentage = (missing_count / len(self.df)) * 100
                self.missing_values_report[col] = {
                    "count": missing_count,
                    "percentage": f"{missing_percentage:.2f}%"
                }

                if self.df[col].dtype in ['int64', 'float64']:
                    median_val = self.df[col].median()
                    self.df[col].fillna(median_val, inplace=True)
                    print(f"   - Column '{col}': Filled {missing_count} missing numerical values with median ({median_val}).")
                elif self.df[col].dtype == 'object' or self.df[col].dtype == 'bool': # Also cover boolean in case of NaN
                    mode_val = self.df[col].mode()[0] if not self.df[col].mode().empty else None
                    if mode_val is not None:
                        self.df[col].fillna(mode_val, inplace=True)
                        print(f"   - Column '{col}': Filled {missing_count} missing categorical/boolean values with mode ({mode_val}).")
                    else:
                        print(f"   - Column '{col}': Could not determine mode for missing values, leaving as is.")

        if not self.missing_values_report:
            print("   No missing values found.")
        print("   Missing value cleaning complete.")
        return self.df

    def correct_datatypes(self) -> pd.DataFrame:
        """Infers and corrects data types for columns."""
        print("\n‚öôÔ∏è Correcting data types...")
        initial_dtypes = self.df.dtypes.to_dict()
        for col in self.df.columns:
            # Try to convert object columns to numeric if possible
            if self.df[col].dtype == 'object':
                try:
                    # Attempt to convert to numeric, coercing errors to NaN
                    converted = pd.to_numeric(self.df[col], errors='coerce')
                    if not converted.isnull().all(): # If not all values became NaN, it's numeric
                        self.df[col] = converted
                        if initial_dtypes[col] != self.df[col].dtype:
                            self.datatype_corrections[col] = f"Changed from {initial_dtypes[col]} to {self.df[col].dtype}"
                            print(f"   - Column '{col}': Converted to numeric.")
                    else:
                        # Attempt to convert to datetime
                        converted_dt = pd.to_datetime(self.df[col], errors='coerce')
                        if not converted_dt.isnull().all():
                            self.df[col] = converted_dt
                            if initial_dtypes[col] != self.df[col].dtype:
                                self.datatype_corrections[col] = f"Changed from {initial_dtypes[col]} to {self.df[col].dtype}"
                                print(f"   - Column '{col}': Converted to datetime.")
                except Exception as e:
                    print(f"   - Column '{col}': Could not convert object type. Error: {e}")

        if not self.datatype_corrections:
            print("   No datatype corrections made.")
        print("   Data type correction complete.")
        return self.df

    def detect_outliers(self, numerical_cols: List[str] = None) -> pd.DataFrame:
        """Identifies outliers in numerical columns using the IQR method."""
        print("\nüìä Detecting outliers...")
        if numerical_cols is None:
            numerical_cols = self.df.select_dtypes(include=['number']).columns.tolist()

        for col in numerical_cols:
            if col in self.df.columns:
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR

                outliers = self.df[(self.df[col] < lower_bound) | (self.df[col] > upper_bound)]
                if not outliers.empty:
                    self.outlier_report[col] = {
                        "count": len(outliers),
                        "percentage": f"{(len(outliers) / len(self.df)) * 100:.2f}%",
                        "lower_bound": lower_bound,
                        "upper_bound": upper_bound
                    }
                    print(f"   - Column '{col}': Detected {len(outliers)} outliers (outside [{lower_bound:.2f}, {upper_bound:.2f}]).")

        if not self.outlier_report:
            print("   No significant outliers detected in numerical columns.")
        print("   Outlier detection complete.")
        return self.df

    def preprocess_data(self) -> pd.DataFrame:
        """Orchestrates the execution of preprocessing steps and returns the cleaned DataFrame."""
        print("\nüöÄ Starting data preprocessing...")

        self.df = self.clean_missing_values()
        self.df = self.correct_datatypes()
        self.df = self.detect_outliers()

        print("\n‚úÖ Data preprocessing complete.")
        print("\n--- Preprocessing Report ---")
        print("Missing Values Report:", self.missing_values_report)
        print("Datatype Corrections:", self.datatype_corrections)
        print("Outlier Report:", self.outlier_report)
        print("--------------------------")
        return self.df

# Instantiate and run the DataPreprocessingAgent
preprocessing_agent = DataPreprocessingAgent(df)
preprocessed_df = preprocessing_agent.preprocess_data()

print("\nPreprocessed DataFrame head:")
print(preprocessed_df.head())
print("\nPreprocessed DataFrame dtypes:")
print(preprocessed_df.dtypes)



üöÄ Starting data preprocessing...

üîç Cleaning missing values...
   No missing values found.
   Missing value cleaning complete.

‚öôÔ∏è Correcting data types...
   No datatype corrections made.
   Data type correction complete.

üìä Detecting outliers...
   No significant outliers detected in numerical columns.
   Outlier detection complete.

‚úÖ Data preprocessing complete.

--- Preprocessing Report ---
Missing Values Report: {}
Datatype Corrections: {}
Outlier Report: {}
--------------------------

Preprocessed DataFrame head:
                       origin_warehouse                     destination_store  \
0  d5566b15-b071-4acf-8e8e-c98433083b2d  50d33715-4c77-4dd9-8b9d-ff1ca372a2a2   
1  c42f0de8-b4f0-4167-abd1-ae79e5e18eea  172eb8f3-1033-4fb6-b66b-d0df09df3161   
2  b145f396-de9b-42f1-9cc9-f5b52c3a941c  65e4544d-42ae-4751-9580-bdcb90e5fcda   
3  f4372224-759f-43b3-bc83-ca6106bba1af  745bee4e-710c-4538-8df1-5c146e1092a6   
4  49d0edae-9091-41bb-a08d-ab1c66bd08d5  425b7a1a-b74

  converted_dt = pd.to_datetime(self.df[col], errors='coerce')
  converted_dt = pd.to_datetime(self.df[col], errors='coerce')
  converted_dt = pd.to_datetime(self.df[col], errors='coerce')
  converted_dt = pd.to_datetime(self.df[col], errors='coerce')


In [None]:
import pandas as pd
from typing import List

class DataPreprocessingAgent:
    """Responsible for identifying and handling missing values, correcting data types, and detecting outliers."""

    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.missing_values_report = {}
        self.datatype_corrections = {}
        self.outlier_report = {}

    def clean_missing_values(self) -> pd.DataFrame:
        """Identifies and handles missing values. Fills numerical NaNs with median, categorical with mode."""
        print("\nüîç Cleaning missing values...")
        for col in self.df.columns:
            if self.df[col].isnull().any():
                missing_count = self.df[col].isnull().sum()
                missing_percentage = (missing_count / len(self.df)) * 100
                self.missing_values_report[col] = {
                    "count": missing_count,
                    "percentage": f"{missing_percentage:.2f}%"
                }

                if self.df[col].dtype in ['int64', 'float64']:
                    median_val = self.df[col].median()
                    self.df[col].fillna(median_val, inplace=True)
                    print(f"   - Column '{col}': Filled {missing_count} missing numerical values with median ({median_val}).")
                elif self.df[col].dtype == 'object' or self.df[col].dtype == 'bool': # Also cover boolean in case of NaN
                    mode_val = self.df[col].mode()[0] if not self.df[col].mode().empty else None
                    if mode_val is not None:
                        self.df[col].fillna(mode_val, inplace=True)
                        print(f"   - Column '{col}': Filled {missing_count} missing categorical/boolean values with mode ({mode_val}).")
                    else:
                        print(f"   - Column '{col}': Could not determine mode for missing values, leaving as is.")

        if not self.missing_values_report:
            print("   No missing values found.")
        print("   Missing value cleaning complete.")
        return self.df

    def correct_datatypes(self) -> pd.DataFrame:
        """Infers and corrects data types for columns."""
        print("\n‚öôÔ∏è Correcting data types...")
        initial_dtypes = self.df.dtypes.to_dict()
        for col in self.df.columns:
            # Try to convert object columns to numeric if possible
            if self.df[col].dtype == 'object':
                try:
                    # Attempt to convert to numeric, coercing errors to NaN
                    converted = pd.to_numeric(self.df[col], errors='coerce')
                    if not converted.isnull().all(): # If not all values became NaN, it's numeric
                        self.df[col] = converted
                        if initial_dtypes[col] != self.df[col].dtype:
                            self.datatype_corrections[col] = f"Changed from {initial_dtypes[col]} to {self.df[col].dtype}"
                            print(f"   - Column '{col}': Converted to numeric.")
                    else:
                        # Attempt to convert to datetime
                        converted_dt = pd.to_datetime(self.df[col], errors='coerce')
                        if not converted_dt.isnull().all():
                            self.df[col] = converted_dt
                            if initial_dtypes[col] != self.df[col].dtype:
                                self.datatype_corrections[col] = f"Changed from {initial_dtypes[col]} to {self.df[col].dtype}"
                                print(f"   - Column '{col}': Converted to datetime.")
                except Exception as e:
                    print(f"   - Column '{col}': Could not convert object type. Error: {e}")

        if not self.datatype_corrections:
            print("   No datatype corrections made.")
        print("   Data type correction complete.")
        return self.df

    def detect_outliers(self, numerical_cols: List[str] = None) -> pd.DataFrame:
        """Identifies outliers in numerical columns using the IQR method."""
        print("\nüìä Detecting outliers...")
        if numerical_cols is None:
            numerical_cols = self.df.select_dtypes(include=['number']).columns.tolist()

        for col in numerical_cols:
            if col in self.df.columns:
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR

                outliers = self.df[(self.df[col] < lower_bound) | (self.df[col] > upper_bound)]
                if not outliers.empty:
                    self.outlier_report[col] = {
                        "count": len(outliers),
                        "percentage": f"{(len(outliers) / len(self.df)) * 100:.2f}%",
                        "lower_bound": lower_bound,
                        "upper_bound": upper_bound
                    }
                    print(f"   - Column '{col}': Detected {len(outliers)} outliers (outside [{lower_bound:.2f}, {upper_bound:.2f}]).")

        if not self.outlier_report:
            print("   No significant outliers detected in numerical columns.")
        print("   Outlier detection complete.")
        return self.df

    def preprocess_data(self) -> pd.DataFrame:
        """Orchestrates the execution of preprocessing steps and returns the cleaned DataFrame."""
        print("\nüöÄ Starting data preprocessing...")

        self.df = self.clean_missing_values()
        self.df = self.correct_datatypes()
        self.df = self.detect_outliers()

        print("\n‚úÖ Data preprocessing complete.")
        print("\n--- Preprocessing Report ---")
        print("Missing Values Report:", self.missing_values_report)
        print("Datatype Corrections:", self.datatype_corrections)
        print("Outlier Report:", self.outlier_report)
        print("--------------------------")
        return self.df

# Instantiate and run the DataPreprocessingAgent
preprocessing_agent = DataPreprocessingAgent(df)
preprocessed_df = preprocessing_agent.preprocess_data()

print("\nPreprocessed DataFrame head:")
print(preprocessed_df.head())
print("\nPreprocessed DataFrame dtypes:")
print(preprocessed_df.dtypes)



üöÄ Starting data preprocessing...

üîç Cleaning missing values...
   No missing values found.
   Missing value cleaning complete.

‚öôÔ∏è Correcting data types...
   No datatype corrections made.
   Data type correction complete.

üìä Detecting outliers...
   No significant outliers detected in numerical columns.
   Outlier detection complete.

‚úÖ Data preprocessing complete.

--- Preprocessing Report ---
Missing Values Report: {}
Datatype Corrections: {}
Outlier Report: {}
--------------------------

Preprocessed DataFrame head:
                       origin_warehouse                     destination_store  \
0  d5566b15-b071-4acf-8e8e-c98433083b2d  50d33715-4c77-4dd9-8b9d-ff1ca372a2a2   
1  c42f0de8-b4f0-4167-abd1-ae79e5e18eea  172eb8f3-1033-4fb6-b66b-d0df09df3161   
2  b145f396-de9b-42f1-9cc9-f5b52c3a941c  65e4544d-42ae-4751-9580-bdcb90e5fcda   
3  f4372224-759f-43b3-bc83-ca6106bba1af  745bee4e-710c-4538-8df1-5c146e1092a6   
4  49d0edae-9091-41bb-a08d-ab1c66bd08d5  425b7a1a-b74

  converted_dt = pd.to_datetime(self.df[col], errors='coerce')
  converted_dt = pd.to_datetime(self.df[col], errors='coerce')
  converted_dt = pd.to_datetime(self.df[col], errors='coerce')
  converted_dt = pd.to_datetime(self.df[col], errors='coerce')


In [None]:
class ExecutionAgent:
    """Simulates execution of autonomous plans"""

    def __init__(self, dataset_context: Dict):
        self.model = genai.GenerativeModel(MODEL_NAME)
        self.context = dataset_context
        self.execution_logs = []

    def execute_plan(self, plan: Dict) -> Dict:
        """Execute each step of the plan"""

        print(f"\nüîß EXECUTING: {plan['plan_id']}")
        print(f"   Objective: {plan['objective']}")

        execution_log = {
            "plan_id": plan['plan_id'],
            "started_at": utc_now(),
            "steps_executed": [],
            "status": "IN_PROGRESS"
        }

        for i, step in enumerate(plan['execution_steps']):
            step_number = step.get('step_number', i + 1) # Use existing step_number or assign based on index
            print(f"\n   Step {step_number}: {step['description']}")

            # Simulate execution with AI
            prompt = f"""Simulate executing this autonomous action:\n\nSTEP: {json.dumps(step, indent=2)}\n\nCONTEXT: {self.context['dataset_type']} system\n\nSimulate realistic execution and return result in JSON:\n{{\n  "step_number": {step_number},\n  "status": "SUCCESS|FAILED|PARTIAL",\n  "action_taken": "specific action performed",\n  "result_data": {{}},\n  "metrics": {{}},\n  "notes": "important details",\n  "timestamp": "{utc_now()}"\n}}\n\nBe realistic about what would actually happen."""

            response_text = safe_api_call(self.model, prompt)
            result = clean_json_response(response_text)

            execution_log["steps_executed"].append(result)

            status_icon = "‚úì" if result["status"] == "SUCCESS" else "‚úó"
            print(f"      {status_icon} {result['status']}: {result['action_taken']}")

            time.sleep(1)  # Rate limiting

        execution_log["completed_at"] = utc_now()
        execution_log["status"] = "COMPLETED" if all(
            s["status"] == "SUCCESS" for s in execution_log["steps_executed"]
        ) else "PARTIAL"

        self.execution_logs.append(execution_log)

        print(f"\n   ‚úÖ Plan {execution_log['status']}")

        return execution_log

# Execute all plans
executor_agent = ExecutionAgent(dataset_context)
execution_results = []

print("\n" + "="*60)
print("üöÄ AUTONOMOUS EXECUTION STARTED")
print("="*60)

for plan in execution_plans:
    result = executor_agent.execute_plan(plan)
    execution_results.append(result)
    time.sleep(2)  # Rate limiting between plans


üöÄ AUTONOMOUS EXECUTION STARTED

üîß EXECUTING: PLAN-PROB-001
   Objective: Resolve the data integrity issue where 'origin_warehouse', 'destination_store', and 'driver_identifier' columns have unique values per shipment, preventing meaningful performance analysis.

   Step 1: Initiate an investigation into the data generation process for shipment data to understand why 'origin_warehouse', 'destination_store', and 'driver_identifier' are unique per row.
      ‚úì SUCCESS: Called data_investigation_api with parameters: problem_id='PROB-001', investigation_focus=['data_source_origin', 'etl_processes', 'data_entry_procedures'], columns_of_concern=['origin_warehouse', 'destination_store', 'driver_identifier'].

   Step 2: Alert the data engineering and logistics operations teams about the data anomaly and the ongoing investigation.
      ‚úì SUCCESS: Sent high-severity alert via alerting_system to data_engineering_lead and logistics_operations_manager.

   Step 3: Based on the findings 

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px

class VisualizationAgent:
    """Generates interactive plots for key insights from the dataset intelligence and problem detection phases."""

    def __init__(self, df: pd.DataFrame, dataset_context: Dict, problem_analysis: Dict):
        self.df = df.copy()
        self.dataset_context = dataset_context
        self.problem_analysis = problem_analysis
        print("\nüìä VisualizationAgent Initialized.")

    def _plot_product_quantity_distribution(self):
        """Visualizes the distribution of product_quantity using an interactive histogram."""
        print("Generating Product Quantity Distribution plot...")
        fig = px.histogram(self.df, x='product_quantity',
                           title='Distribution of Product Quantity',
                           labels={'product_quantity': 'Product Quantity'})
        fig.show()

    def _plot_on_time_rates(self, group_col: str, title_prefix: str):
        """Visualizes on-time delivery rates broken down by a specified grouping column."""
        print(f"Generating On-Time Delivery Rates by {group_col} plot...")
        # Calculate on-time delivery rate
        on_time_rates = self.df.groupby(group_col)['on_time'].mean().reset_index()
        on_time_rates['on_time_percentage'] = on_time_rates['on_time'] * 100

        fig = px.bar(on_time_rates.sort_values(by='on_time_percentage', ascending=False),
                     x=group_col, y='on_time_percentage',
                     title=f'{title_prefix} On-Time Delivery Percentage',
                     labels={'on_time_percentage': 'On-Time Delivery (%)', group_col: group_col.replace('_', ' ').title()},
                     color='on_time_percentage', color_continuous_scale=px.colors.sequential.Plasma)
        fig.update_layout(xaxis_tickangle=-45)
        fig.show()

    def _plot_problem_hotspots(self):
        """Highlights problem hotspots identified by the ProblemDetectionAgent."""
        print("Generating Problem Hotspots visualization...")
        problems = self.problem_analysis.get('problems', [])

        # Example: if a problem indicates specific drivers or products are an issue
        problem_drivers = set()
        problem_products = set()

        for problem in problems:
            if 'driver' in problem.get('category', '').lower() or 'driver' in problem.get('description', '').lower():
                # This is a heuristic, in a real system, problem details would be more structured
                if 'driver_identifier' in self.df.columns:
                    # Try to extract driver IDs from description if not explicitly listed
                    for driver_id in self.df['driver_identifier'].unique():
                        if driver_id in problem.get('description', ''):
                            problem_drivers.add(driver_id)

            if 'product' in problem.get('category', '').lower() or 'product' in problem.get('description', '').lower():
                 if 'product' in self.df.columns:
                    for prod_name in self.df['product'].unique():
                        if prod_name in problem.get('description', ''):
                            problem_products.add(prod_name)

        if problem_drivers or problem_products:
            print("   Highlighting issues for:")
            if problem_drivers: print(f"     Drivers: {list(problem_drivers)}")
            if problem_products: print(f"     Products: {list(problem_products)}")

            # Visualize on-time rates again, but highlight problem entities
            df_problem_drivers = self.df[self.df['driver_identifier'].isin(list(problem_drivers))]
            df_problem_products = self.df[self.df['product'].isin(list(problem_products))]

            if not df_problem_drivers.empty:
                driver_performance = self.df.groupby('driver_identifier')['on_time'].mean().reset_index()
                driver_performance['highlight'] = driver_performance['driver_identifier'].apply(lambda x: 'Problem Driver' if x in problem_drivers else 'Other Drivers')
                fig_drivers = px.bar(driver_performance.sort_values(by='on_time', ascending=True),
                                     x='driver_identifier', y='on_time',
                                     color='highlight', color_discrete_map={'Problem Driver': 'red', 'Other Drivers': 'blue'},
                                     title='Driver Performance with Problem Drivers Highlighted',
                                     labels={'on_time': 'On-Time Delivery Rate', 'driver_identifier': 'Driver ID'})
                fig_drivers.update_layout(xaxis_tickangle=-45)
                fig_drivers.show()

            if not df_problem_products.empty:
                product_performance = self.df.groupby('product')['on_time'].mean().reset_index()
                product_performance['highlight'] = product_performance['product'].apply(lambda x: 'Problem Product' if x in problem_products else 'Other Products')
                fig_products = px.bar(product_performance.sort_values(by='on_time', ascending=True),
                                     x='product', y='on_time',
                                     color='highlight', color_discrete_map={'Problem Product': 'red', 'Other Products': 'blue'},
                                     title='Product Performance with Problem Products Highlighted',
                                     labels={'on_time': 'On-Time Delivery Rate', 'product': 'Product'})
                fig_products.update_layout(xaxis_tickangle=-45)
                fig_products.show()
        else:
            print("   No specific problem entities to highlight based on current problem analysis.")

    def generate_all_visualizations(self):
        """Orchestrates the generation and display of all defined plots."""
        print("\nüöÄ Generating All Visualizations...")
        self._plot_product_quantity_distribution()
        self._plot_on_time_rates('origin_warehouse', 'Origin Warehouse')
        self._plot_on_time_rates('driver_identifier', 'Driver')
        self._plot_problem_hotspots()
        print("\n‚úÖ All visualizations generated.")

# Instantiate the VisualizationAgent
visualization_agent = VisualizationAgent(preprocessed_df, dataset_context, problem_analysis)

# Call the method to generate all visualizations
visualization_agent.generate_all_visualizations()


üìä VisualizationAgent Initialized.

üöÄ Generating All Visualizations...
Generating Product Quantity Distribution plot...


Generating On-Time Delivery Rates by origin_warehouse plot...


Generating On-Time Delivery Rates by driver_identifier plot...


Generating Problem Hotspots visualization...
   No specific problem entities to highlight based on current problem analysis.

‚úÖ All visualizations generated.


In [None]:
from abc import ABC, abstractmethod
from typing import Dict

class SessionService(ABC):
    """Abstract base class defining the interface for session management."""

    @abstractmethod
    def get_session(self, session_id: str) -> Dict:
        """Retrieves session data for a given session ID."""
        pass

    @abstractmethod
    def update_session(self, session_id: str, data: Dict) -> None:
        """Updates session data for a given session ID."""
        pass

    @abstractmethod
    def delete_session(self, session_id: str) -> None:
        """Deletes a session for a given session ID."""
        pass

print("Abstract base class 'SessionService' defined.")

Abstract base class 'SessionService' defined.


In [None]:
class InMemorySessionService(SessionService):
    """Concrete implementation of SessionService using an in-memory dictionary."""

    def __init__(self):
        self.sessions = {}
        print("Initialized InMemorySessionService with an empty session store.")

    def get_session(self, session_id: str) -> Dict:
        """Retrieves session data for a given session ID."""
        return self.sessions.get(session_id, {})

    def update_session(self, session_id: str, data: Dict) -> None:
        """Updates session data for a given session ID."""
        self.sessions[session_id] = data
        print(f"Session '{session_id}' updated.")

    def delete_session(self, session_id: str) -> None:
        """Deletes a session for a given session ID."""
        if session_id in self.sessions:
            del self.sessions[session_id]
            print(f"Session '{session_id}' deleted.")
        else:
            print(f"Session '{session_id}' not found for deletion.")

print("'InMemorySessionService' implemented.")

'InMemorySessionService' implemented.


In [None]:
class DatasetIntelligenceAgent:
    """Understands what kind of data we're analyzing"""

    def __init__(self, session_service: SessionService, session_id: str):
        self.model = genai.GenerativeModel(MODEL_NAME)
        self.analysis = None
        self.session_service = session_service
        self.session_id = session_id

        # Attempt to retrieve previous analysis from session
        session_data = self.session_service.get_session(self.session_id)
        if 'dataset_analysis' in session_data:
            self.analysis = session_data['dataset_analysis']
            print(f"Retrieved dataset analysis from session {self.session_id}.")
        else:
            print(f"No previous dataset analysis found in session {self.session_id}.")

    def analyze_dataset(self, df: pd.DataFrame) -> Dict:
        """Deep analysis of dataset structure and business context"""

        if self.analysis is not None: # Use cached analysis if available
            print("Using cached dataset analysis.")
            return self.analysis

        # Prepare dataset summary
        summary = {
            "columns": list(df.columns),
            "dtypes": df.dtypes.astype(str).to_dict(),
            "row_count": len(df),
            "sample_data": df.head(5).to_dict('records'),
            "null_counts": df.isnull().sum().to_dict(),
            "unique_counts": {col: df[col].nunique() for col in df.columns}
        }

        prompt = f"""You are analyzing a business dataset. Determine what domain this belongs to and how to analyze it.

DATASET INFORMATION:
- Columns: {summary['columns']}
- Data types: {summary['dtypes']}
- Row count: {summary['row_count']}
- Unique value counts: {summary['unique_counts']}
- Sample rows: {json.dumps(summary['sample_data'][:3], indent=2, default=str)}

TASK: Analyze this dataset and provide a structured understanding.

Return ONLY a JSON object with this structure:
{{
  "domain": "string (e.g., 'supply_chain', 'sales', 'inventory', 'logistics', 'retail')",
  "dataset_type": "string (e.g., 'shipment_tracking', 'order_fulfillment', 'warehouse_inventory')",
  "business_context": "string (brief description of what this data represents)",
  "key_entities": ["list", "of", "main", "entities"],
  "problem_indicators": {{
    "column_name": "what problem it indicates"
  }},
  "success_metrics": {{
    "column_name": "what success it measures"
  }},
  "potential_issues": ["list", "of", "issues", "to", "monitor"],
  "autonomous_actions": ["list", "of", "actions", "an", "agent", "could", "take"],
  "analysis_strategy": "how to approach analyzing this data"
}}

Be specific based on actual column names and data patterns."""

        response_text = safe_api_call(self.model, prompt)
        self.analysis = clean_json_response(response_text)

        # Store the analysis in the session
        session_data = self.session_service.get_session(self.session_id)
        session_data['dataset_analysis'] = self.analysis
        self.session_service.update_session(self.session_id, session_data)
        print(f"Stored dataset analysis in session {self.session_id}.")

        print("\nü§ñ AI Dataset Analysis:")
        print(json.dumps(self.analysis, indent=2))

        return self.analysis

# Re-instantiate and run dataset analysis with session service
intelligence_agent = DatasetIntelligenceAgent(session_service, session_id)
dataset_context = intelligence_agent.analyze_dataset(df)

Retrieved dataset analysis from session analysis_session_20251129_151936.
Using cached dataset analysis.


In [None]:
class ProblemDetectionAgent:
    """Scans data for issues requiring intervention"""

    def __init__(self, dataset_context: Dict, session_service: SessionService, session_id: str):
        self.model = genai.GenerativeModel(MODEL_NAME)
        self.context = dataset_context
        self.session_service = session_service
        self.session_id = session_id
        self.problems_found = []
        self.problem_analysis_report = None

        # Attempt to retrieve previous problem analysis from session
        session_data = self.session_service.get_session(self.session_id)
        if 'problem_analysis_report' in session_data:
            self.problem_analysis_report = session_data['problem_analysis_report']
            self.problems_found = self.problem_analysis_report.get('problems', [])
            print(f"Retrieved problem analysis from session {self.session_id}.")
        else:
            print(f"No previous problem analysis found in session {self.session_id}.")

    def scan_dataset(self, df: pd.DataFrame) -> Dict:
        """Analyze entire dataset for patterns and problems"""

        if self.problem_analysis_report is not None: # Use cached analysis if available
            print("Using cached problem analysis.")
            return self.problem_analysis_report

        # Create aggregated view for pattern detection
        analysis_data = {
            "total_rows": len(df),
            "problem_indicators": self.context.get("problem_indicators", {}),
            "column_stats": {}
        }

        # Calculate relevant statistics
        for col in df.columns:
            if df[col].dtype in ['int64', 'float64']:
                analysis_data["column_stats"][col] = {
                    "mean": float(df[col].mean()),
                    "std": float(df[col].std()),
                    "min": float(df[col].min()),
                    "max": float(df[col].max())
                }
            elif df[col].dtype == 'bool':
                analysis_data["column_stats"][col] = {
                    "true_count": int(df[col].sum()),
                    "false_count": int((~df[col]).sum()),
                    "true_percentage": float(df[col].mean() * 100)
                }
            else:
                analysis_data["column_stats"][col] = {
                    "unique_count": int(df[col].nunique()),
                    "top_values": df[col].value_counts().head(5).to_dict()
                }

        prompt = f"""You are analyzing a {self.context['dataset_type']} dataset to find problems.

DATASET CONTEXT:
{json.dumps(self.context, indent=2)}

STATISTICAL ANALYSIS:
{json.dumps(analysis_data, indent=2, default=str)}

SAMPLE RECORDS:
{json.dumps(df.head(10).to_dict('records'), indent=2, default=str)}

TASK: Identify specific, actionable problems that require autonomous intervention.

Return ONLY a JSON object:
{{
  "problems": [
    {{
      "problem_id": "PROB-001",
      "severity": "CRITICAL|HIGH|MEDIUM|LOW",
      "category": "category name",
      "description": "specific problem description",
      "affected_records": "how many/which records",
      "business_impact": "what's the business impact",
      "requires_action": true/false,
      "suggested_actions": ["action1", "action2"]
    }}
  ],
  "summary": {{
    "total_problems": 0,
    "critical_count": 0,
    "high_count": 0,
    "requires_immediate_action": true/false
  }}
}}

Focus on REAL problems in the data, not hypothetical ones."""

        response_text = safe_api_call(self.model, prompt)
        result = clean_json_response(response_text)

        self.problems_found = result.get("problems", [])
        self.problem_analysis_report = result

        # Store the problem analysis in the session
        session_data = self.session_service.get_session(self.session_id)
        session_data['problem_analysis_report'] = self.problem_analysis_report
        self.session_service.update_session(self.session_id, session_data)
        print(f"Stored problem analysis in session {self.session_id}.")

        print(f"\n‚ö†Ô∏è  PROBLEM SCAN COMPLETE")
        print(f"   Total Problems Found: {result['summary']['total_problems']}")
        print(f"   Critical: {result['summary']['critical_count']}")
        print(f"   High: {result['summary']['high_count']}")

        return result

# Run problem detection with session service
problem_detector = ProblemDetectionAgent(dataset_context, session_service, session_id)
problem_analysis = problem_detector.scan_dataset(df)

No previous problem analysis found in session analysis_session_20251129_151936.
Session 'analysis_session_20251129_151936' updated.
Stored problem analysis in session analysis_session_20251129_151936.

‚ö†Ô∏è  PROBLEM SCAN COMPLETE
   Total Problems Found: 3
   Critical: 0
   High: 1


In [None]:
import time
from datetime import datetime, timezone
import json
import requests # Import the requests library to catch its exceptions

def utc_now():
    """Get current UTC timestamp"""
    return datetime.now(timezone.utc).isoformat()

def safe_api_call(model, prompt, max_retries=5): # Increased max_retries for more robustness
    """Make API call with retry logic, including connection errors"""
    for attempt in range(max_retries):
        try:
            response = model.generate_content(prompt)
            return response.text
        except (requests.exceptions.ConnectionError, requests.exceptions.RequestException, Exception) as e: # Catch ConnectionError and general RequestException
            if "429" in str(e) or "quota" in str(e).lower() or "Connection aborted" in str(e):
                wait_time = (attempt + 1) * 5 # Increased wait time for connection issues
                print(f"‚è≥ API call failed (Attempt {attempt + 1}/{max_retries}). Retrying after {wait_time}s due to: {e}")
                time.sleep(wait_time)
            else:
                print(f"‚ùå Unhandled API error: {e}")
                raise e
    raise Exception(f"Max retries ({max_retries}) exceeded for API call.")

def clean_json_response(text):
    """Extract and clean JSON from AI response"""
    text = text.strip()
    # Remove markdown code blocks
    text = text.replace('```json', '').replace('```', '')
    # Find JSON object boundaries
    start = text.find('{')
    end = text.rfind('}') + 1
    if start != -1 and end > start:
        text = text[start:end]
    return json.loads(text)

In [None]:
class ExecutionAgent:
    """Simulates execution of autonomous plans"""

    def __init__(self, dataset_context: Dict, session_service: SessionService, session_id: str):
        self.model = genai.GenerativeModel(MODEL_NAME)
        self.context = dataset_context
        self.session_service = session_service
        self.session_id = session_id
        self.execution_logs = []

        # Attempt to retrieve previous execution results from session
        session_data = self.session_service.get_session(self.session_id)
        if 'execution_results' in session_data:
            self.execution_logs = session_data['execution_results']
            print(f"Retrieved {len(self.execution_logs)} execution logs from session {self.session_id}.")
        else:
            print(f"No previous execution logs found in session {self.session_id}.")

    def execute_plan(self, plan: Dict) -> Dict:
        """Execute each step of the plan"""

        print(f"\n‚Ñõ EXECUTING: {plan['plan_id']}")
        print(f"   Objective: {plan['objective']}")

        execution_log = {
            "plan_id": plan['plan_id'],
            "started_at": utc_now(),
            "steps_executed": [],
            "status": "IN_PROGRESS"
        }

        for i, step in enumerate(plan['execution_steps']):
            step_number = step.get('step_number', i + 1) # Use existing step_number or assign based on index
            print(f"\n   Step {step_number}: {step['description']}")

            # Simulate execution with AI
            prompt = f"""Simulate executing this autonomous action:\n\nSTEP: {json.dumps(step, indent=2)}\n\nCONTEXT: {self.context['dataset_type']} system\n\nSimulate realistic execution and return result in JSON:\n{{\n  "step_number": {step_number},\n  "status": "SUCCESS|FAILED|PARTIAL",\n  "action_taken": "specific action performed",\n  "result_data": {{}},\n  "metrics": {{}},\n  "notes": "important details",\n  "timestamp": "{utc_now()}"\n}}\n\nBe realistic about what would actually happen."""

            response_text = safe_api_call(self.model, prompt)
            result = clean_json_response(response_text)

            execution_log["steps_executed"].append(result)

            status_icon = "‚úì" if result["status"] == "SUCCESS" else "‚úó"
            print(f"      {status_icon} {result['status']}: {result['action_taken']}")

            time.sleep(1)  # Rate limiting

        execution_log["completed_at"] = utc_now()
        execution_log["status"] = "COMPLETED" if all(
            s["status"] == "SUCCESS" for s in execution_log["steps_executed"]
        ) else "PARTIAL"

        self.execution_logs.append(execution_log)

        # Store the updated execution logs list in the session
        session_data = self.session_service.get_session(self.session_id)
        session_data['execution_results'] = self.execution_logs
        self.session_service.update_session(self.session_id, session_data)
        print(f"Stored updated execution logs in session {self.session_id}.")

        print(f"\n   ‚úÖ Plan {execution_log['status']}")

        return execution_log

# Execute all plans with session service
executor_agent = ExecutionAgent(dataset_context, session_service, session_id)
execution_results = []

print("\n" + "="*60)
print("üöÄ AUTONOMOUS EXECUTION STARTED")
print("="*60)

for plan in planner_agent.plans: # Use plans from planner_agent which are already in session
    result = executor_agent.execute_plan(plan)
    execution_results.append(result)
    time.sleep(2)  # Rate limiting between plans

No previous execution logs found in session analysis_session_20251129_151936.

üöÄ AUTONOMOUS EXECUTION STARTED

‚Ñõ EXECUTING: PLAN-PROB-001
   Objective: To significantly improve the on-time delivery rate by identifying and addressing the root causes of delivery delays.

   Step 1: Analyze the on-time delivery percentage by origin_warehouse to identify warehouses with a higher incidence of delays.
      ‚úì SUCCESS: Executed SQL query on the 'deliveries' table.

   Step 2: Analyze the on-time delivery percentage by destination_store to identify stores that are frequently experiencing late deliveries.
      ‚úì SUCCESS: Executed SQL query against the shipment_delivery_performance database.

   Step 3: Analyze the on-time delivery percentage by product to identify products that are more prone to delays.
      ‚úì SUCCESS: Executed SQL query against the 'deliveries' table in the shipment_delivery_performance system's SQL Database.

   Step 4: Analyze the on-time delivery percentage by 

In [None]:
from abc import ABC, abstractmethod
from typing import Any, Dict, List

class MemoryService(ABC):
    """Abstract base class defining the interface for a long-term memory system."""

    @abstractmethod
    def store_information(self, key: str, value: Any, metadata: Dict = None) -> None:
        """Stores information with a given key, value, and optional metadata."""
        pass

    @abstractmethod
    def retrieve_information(self, key: str) -> Any:
        """Retrieves information associated with a given key."""
        pass

    @abstractmethod
    def query_information(self, query: str, top_k: int = 1) -> List[Any]:
        """Queries the memory system based on a natural language query, returning top_k results."""
        pass

print("Abstract base class 'MemoryService' defined.")

Abstract base class 'MemoryService' defined.


## Implement MemoryBank

### Subtask:
Develop a concrete implementation of the long-term memory interface (MemoryService) using an in-memory dictionary.


In [None]:
class MemoryBank(MemoryService):
    """Concrete implementation of MemoryService using an in-memory dictionary."""

    def __init__(self):
        self.memory_store = {}
        print("Initialized MemoryBank with an empty memory store.")

    def store_information(self, key: str, value: Any, metadata: Dict = None) -> None:
        """Stores information with a given key, value, and optional metadata."""
        self.memory_store[key] = {"value": value, "metadata": metadata if metadata is not None else {}}
        print(f"Information stored for key: '{key}'.")

    def retrieve_information(self, key: str) -> Any:
        """Retrieves information associated with a given key."""
        entry = self.memory_store.get(key)
        return entry["value"] if entry else None

    def query_information(self, query: str, top_k: int = 1) -> List[Any]:
        """Queries the memory system based on a natural language query.
        For this in-memory dictionary, it performs an exact key match.
        """
        results = []
        if query in self.memory_store:
            results.append(self.memory_store[query]["value"])
        # For simplicity, if top_k > 1 and no exact match, return first top_k items or fewer
        elif not results:
            all_values = [entry["value"] for entry in self.memory_store.values()]
            results.extend(all_values[:top_k])

        return results

print("'MemoryBank' implemented.")

'MemoryBank' implemented.


## Integrate Long-Term Memory

### Subtask:
Integrate MemoryBank into DatasetIntelligenceAgent to store dataset analysis.


In [None]:
memory_service = MemoryBank()
print("Initialized MemoryBank instance.")

Initialized MemoryBank with an empty memory store.
Initialized MemoryBank instance.


In [None]:
import pandas as pd

# Clear previous analysis from the session to force re-generation
session_data = session_service.get_session(session_id)
if 'dataset_analysis' in session_data:
    del session_data['dataset_analysis']
    session_service.update_session(session_id, session_data)
    print(f"Cleared 'dataset_analysis' from session {session_id} to force re-analysis.")

class DatasetIntelligenceAgent:
    """Understands what kind of data we're analyzing"""

    def __init__(self, session_service: SessionService, session_id: str, memory_service: MemoryService):
        self.model = genai.GenerativeModel(MODEL_NAME)
        self.analysis = None
        self.session_service = session_service
        self.session_id = session_id
        self.memory_service = memory_service # Store memory service

        # Attempt to retrieve previous analysis from session
        session_data = self.session_service.get_session(self.session_id)
        if 'dataset_analysis' in session_data:
            self.analysis = session_data['dataset_analysis']
            print(f"Retrieved dataset analysis from session {self.session_id}.")
        else:
            print(f"No previous dataset analysis found in session {self.session_id}.")

    def analyze_dataset(self, df: pd.DataFrame) -> Dict:
        """Deep analysis of dataset structure and business context"""

        if self.analysis is not None: # Use cached analysis if available
            print("Using cached dataset analysis.")
            return self.analysis

        # Prepare dataset summary
        summary = {
            "columns": list(df.columns),
            "dtypes": df.dtypes.astype(str).to_dict(),
            "row_count": len(df),
            "sample_data": df.head(5).to_dict('records'),
            "null_counts": df.isnull().sum().to_dict(),
            "unique_counts": {col: df[col].nunique() for col in df.columns}
        }

        prompt = f"""You are analyzing a business dataset. Determine what domain this belongs to and how to analyze it.

DATASET INFORMATION:
- Columns: {summary['columns']}
- Data types: {summary['dtypes']}
- Row count: {summary['row_count']}
- Unique value counts: {summary['unique_counts']}
- Sample rows: {json.dumps(summary['sample_data'][:3], indent=2, default=str)}

TASK: Analyze this dataset and provide a structured understanding.

Return ONLY a JSON object with this structure:
{{
  "domain": "string (e.g., 'supply_chain', 'sales', 'inventory', 'logistics', 'retail')",
  "dataset_type": "string (e.g., 'shipment_tracking', 'order_fulfillment', 'warehouse_inventory')",
  "business_context": "string (brief description of what this data represents)",
  "key_entities": ["list", "of", "main", "entities"],
  "problem_indicators": {{
    "column_name": "what problem it indicates"
  }},
  "success_metrics": {{
    "column_name": "what success it measures"
  }},
  "potential_issues": ["list", "of", "issues", "to", "monitor"],
  "autonomous_actions": ["list", "of", "actions", "an", "agent", "could", "take"],
  "analysis_strategy": "how to approach analyzing this data"
}}

Be specific based on actual column names and data patterns."""

        response_text = safe_api_call(self.model, prompt)
        self.analysis = clean_json_response(response_text)

        # Store the analysis in the session
        session_data = self.session_service.get_session(self.session_id)
        session_data['dataset_analysis'] = self.analysis
        self.session_service.update_session(self.session_id, session_data)
        print(f"Stored dataset analysis in session {self.session_id}.")

        # Store the analysis in long-term memory
        memory_key = f"dataset_analysis_{self.session_id}"
        self.memory_service.store_information(key=memory_key, value=self.analysis, metadata={'session_id': self.session_id, 'timestamp': utc_now()})
        print(f"Stored dataset analysis in MemoryBank with key: {memory_key}.")

        print("\nü§ñ AI Dataset Analysis:")
        print(json.dumps(self.analysis, indent=2))

        return self.analysis

# Re-loading df for this execution context to resolve NameError
# This assumes 'shipping_data_0.csv' is the correct file name and is available in the environment.
csv_name = 'shipping_data_0.csv' # Explicitly set based on previous output
df = pd.read_csv(csv_name)

# Re-instantiate and run dataset analysis with session service and memory service
intelligence_agent = DatasetIntelligenceAgent(session_service, session_id, memory_service)
dataset_context = intelligence_agent.analyze_dataset(df)

Session 'analysis_session_20251129_151936' updated.
Cleared 'dataset_analysis' from session analysis_session_20251129_151936 to force re-analysis.
No previous dataset analysis found in session analysis_session_20251129_151936.
Session 'analysis_session_20251129_151936' updated.
Stored dataset analysis in session analysis_session_20251129_151936.
Information stored for key: 'dataset_analysis_analysis_session_20251129_151936'.
Stored dataset analysis in MemoryBank with key: dataset_analysis_analysis_session_20251129_151936.

ü§ñ AI Dataset Analysis:
{
  "domain": "logistics",
  "dataset_type": "shipment_delivery_performance",
  "business_context": "This dataset appears to track product shipments from origin warehouses to destination stores, focusing on delivery timeliness, product quantities, and the drivers involved. It is likely used to monitor and improve the efficiency and reliability of the transportation and delivery process.",
  "key_entities": [
    "warehouse",
    "store",
   

## Integrate Long-Term Memory into ProblemDetectionAgent

### Subtask:
Integrate MemoryBank into ProblemDetectionAgent to store detected problems and analysis reports.


In [None]:
class ProblemDetectionAgent:
    """Scans data for issues requiring intervention"""

    def __init__(self, dataset_context: Dict, session_service: SessionService, session_id: str, memory_service: MemoryService):
        self.model = genai.GenerativeModel(MODEL_NAME)
        self.context = dataset_context
        self.session_service = session_service
        self.session_id = session_id
        self.memory_service = memory_service # Store memory service
        self.problems_found = []
        self.problem_analysis_report = None

        # Attempt to retrieve previous problem analysis from session
        session_data = self.session_service.get_session(self.session_id)
        if 'problem_analysis_report' in session_data:
            self.problem_analysis_report = session_data['problem_analysis_report']
            self.problems_found = self.problem_analysis_report.get('problems', [])
            print(f"Retrieved problem analysis from session {self.session_id}.")
        else:
            print(f"No previous problem analysis found in session {self.session_id}.")

    def scan_dataset(self, df: pd.DataFrame) -> Dict:
        """Analyze entire dataset for patterns and problems"""

        if self.problem_analysis_report is not None: # Use cached analysis if available
            print("Using cached problem analysis.")
            return self.problem_analysis_report

        # Create aggregated view for pattern detection
        analysis_data = {
            "total_rows": len(df),
            "problem_indicators": self.context.get("problem_indicators", {}),
            "column_stats": {}
        }

        # Calculate relevant statistics
        for col in df.columns:
            if df[col].dtype in ['int64', 'float64']:
                analysis_data["column_stats"][col] = {
                    "mean": float(df[col].mean()),
                    "std": float(df[col].std()),
                    "min": float(df[col].min()),
                    "max": float(df[col].max())
                }
            elif df[col].dtype == 'bool':
                analysis_data["column_stats"][col] = {
                    "true_count": int(df[col].sum()),
                    "false_count": int((~df[col]).sum()),
                    "true_percentage": float(df[col].mean() * 100)
                }
            else:
                analysis_data["column_stats"][col] = {
                    "unique_count": int(df[col].nunique()),
                    "top_values": df[col].value_counts().head(5).to_dict()
                }

        prompt = f"""You are analyzing a {self.context['dataset_type']} dataset to find problems.

DATASET CONTEXT:
{json.dumps(self.context, indent=2)}

STATISTICAL ANALYSIS:
{json.dumps(analysis_data, indent=2, default=str)}

SAMPLE RECORDS:
{json.dumps(df.head(10).to_dict('records'), indent=2, default=str)}

TASK: Identify specific, actionable problems that require autonomous intervention.

Return ONLY a JSON object:
{{
  "problems": [
    {{
      "problem_id": "PROB-001",
      "severity": "CRITICAL|HIGH|MEDIUM|LOW",
      "category": "category name",
      "description": "specific problem description",
      "affected_records": "how many/which records",
      "business_impact": "what's the business impact",
      "requires_action": true/false,
      "suggested_actions": ["action1", "action2"]
    }}
  ],
  "summary": {{
    "total_problems": 0,
    "critical_count": 0,
    "high_count": 0,
    "requires_immediate_action": true/false
  }}
}}

Focus on REAL problems in the data, not hypothetical ones."""

        response_text = safe_api_call(self.model, prompt)
        result = clean_json_response(response_text)

        self.problems_found = result.get("problems", [])
        self.problem_analysis_report = result

        # Store the problem analysis in the session
        session_data = self.session_service.get_session(self.session_id)
        session_data['problem_analysis_report'] = self.problem_analysis_report
        self.session_service.update_session(self.session_id, session_data)
        print(f"Stored problem analysis in session {self.session_id}.")

        # Store the problem analysis in long-term memory
        memory_key = f"problem_analysis_{self.session_id}"
        self.memory_service.store_information(key=memory_key, value=self.problem_analysis_report, metadata={'session_id': self.session_id, 'timestamp': utc_now()})
        print(f"Stored problem analysis in MemoryBank with key: {memory_key}.")

        print(f"\n‚ö†Ô∏è  PROBLEM SCAN COMPLETE")
        print(f"   Total Problems Found: {result['summary']['total_problems']}")
        print(f"   Critical: {result['summary']['critical_count']}")
        print(f"   High: {result['summary']['high_count']}")

        return result

# Re-instantiate and run problem detection with session and memory service
problem_detector = ProblemDetectionAgent(dataset_context, session_service, session_id, memory_service)
problem_analysis = problem_detector.scan_dataset(df)

Retrieved problem analysis from session analysis_session_20251129_151936.
Using cached problem analysis.


## Integrate Long-Term Memory into AutonomousPlannerAgent

### Subtask:
Integrate MemoryBank into AutonomousPlannerAgent to store generated plans.


In [None]:
session_data = session_service.get_session(session_id)
if 'execution_plans' in session_data:
    del session_data['execution_plans']
    session_service.update_session(session_id, session_data)
    print(f"Cleared 'execution_plans' from session {session_id} to force re-planning.")

Session 'analysis_session_20251129_151936' updated.
Cleared 'execution_plans' from session analysis_session_20251129_151936 to force re-planning.


In [None]:
class AutonomousPlannerAgent:
    """Creates detailed action plans for each problem"""

    def __init__(self, dataset_context: Dict, session_service: SessionService, session_id: str, memory_service: MemoryService):
        self.model = genai.GenerativeModel(MODEL_NAME)
        self.context = dataset_context
        self.session_service = session_service
        self.session_id = session_id
        self.memory_service = memory_service # Store memory service
        self.plans = []

        # Attempt to retrieve previous plans from session
        session_data = self.session_service.get_session(self.session_id)
        if 'execution_plans' in session_data:
            self.plans = session_data['execution_plans']
            print(f"Retrieved {len(self.plans)} plans from session {self.session_id}.")
        else:
            print(f"No previous plans found in session {self.session_id}.")

    def create_plan(self, problem: Dict, df: pd.DataFrame) -> Dict:
        """Generate detailed autonomous action plan"""

        prompt = f"""You are an autonomous planning agent for {self.context['dataset_type']}.

PROBLEM TO SOLVE:
{json.dumps(problem, indent=2)}

BUSINESS CONTEXT:
- Domain: {self.context['domain']}
- Key entities: {self.context['key_entities']}
- Available actions: {self.context['autonomous_actions']}

TASK: Create a detailed, step-by-step execution plan.

Return ONLY a JSON object:
{{
  "plan_id": "PLAN-{problem['problem_id']}",
  "problem_reference": "{problem['problem_id']}",
  "objective": "clear statement of what we're trying to achieve",
  "execution_steps": [
    {{
      "step_number": 1,
      "action_type": "API_CALL|DATABASE_QUERY|NOTIFICATION|REROUTE|REORDER",
      "description": "what to do",
      "tool_needed": "specific tool/API",
      "parameters": {{}},
      "expected_outcome": "what success looks like",
      "rollback_plan": "what to do if this fails"
    }}
  ],
  "success_criteria": ["criterion1", "criterion2"],
  "estimated_impact": "quantifiable benefit",
  "estimated_time": "how long to execute",
  "dependencies": ["what needs to happen first"]
}}

Make the plan realistic and executable with specific details."""

        response_text = safe_api_call(self.model, prompt)
        plan = clean_json_response(response_text)

        plan["created_at"] = utc_now()
        self.plans.append(plan)

        # Store the updated plans list in the session
        session_data = self.session_service.get_session(self.session_id)
        session_data['execution_plans'] = self.plans
        self.session_service.update_session(self.session_id, session_data)
        print(f"Stored updated plans in session {self.session_id}.")

        # Store the plan in long-term memory
        memory_key = f"plan_{plan['plan_id']}_{self.session_id}"
        self.memory_service.store_information(key=memory_key, value=plan, metadata={'session_id': self.session_id, 'problem_id': plan['problem_reference'], 'timestamp': utc_now()})
        print(f"Stored plan '{plan['plan_id']}' in MemoryBank with key: {memory_key}.")

        return plan

# Create plans for each problem with session and memory service
planner_agent = AutonomousPlannerAgent(dataset_context, session_service, session_id, memory_service)
execution_plans = []

print("\n\u2501"*60)
print("\u2501 CREATING AUTONOMOUS PLANS...")
print("\u2501"*60)
for i, problem in enumerate(problem_detector.problems_found):
    if problem.get("requires_action"):
        print(f"\n[{i+1}] Planning for: {problem['problem_id']}")
        plan = planner_agent.create_plan(problem, df)
        execution_plans.append(plan)
        print(f"   \u2713 Plan created: {len(plan['execution_steps'])} steps")
        time.sleep(1)  # Rate limiting

No previous plans found in session analysis_session_20251129_151936.

‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ
‚îÅ CREATING AUTONOMOUS PLANS...
‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ

[1] Planning for: PROB-001
Session 'analysis_session_20251129_151936' updated.
Stored updated plans in session analysis_session_20251129_151936.
Information stored for key: 'plan_PLAN-PROB-001_analysis_session_20251129_151936'.
Stored plan 'PLAN-PROB-001' in MemoryBank with key: plan_PLAN-PROB-001_analysis_session_20251129_151936.
   ‚úì Plan created: 8 steps

[2] Planning for: PROB-002
Session 'analysis_session_20251129_151936' updated.
Stored updated plans in session a

## Integrate Long-Term Memory into ExecutionAgent

### Subtask:
Integrate MemoryBank into ExecutionAgent to store execution logs.


In [None]:
class ExecutionAgent:
    """Simulates execution of autonomous plans"""

    def __init__(self, dataset_context: Dict, session_service: SessionService, session_id: str, memory_service: MemoryService):
        self.model = genai.GenerativeModel(MODEL_NAME)
        self.context = dataset_context
        self.session_service = session_service
        self.session_id = session_id
        self.memory_service = memory_service # Store memory service
        self.execution_logs = []

        # Attempt to retrieve previous execution results from session
        session_data = self.session_service.get_session(self.session_id)
        if 'execution_results' in session_data:
            self.execution_logs = session_data['execution_results']
            print(f"Retrieved {len(self.execution_logs)} execution logs from session {self.session_id}.")
        else:
            print(f"No previous execution logs found in session {self.session_id}.")

    def execute_plan(self, plan: Dict) -> Dict:
        """Execute each step of the plan"""

        print(f"\n‚Ñõ EXECUTING: {plan['plan_id']}")
        print(f"   Objective: {plan['objective']}")

        execution_log = {
            "plan_id": plan['plan_id'],
            "started_at": utc_now(),
            "steps_executed": [],
            "status": "IN_PROGRESS"
        }

        for i, step in enumerate(plan['execution_steps']):
            step_number = step.get('step_number', i + 1) # Use existing step_number or assign based on index
            print(f"\n   Step {step_number}: {step['description']}")

            # Simulate execution with AI
            prompt = f"""Simulate executing this autonomous action:\n\nSTEP: {json.dumps(step, indent=2)}\n\nCONTEXT: {self.context['dataset_type']} system\n\nSimulate realistic execution and return result in JSON:\n{{\n  "step_number": {step_number},\n  "status": "SUCCESS|FAILED|PARTIAL",\n  "action_taken": "specific action performed",\n  "result_data": {{}},\n  "metrics": {{}},\n  "notes": "important details",\n  "timestamp": "{utc_now()}"\n}}\n\nBe realistic about what would actually happen."""

            response_text = safe_api_call(self.model, prompt)
            result = clean_json_response(response_text)

            execution_log["steps_executed"].append(result)

            status_icon = "‚úì" if result["status"] == "SUCCESS" else "‚úó"
            print(f"      {status_icon} {result['status']}: {result['action_taken']}")

            time.sleep(1)  # Rate limiting

        execution_log["completed_at"] = utc_now()
        execution_log["status"] = "COMPLETED" if all(
            s["status"] == "SUCCESS" for s in execution_log["steps_executed"]
        ) else "PARTIAL"

        self.execution_logs.append(execution_log)

        # Store the updated execution logs list in the session
        session_data = self.session_service.get_session(self.session_id)
        session_data['execution_results'] = self.execution_logs
        self.session_service.update_session(self.session_id, session_data)
        print(f"Stored updated execution logs in session {self.session_id}.")

        # Store the execution log in long-term memory
        memory_key = f"execution_log_{plan['plan_id']}_{self.session_id}"
        self.memory_service.store_information(key=memory_key, value=execution_log, metadata={'session_id': self.session_id, 'plan_id': plan['plan_id'], 'timestamp': utc_now()})
        print(f"Stored execution log for plan '{plan['plan_id']}' in MemoryBank with key: {memory_key}.")

        print(f"\n   ‚úÖ Plan {execution_log['status']}")

        return execution_log

# Clear previous execution results from the session to force re-execution and re-storage
session_data = session_service.get_session(session_id)
if 'execution_results' in session_data:
    del session_data['execution_results']
    session_service.update_session(session_id, session_data)
    print(f"Cleared 'execution_results' from session {session_id} to force re-execution.")


# Execute all plans with session and memory service
executor_agent = ExecutionAgent(dataset_context, session_service, session_id, memory_service)
execution_results = []

print("\n" + "="*60)
print("üöÄ AUTONOMOUS EXECUTION STARTED")
print("="*60)

for plan in planner_agent.plans: # Use plans from planner_agent which are already in session
    result = executor_agent.execute_plan(plan)
    execution_results.append(result)
    time.sleep(2)  # Rate limiting between plans

Session 'analysis_session_20251129_151936' updated.
Cleared 'execution_results' from session analysis_session_20251129_151936 to force re-execution.
No previous execution logs found in session analysis_session_20251129_151936.

üöÄ AUTONOMOUS EXECUTION STARTED

‚Ñõ EXECUTING: PLAN-PROB-001
   Objective: To identify and address the root causes of delivery delays and improve the overall on-time delivery rate.

   Step 1: Analyze on-time delivery percentage aggregated by 'origin_warehouse' to identify warehouses with a disproportionately high number of late deliveries.
      ‚úì SUCCESS: Executed SQL query to analyze on-time delivery percentage by origin_warehouse.

   Step 2: Analyze on-time delivery percentage aggregated by 'destination_store' to identify stores that are frequently receiving late deliveries.
      ‚úì SUCCESS: Executed SQL query against the shipment_delivery_performance database.

   Step 3: Analyze on-time delivery percentage aggregated by 'product_id' to identify spe