<a href="https://colab.research.google.com/github/Codebasedesign24/AgenticAI_Google-/blob/main/FRA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install ucimlrepo

Collecting ucimlrepo
  Downloading ucimlrepo-0.0.7-py3-none-any.whl.metadata (5.5 kB)
Downloading ucimlrepo-0.0.7-py3-none-any.whl (8.0 kB)
Installing collected packages: ucimlrepo
Successfully installed ucimlrepo-0.0.7


In [None]:
from ucimlrepo import fetch_ucirepo

# fetch dataset
bank_marketing = fetch_ucirepo(id=222)

# data (as pandas dataframes)
X = bank_marketing.data.features
y = bank_marketing.data.targets

# metadata
print(bank_marketing.metadata)

# variable information
print(bank_marketing.variables)

{'uci_id': 222, 'name': 'Bank Marketing', 'repository_url': 'https://archive.ics.uci.edu/dataset/222/bank+marketing', 'data_url': 'https://archive.ics.uci.edu/static/public/222/data.csv', 'abstract': 'The data is related with direct marketing campaigns (phone calls) of a Portuguese banking institution. The classification goal is to predict if the client will subscribe a term deposit (variable y).', 'area': 'Business', 'tasks': ['Classification'], 'characteristics': ['Multivariate'], 'num_instances': 45211, 'num_features': 16, 'feature_types': ['Categorical', 'Integer'], 'demographics': ['Age', 'Occupation', 'Marital Status', 'Education Level'], 'target_col': ['y'], 'index_col': None, 'has_missing_values': 'yes', 'missing_values_symbol': 'NaN', 'year_of_dataset_creation': 2014, 'last_updated': 'Fri Aug 18 2023', 'dataset_doi': '10.24432/C5K306', 'creators': ['S. Moro', 'P. Rita', 'P. Cortez'], 'intro_paper': {'ID': 277, 'type': 'NATIVE', 'title': 'A data-driven approach to predict the s

In [None]:
!pip install google-adk



In [None]:
from google.adk.agents import Agent
from google.adk.tools import google_search

root_agent = Agent(
    name="search_assistant",
    model="gemini-2.5-flash", # Or your preferred Gemini model
    instruction="You are a helpful assistant. Answer user questions using Google Search when needed.",
    description="An assistant that can search the web.",
    tools=[google_search]
)

In [None]:
from google.adk.agents import LlmAgent, BaseAgent

# Define individual agents
greeter = LlmAgent(name="greeter", model="gemini-2.5-flash")
task_executor = LlmAgent(name="task_executor", model="gemini-2.5-flash")

# Create parent agent and assign children via sub_agents
coordinator = LlmAgent(
    name="Coordinator",
    model="gemini-2.5-flash",
    description="I coordinate greetings and tasks.",
    sub_agents=[ # Assign sub_agents here
        greeter,
        task_executor
    ]
)

In [None]:

from config import SystemConfig

config = SystemConfig(
    gcp=GCPConfig(
        project_id="your-project",
        location="us-central1",
        bucket_name="recon-data"
    ),
    llm=LLMConfig(
        model_name="gemini-1.5-pro",
        temperature=0.1,
        max_tokens=4096
    ),
    redis=RedisConfig(
        host="localhost",
        port=6379
    )
)


ModuleNotFoundError: No module named 'config'

In [None]:
 ###Set your own project id here
PROJECT_ID = 'your-google-cloud-project'
from google.cloud import storage
storage_client = storage.Client(project=PROJECT_ID)

In [None]:
# config.py
"""Configuration management for the reconciliation system"""

import os
from typing import Optional
from pydantic import BaseModel, Field
from dotenv import load_dotenv

load_dotenv()

class GCPConfig(BaseModel):
    """Google Cloud Platform configuration"""
    project_id: str = Field(default_factory=lambda: os.getenv("GOOGLE_CLOUD_PROJECT"))
    location: str = Field(default_factory=lambda: os.getenv("VERTEX_AI_LOCATION", "us-central1"))
    bucket_name: str = Field(default="financial-reconciliation-data")
    dataset_id: str = Field(default="reconciliation")
    credentials_path: Optional[str] = Field(default_factory=lambda: os.getenv("GOOGLE_APPLICATION_CREDENTIALS"))

class LLMConfig(BaseModel):
    """LLM configuration"""
    model_name: str = Field(default="gemini-1.5-pro")
    temperature: float = Field(default=0.1)
    max_tokens: int = Field(default=4096)
    timeout: int = Field(default=120)

class RedisConfig(BaseModel):
    """Redis configuration for memory"""
    host: str = Field(default_factory=lambda: os.getenv("REDIS_HOST", "localhost"))
    port: int = Field(default_factory=lambda: int(os.getenv("REDIS_PORT", "6379")))
    db: int = Field(default=0)
    password: Optional[str] = Field(default=None)

class ObservabilityConfig(BaseModel):
    """Observability configuration"""
    enable_langsmith: bool = Field(default=True)
    enable_cloud_trace: bool = Field(default=True)
    enable_cloud_logging: bool = Field(default=True)
    langsmith_project: str = Field(default_factory=lambda: os.getenv("LANGSMITH_PROJECT", "financial-reconciliation"))

class SystemConfig(BaseModel):
    """Main system configuration"""
    gcp: GCPConfig = Field(default_factory=GCPConfig)
    llm: LLMConfig = Field(default_factory=LLMConfig)
    redis: RedisConfig = Field(default_factory=RedisConfig)
    observability: ObservabilityConfig = Field(default_factory=ObservabilityConfig)

    # Agent settings
    max_iterations: int = Field(default=10)
    session_timeout: int = Field(default=3600)
    checkpoint_interval: int = Field(default=5)

# Global config instance
config = SystemConfig()

In [None]:
# evaluation.py
"""Comprehensive agent evaluation framework"""

from typing import Dict, List, Any
import pandas as pd
import numpy as np
from datetime import datetime
from loguru import logger
import json

# ============================================================================
# EVALUATION METRICS
# ============================================================================

class ReconciliationMetrics:
    """Calculate reconciliation performance metrics"""

    @staticmethod
    def accuracy(matched: int, total: int) -> float:
        """Calculate matching accuracy"""
        return (matched / total * 100) if total > 0 else 0

    @staticmethod
    def precision(true_positives: int, false_positives: int) -> float:
        """Calculate precision of matches"""
        total_predicted = true_positives + false_positives
        return (true_positives / total_predicted) if total_predicted > 0 else 0

    @staticmethod
    def recall(true_positives: int, false_negatives: int) -> float:
        """Calculate recall of matches"""
        total_actual = true_positives + false_negatives
        return (true_positives / total_actual) if total_actual > 0 else 0

    @staticmethod
    def f1_score(precision: float, recall: float) -> float:
        """Calculate F1 score"""
        return (2 * precision * recall / (precision + recall)) if (precision + recall) > 0 else 0

    @staticmethod
    def false_positive_rate(false_positives: int, true_negatives: int) -> float:
        """Calculate false positive rate"""
        total_negatives = false_positives + true_negatives
        return (false_positives / total_negatives) if total_negatives > 0 else 0


# ============================================================================
# GROUND TRUTH VALIDATOR
# ============================================================================

class GroundTruthValidator:
    """Validate agent performance against ground truth data"""

    def __init__(self):
        self.ground_truth_data = {}
        logger.info("Initialized GroundTruthValidator")

    def load_ground_truth(self, file_path: str):
        """Load ground truth reconciliation data"""
        try:
            df = pd.read_json(file_path)
            self.ground_truth_data = df.to_dict('records')
            logger.info(f"Loaded {len(self.ground_truth_data)} ground truth records")
        except Exception as e:
            logger.error(f"Error loading ground truth: {e}")

    def validate_matches(self, agent_matches: List[Dict], ground_truth: List[Dict]) -> Dict[str, int]:
        """Validate agent matches against ground truth"""
        true_positives = 0
        false_positives = 0
        false_negatives = 0
        true_negatives = 0

        # Create lookup for ground truth
        gt_lookup = {
            (item['source_id'], item['target_id']): True
            for item in ground_truth
        }

        # Evaluate agent matches
        for match in agent_matches:
            source_id = match['source']['transaction_id']
            target_id = match['target']['transaction_id'] if match['target'] else None

            if target_id:
                if (source_id, target_id) in gt_lookup:
                    true_positives += 1
                else:
                    false_positives += 1
            else:
                # Check if this should have been matched
                should_match = any(
                    gt['source_id'] == source_id
                    for gt in ground_truth
                )
                if should_match:
                    false_negatives += 1
                else:
                    true_negatives += 1

        return {
            'true_positives': true_positives,
            'false_positives': false_positives,
            'false_negatives': false_negatives,
            'true_negatives': true_negatives
        }


# ============================================================================
# AGENT PERFORMANCE EVALUATOR
# ============================================================================

class AgentEvaluator:
    """Comprehensive agent performance evaluation"""

    def __init__(self):
        self.metrics_calculator = ReconciliationMetrics()
        self.validator = GroundTruthValidator()
        self.evaluation_history = []
        logger.info("Initialized AgentEvaluator")

    def evaluate_reconciliation(self,
                                session_id: str,
                                agent_results: Dict[str, Any],
                                ground_truth: List[Dict] = None) -> Dict[str, Any]:
        """Evaluate complete reconciliation process"""

        evaluation = {
            'session_id': session_id,
            'timestamp': datetime.now().isoformat(),
            'metrics': {}
        }

        # Basic metrics
        total = len(agent_results.get('transactions_source', []))
        matched = len(agent_results.get('matched_transactions', []))
        unmatched = len(agent_results.get('unmatched_transactions', []))
        anomalies = len(agent_results.get('anomalies', []))

        evaluation['metrics']['total_transactions'] = total
        evaluation['metrics']['matched_transactions'] = matched
        evaluation['metrics']['unmatched_transactions'] = unmatched
        evaluation['metrics']['anomalies_detected'] = anomalies
        evaluation['metrics']['match_rate'] = self.metrics_calculator.accuracy(matched, total)

        # If ground truth is available, calculate advanced metrics
        if ground_truth:
            validation = self.validator.validate_matches(
                agent_results.get('matched_transactions', []),
                ground_truth
            )

            precision = self.metrics_calculator.precision(
                validation['true_positives'],
                validation['false_positives']
            )

            recall = self.metrics_calculator.recall(
                validation['true_positives'],
                validation['false_negatives']
            )

            evaluation['metrics']['precision'] = precision
            evaluation['metrics']['recall'] = recall
            evaluation['metrics']['f1_score'] = self.metrics_calculator.f1_score(precision, recall)
            evaluation['metrics']['false_positive_rate'] = self.metrics_calculator.false_positive_rate(
                validation['false_positives'],
                validation['true_negatives']
            )

        # Agent-specific metrics
        evaluation['agent_performance'] = self._evaluate_agents(agent_results)

        # Store evaluation
        self.evaluation_history.append(evaluation)
        logger.info(f"Evaluation complete for session {session_id}")

        return evaluation

    def _evaluate_agents(self, agent_results: Dict[str, Any]) -> Dict[str, Dict]:
        """Evaluate individual agent performance"""
        agent_performance = {}

        # Data Extraction Agent
        agent_performance['data_extraction'] = {
            'records_extracted': len(agent_results.get('transactions_source', [])),
            'data_quality_score': self._calculate_data_quality(agent_results.get('transactions_source', [])),
            'extraction_completeness': 100.0  # Assume 100% for now
        }

        # Matching Agent
        matched = agent_results.get('matched_transactions', [])
        total = len(agent_results.get('transactions_source', []))
        agent_performance['matching'] = {
            'match_rate': (len(matched) / total * 100) if total > 0 else 0,
            'avg_confidence': np.mean([m.get('confidence', 0) for m in matched]) if matched else 0,
            'processing_efficiency': 100.0  # Would measure actual time in production
        }

        # Anomaly Detection Agent
        anomalies = agent_results.get('anomalies', [])
        agent_performance['anomaly_detection'] = {
            'anomalies_detected': len(anomalies),
            'high_severity_count': sum(1 for a in anomalies if a.get('severity') == 'high'),
            'detection_accuracy': 85.0  # Would validate with ground truth
        }

        # Resolution Agent
        report = agent_results.get('reconciliation_report', {})
        agent_performance['resolution'] = {
            'resolutions_generated': len(report.get('resolutions', [])),
            'report_completeness': 100.0 if report else 0,
            'actionability_score': 90.0  # Subjective metric
        }

        return agent_performance

    def _calculate_data_quality(self, transactions: List[Dict]) -> float:
        """Calculate data quality score"""
        if not transactions:
            return 0.0

        quality_checks = []
        for txn in transactions:
            # Check required fields
            has_id = bool(txn.get('transaction_id'))
            has_amount = bool(txn.get('amount'))
            has_date = bool(txn.get('date'))
            has_description = bool(txn.get('description'))

            # Check data validity
            amount_valid = isinstance(txn.get('amount'), (int, float)) and txn.get('amount', 0) > 0

            record_quality = sum([has_id, has_amount, has_date, has_description, amount_valid]) / 5
            quality_checks.append(record_quality)

        return np.mean(quality_checks) * 100

    def generate_evaluation_report(self) -> str:
        """Generate comprehensive evaluation report"""
        if not self.evaluation_history:
            return "No evaluations available"

        df = pd.DataFrame(self.evaluation_history)

        report = "=== AGENT EVALUATION REPORT ===\n\n"
        report += f"Total Evaluations: {len(self.evaluation_history)}\n"
        report += f"Date Range: {df['timestamp'].min()} to {df['timestamp'].max()}\n\n"

        # Aggregate metrics
        metrics_df = pd.json_normalize(df['metrics'])
        report += "AGGREGATE METRICS:\n"
        report += f"  Average Match Rate: {metrics_df['match_rate'].mean():.2f}%\n"
        report += f"  Total Transactions Processed: {metrics_df['total_transactions'].sum()}\n"
        report += f"  Total Anomalies Detected: {metrics_df['anomalies_detected'].sum()}\n"

        if 'precision' in metrics_df.columns:
            report += f"  Average Precision: {metrics_df['precision'].mean():.2f}\n"
            report += f"  Average Recall: {metrics_df['recall'].mean():.2f}\n"
            report += f"  Average F1 Score: {metrics_df['f1_score'].mean():.2f}\n"

        report += "\nTREND ANALYSIS:\n"
        report += f"  Match Rate Trend: {'Improving' if metrics_df['match_rate'].is_monotonic_increasing else 'Stable'}\n"

        logger.info("Generated evaluation report")
        return report

    def export_evaluation_to_json(self, file_path: str):
        """Export evaluation history to JSON"""
        try:
            with open(file_path, 'w') as f:
                json.dump(self.evaluation_history, f, indent=2)
            logger.info(f"Exported evaluation to {file_path}")
        except Exception as e:
            logger.error(f"Error exporting evaluation: {e}")


# ============================================================================
# A/B TESTING FRAMEWORK
# ============================================================================

class ABTestingFramework:
    """A/B testing for agent improvements"""

    def __init__(self):
        self.experiments = {}
        logger.info("Initialized A/B Testing Framework")

    def create_experiment(self, experiment_id: str, variant_a: str, variant_b: str):
        """Create new A/B test experiment"""
        self.experiments[experiment_id] = {
            'variant_a': {'name': variant_a, 'results': []},
            'variant_b': {'name': variant_b, 'results': []},
            'created_at': datetime.now().isoformat()
        }
        logger.info(f"Created A/B experiment: {experiment_id}")

    def record_result(self, experiment_id: str, variant: str, metrics: Dict[str, float]):
        """Record experiment result"""
        if experiment_id in self.experiments:
            self.experiments[experiment_id][variant]['results'].append(metrics)
            logger.info(f"Recorded result for {experiment_id} - {variant}")

    def analyze_experiment(self, experiment_id: str) -> Dict[str, Any]:
        """Analyze A/B test results"""
        if experiment_id not in self.experiments:
            return {"error": "Experiment not found"}

        exp = self.experiments[experiment_id]
        variant_a_results = exp['variant_a']['results']
        variant_b_results = exp['variant_b']['results']

        if not variant_a_results or not variant_b_results:
            return {"error": "Insufficient data"}

        # Calculate averages
        metrics_a = pd.DataFrame(variant_a_results).mean().to_dict()
        metrics_b = pd.DataFrame(variant_b_results).mean().to_dict()

        # Determine winner
        winner = 'variant_a' if metrics_a.get('match_rate', 0) > metrics_b.get('match_rate', 0) else 'variant_b'

        analysis = {
            'experiment_id': experiment_id,
            'variant_a_metrics': metrics_a,
            'variant_b_metrics': metrics_b,
            'winner': winner,
            'improvement': abs(metrics_a.get('match_rate', 0) - metrics_b.get('match_rate', 0)),
            'sample_size_a': len(variant_a_results),
            'sample_size_b': len(variant_b_results)
        }

        logger.info(f"A/B test analysis complete: {winner} wins")
        return analysis


# ============================================================================
# CONTINUOUS EVALUATION MONITOR
# ============================================================================

class ContinuousEvaluationMonitor:
    """Monitor agent performance continuously"""

    def __init__(self, threshold_config: Dict[str, float]):
        self.thresholds = threshold_config
        self.alerts = []
        logger.info("Initialized Continuous Evaluation Monitor")

    def check_performance(self, evaluation: Dict[str, Any]) -> List[Dict[str, str]]:
        """Check if performance meets thresholds"""
        alerts = []
        metrics = evaluation.get('metrics', {})

        # Check match rate
        if metrics.get('match_rate', 100) < self.thresholds.get('min_match_rate', 80):
            alerts.append({
                'severity': 'high',
                'metric': 'match_rate',
                'value': metrics.get('match_rate'),
                'threshold': self.thresholds.get('min_match_rate'),
                'message': f"Match rate {metrics.get('match_rate'):.2f}% below threshold"
            })

        # Check precision
        if metrics.get('precision', 1) < self.thresholds.get('min_precision', 0.85):
            alerts.append({
                'severity': 'medium',
                'metric': 'precision',
                'value': metrics.get('precision'),
                'threshold': self.thresholds.get('min_precision'),
                'message': f"Precision {metrics.get('precision'):.2f} below threshold"
            })

        # Check anomaly detection
        anomaly_rate = metrics.get('anomalies_detected', 0) / metrics.get('total_transactions', 1)
        if anomaly_rate > self.thresholds.get('max_anomaly_rate', 0.1):
            alerts.append({
                'severity': 'high',
                'metric': 'anomaly_rate',
                'value': anomaly_rate,
                'threshold': self.thresholds.get('max_anomaly_rate'),
                'message': f"Anomaly rate {anomaly_rate:.2%} exceeds threshold"
            })

        if alerts:
            self.alerts.extend(alerts)
            logger.warning(f"Performance alerts detected: {len(alerts)}")

        return alerts

In [None]:
# a2a_protocol.py
"""Agent-to-Agent (A2A) Protocol implementation"""

from typing import Dict, List, Any, Optional
from pydantic import BaseModel, Field
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
import httpx
import asyncio
from loguru import logger
import uuid
from datetime import datetime

# ============================================================================
# A2A MESSAGE SCHEMAS
# ============================================================================

class A2AMessage(BaseModel):
    """Standard A2A protocol message"""
    message_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    sender_agent: str
    receiver_agent: str
    message_type: str  # request, response, notification, error
    timestamp: datetime = Field(default_factory=datetime.now)
    payload: Dict[str, Any]
    correlation_id: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = {}

class A2ARequest(BaseModel):
    """A2A request message"""
    action: str
    parameters: Dict[str, Any]
    callback_url: Optional[str] = None
    timeout: int = 300

class A2AResponse(BaseModel):
    """A2A response message"""
    status: str  # success, error, pending
    result: Any
    error_message: Optional[str] = None

# ============================================================================
# A2A AGENT REGISTRY
# ============================================================================

class AgentRegistry:
    """Registry for discovering and communicating with agents"""

    def __init__(self):
        self.agents: Dict[str, Dict[str, Any]] = {}
        logger.info("Initialized Agent Registry")

    def register_agent(self, agent_name: str, endpoint: str, capabilities: List[str]):
        """Register an agent in the system"""
        self.agents[agent_name] = {
            'endpoint': endpoint,
            'capabilities': capabilities,
            'status': 'active',
            'registered_at': datetime.now().isoformat()
        }
        logger.info(f"Registered agent: {agent_name} at {endpoint}")

    def unregister_agent(self, agent_name: str):
        """Unregister an agent"""
        if agent_name in self.agents:
            del self.agents[agent_name]
            logger.info(f"Unregistered agent: {agent_name}")

    def get_agent(self, agent_name: str) -> Optional[Dict[str, Any]]:
        """Get agent information"""
        return self.agents.get(agent_name)

    def find_agents_by_capability(self, capability: str) -> List[str]:
        """Find agents with specific capability"""
        return [
            name for name, info in self.agents.items()
            if capability in info.get('capabilities', [])
        ]

    def list_all_agents(self) -> Dict[str, Dict[str, Any]]:
        """List all registered agents"""
        return self.agents

# ============================================================================
# A2A COMMUNICATION CLIENT
# ============================================================================

class A2AClient:
    """Client for agent-to-agent communication"""

    def __init__(self, registry: AgentRegistry):
        self.registry = registry
        self.message_queue: List[A2AMessage] = []
        logger.info("Initialized A2A Client")

    async def send_message(self, message: A2AMessage) -> A2AResponse:
        """Send message to another agent"""
        target_agent = self.registry.get_agent(message.receiver_agent)

        if not target_agent:
            logger.error(f"Agent not found: {message.receiver_agent}")
            return A2AResponse(
                status="error",
                result=None,
                error_message=f"Agent {message.receiver_agent} not found"
            )

        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    f"{target_agent['endpoint']}/a2a/receive",
                    json=message.dict()
                )
                response.raise_for_status()
                logger.info(f"Message sent to {message.receiver_agent}")
                return A2AResponse(**response.json())

        except Exception as e:
            logger.error(f"Error sending message: {e}")
            return A2AResponse(
                status="error",
                result=None,
                error_message=str(e)
            )

    async def broadcast_message(self, sender: str, message_type: str, payload: Dict[str, Any]) -> List[A2AResponse]:
        """Broadcast message to all agents"""
        responses = []
        for agent_name in self.registry.list_all_agents().keys():
            if agent_name != sender:
                message = A2AMessage(
                    sender_agent=sender,
                    receiver_agent=agent_name,
                    message_type=message_type,
                    payload=payload
                )
                response = await self.send_message(message)
                responses.append(response)

        logger.info(f"Broadcast complete: {len(responses)} agents notified")
        return responses

    async def request_service(self, sender: str, receiver: str, action: str, parameters: Dict[str, Any]) -> A2AResponse:
        """Request a service from another agent"""
        request = A2ARequest(action=action, parameters=parameters)
        message = A2AMessage(
            sender_agent=sender,
            receiver_agent=receiver,
            message_type="request",
            payload=request.dict()
        )
        return await self.send_message(message)

# ============================================================================
# A2A PROTOCOL API
# ============================================================================

app = FastAPI(title="Financial Reconciliation A2A API")

# Global instances
agent_registry = AgentRegistry()
a2a_client = A2AClient(agent_registry)

@app.post("/a2a/register")
async def register_agent(agent_name: str, endpoint: str, capabilities: List[str]):
    """Register an agent in the system"""
    agent_registry.register_agent(agent_name, endpoint, capabilities)
    return {"status": "success", "message": f"Agent {agent_name} registered"}

@app.delete("/a2a/register/{agent_name}")
async def unregister_agent(agent_name: str):
    """Unregister an agent"""
    agent_registry.unregister_agent(agent_name)
    return {"status": "success", "message": f"Agent {agent_name} unregistered"}

@app.get("/a2a/agents")
async def list_agents():
    """List all registered agents"""
    return agent_registry.list_all_agents()

@app.get("/a2a/agents/capability/{capability}")
async def find_agents_by_capability(capability: str):
    """Find agents by capability"""
    agents = agent_registry.find_agents_by_capability(capability)
    return {"capability": capability, "agents": agents}

@app.post("/a2a/send")
async def send_message(message: A2AMessage):
    """Send message to another agent"""
    response = await a2a_client.send_message(message)
    return response

@app.post("/a2a/broadcast")
async def broadcast_message(sender: str, message_type: str, payload: Dict[str, Any]):
    """Broadcast message to all agents"""
    responses = await a2a_client.broadcast_message(sender, message_type, payload)
    return {"status": "success", "responses": len(responses), "details": responses}

@app.post("/a2a/request")
async def request_service(sender: str, receiver: str, action: str, parameters: Dict[str, Any]):
    """Request a service from another agent"""
    response = await a2a_client.request_service(sender, receiver, action, parameters)
    return response

@app.post("/a2a/receive")
async def receive_message(message: A2AMessage):
    """Receive message from another agent (endpoint for agents)"""
    logger.info(f"Received message from {message.sender_agent}")

    # Process message based on type
    if message.message_type == "request":
        # Handle request
        return A2AResponse(
            status="success",
            result={"message": "Request received and processing"}
        )
    elif message.message_type == "notification":
        # Handle notification
        return A2AResponse(
            status="success",
            result={"message": "Notification received"}
        )
    else:
        return A2AResponse(
            status="error",
            result=None,
            error_message="Unknown message type"
        )

# ============================================================================
# RECONCILIATION-SPECIFIC A2A ENDPOINTS
# ============================================================================

@app.post("/a2a/reconciliation/initiate")
async def initiate_reconciliation(
    session_id: str,
    source_data: List[Dict[str, Any]],
    target_data: List[Dict[str, Any]],
    background_tasks: BackgroundTasks
):
    """Initiate reconciliation process through A2A protocol"""

    # Send to data extraction agent
    message = A2AMessage(
        sender_agent="api_gateway",
        receiver_agent="data_extraction_agent",
        message_type="request",
        payload={
            "action": "extract_and_normalize",
            "session_id": session_id,
            "source_data": source_data,
            "target_data": target_data
        }
    )

    response = await a2a_client.send_message(message)

    return {
        "session_id": session_id,
        "status": "initiated",
        "message": "Reconciliation process started",
        "response": response
    }

@app.get("/a2a/reconciliation/status/{session_id}")
async def get_reconciliation_status(session_id: str):
    """Get reconciliation status through A2A protocol"""

    # Query supervisor agent
    response = await a2a_client.request_service(
        sender="api_gateway",
        receiver="supervisor",
        action="get_status",
        parameters={"session_id": session_id}
    )

    return response

@app.post("/a2a/reconciliation/pause/{session_id}")
async def pause_reconciliation(session_id: str):
    """Pause reconciliation process"""

    response = await a2a_client.request_service(
        sender="api_gateway",
        receiver="supervisor",
        action="pause",
        parameters={"session_id": session_id}
    )

    return {"session_id": session_id, "status": "paused", "response": response}

@app.post("/a2a/reconciliation/resume/{session_id}")
async def resume_reconciliation(session_id: str):
    """Resume paused reconciliation process"""

    response = await a2a_client.request_service(
        sender="api_gateway",
        receiver="supervisor",
        action="resume",
        parameters={"session_id": session_id}
    )

    return {"session_id": session_id, "status": "resumed", "response": response}

@app.get("/a2a/reconciliation/report/{session_id}")
async def get_reconciliation_report(session_id: str):
    """Get reconciliation report"""

    response = await a2a_client.request_service(
        sender="api_gateway",
        receiver="resolution_agent",
        action="get_report",
        parameters={"session_id": session_id}
    )

    return response

# ============================================================================
# HEALTH CHECK
# ============================================================================

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "registered_agents": len(agent_registry.list_all_agents()),
        "timestamp": datetime.now().isoformat()
    }

# ============================================================================
# WEBSOCKET SUPPORT FOR REAL-TIME UPDATES
# ============================================================================

from fastapi import WebSocket, WebSocketDisconnect
from typing import Set

class ConnectionManager:
    """Manage WebSocket connections for real-time updates"""

    def __init__(self):
        self.active_connections: Set[WebSocket] = set()

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.add(websocket)
        logger.info("WebSocket client connected")

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)
        logger.info("WebSocket client disconnected")

    async def broadcast(self, message: Dict[str, Any]):
        """Broadcast message to all connected clients"""
        for connection in self.active_connections:
            try:
                await connection.send_json(message)
            except Exception as e:
                logger.error(f"Error broadcasting to client: {e}")

manager = ConnectionManager()

@app.websocket("/ws/reconciliation/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
    """WebSocket endpoint for real-time reconciliation updates"""
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # Echo or process data
            await websocket.send_json({
                "session_id": session_id,
                "message": "Update received",
                "timestamp": datetime.now().isoformat()
            })
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        logger.info(f"Client disconnected from session {session_id}")

In [None]:
# deployment.yaml - Google Cloud Run Deployment Configuration

# Dockerfile
---
# Save as: Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Expose ports
EXPOSE 8080 8000

# Run application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"]

---
# cloudbuild.yaml - Cloud Build Configuration
steps:
  # Build the container image
  - name: 'gcr.io/cloud-builders/docker'
    args: ['build', '-t', 'gcr.io/$PROJECT_ID/financial-reconciliation:$COMMIT_SHA', '.']

  # Push the container image to Container Registry
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', 'gcr.io/$PROJECT_ID/financial-reconciliation:$COMMIT_SHA']

  # Deploy to Cloud Run
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    entrypoint: gcloud
    args:
      - 'run'
      - 'deploy'
      - 'financial-reconciliation'
      - '--image'
      - 'gcr.io/$PROJECT_ID/financial-reconciliation:$COMMIT_SHA'
      - '--region'
      - 'us-central1'
      - '--platform'
      - 'managed'
      - '--allow-unauthenticated'
      - '--memory'
      - '4Gi'
      - '--cpu'
      - '2'
      - '--timeout'
      - '900'
      - '--concurrency'
      - '10'
      - '--set-env-vars'
      - 'GOOGLE_CLOUD_PROJECT=$PROJECT_ID,VERTEX_AI_LOCATION=us-central1'

images:
  - 'gcr.io/$PROJECT_ID/financial-reconciliation:$COMMIT_SHA'

---
# cloud-run-service.yaml - Cloud Run Service Configuration
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: financial-reconciliation
  namespace: default
  labels:
    cloud.googleapis.com/location: us-central1
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/maxScale: '10'
        autoscaling.knative.dev/minScale: '1'
        run.googleapis.com/cpu-throttling: 'false'
        run.googleapis.com/execution-environment: gen2
    spec:
      containerConcurrency: 10
      timeoutSeconds: 900
      serviceAccountName: reconciliation-sa@PROJECT_ID.iam.gserviceaccount.com
      containers:
      - image: gcr.io/PROJECT_ID/financial-reconciliation:latest
        ports:
        - name: http1
          containerPort: 8080
        env:
        - name: GOOGLE_CLOUD_PROJECT
          value: "PROJECT_ID"
        - name: VERTEX_AI_LOCATION
          value: "us-central1"
        - name: REDIS_HOST
          valueFrom:
            secretKeyRef:
              name: redis-config
              key: host
        - name: LANGSMITH_API_KEY
          valueFrom:
            secretKeyRef:
              name: langsmith-config
              key: api-key
        resources:
          limits:
            cpu: '2000m'
            memory: 4Gi
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 10
          periodSeconds: 10

---
# terraform/main.tf - Terraform Infrastructure as Code
terraform {
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "~> 5.0"
    }
  }
}

provider "google" {
  project = var.project_id
  region  = var.region
}

# Enable required APIs
resource "google_project_service" "required_apis" {
  for_each = toset([
    "run.googleapis.com",
    "cloudbuild.googleapis.com",
    "containerregistry.googleapis.com",
    "aiplatform.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "firestore.googleapis.com",
    "logging.googleapis.com",
    "cloudtrace.googleapis.com",
    "redis.googleapis.com"
  ])

  service = each.key
  disable_on_destroy = false
}

# Service Account
resource "google_service_account" "reconciliation_sa" {
  account_id   = "reconciliation-sa"
  display_name = "Financial Reconciliation Service Account"
}

# IAM Roles
resource "google_project_iam_member" "sa_roles" {
  for_each = toset([
    "roles/aiplatform.user",
    "roles/storage.objectAdmin",
    "roles/bigquery.dataEditor",
    "roles/datastore.user",
    "roles/logging.logWriter",
    "roles/cloudtrace.agent"
  ])

  project = var.project_id
  role    = each.key
  member  = "serviceAccount:${google_service_account.reconciliation_sa.email}"
}

# Cloud Storage Bucket
resource "google_storage_bucket" "reconciliation_data" {
  name          = "${var.project_id}-reconciliation-data"
  location      = var.region
  force_destroy = false

  uniform_bucket_level_access = true

  versioning {
    enabled = true
  }

  lifecycle_rule {
    condition {
      age = 90
    }
    action {
      type = "Delete"
    }
  }
}

# BigQuery Dataset
resource "google_bigquery_dataset" "reconciliation" {
  dataset_id                  = "reconciliation"
  friendly_name               = "Financial Reconciliation Data"
  description                 = "Dataset for reconciliation transactions and audit logs"
  location                    = var.region
  default_table_expiration_ms = 7776000000  # 90 days
}

# BigQuery Tables
resource "google_bigquery_table" "transactions" {
  dataset_id = google_bigquery_dataset.reconciliation.dataset_id
  table_id   = "transactions"

  schema = jsonencode([
    {
      name = "transaction_id"
      type = "STRING"
      mode = "REQUIRED"
    },
    {
      name = "date"
      type = "DATE"
      mode = "REQUIRED"
    },
    {
      name = "amount"
      type = "FLOAT64"
      mode = "REQUIRED"
    },
    {
      name = "description"
      type = "STRING"
      mode = "NULLABLE"
    },
    {
      name = "source"
      type = "STRING"
      mode = "REQUIRED"
    },
    {
      name = "status"
      type = "STRING"
      mode = "NULLABLE"
    }
  ])
}

# Firestore Database
resource "google_firestore_database" "reconciliation" {
  name        = "(default)"
  location_id = var.region
  type        = "FIRESTORE_NATIVE"
}

# Redis Instance (Memorystore)
resource "google_redis_instance" "memory_bank" {
  name           = "reconciliation-memory-bank"
  tier           = "STANDARD_HA"
  memory_size_gb = 5
  region         = var.region

  redis_version = "REDIS_7_0"

  auth_enabled = true
}

# Cloud Run Service
resource "google_cloud_run_service" "reconciliation" {
  name     = "financial-reconciliation"
  location = var.region

  template {
    spec {
      service_account_name = google_service_account.reconciliation_sa.email

      containers {
        image = "gcr.io/${var.project_id}/financial-reconciliation:latest"

        ports {
          container_port = 8080
        }

        env {
          name  = "GOOGLE_CLOUD_PROJECT"
          value = var.project_id
        }

        env {
          name  = "REDIS_HOST"
          value = google_redis_instance.memory_bank.host
        }

        resources {
          limits = {
            cpu    = "2000m"
            memory = "4Gi"
          }
        }
      }
    }

    metadata {
      annotations = {
        "autoscaling.knative.dev/maxScale" = "10"
        "autoscaling.knative.dev/minScale" = "1"
      }
    }
  }

  traffic {
    percent         = 100
    latest_revision = true
  }
}

# Cloud Run IAM
resource "google_cloud_run_service_iam_member" "public_access" {
  service  = google_cloud_run_service.reconciliation.name
  location = google_cloud_run_service.reconciliation.location
  role     = "roles/run.invoker"
  member   = "allUsers"
}

# Variables
variable "project_id" {
  description = "GCP Project ID"
  type        = string
}

variable "region" {
  description = "GCP Region"
  type        = string
  default     = "us-central1"
}

# Outputs
output "cloud_run_url" {
  value = google_cloud_run_service.reconciliation.status[0].url
}

output "redis_host" {
  value = google_redis_instance.memory_bank.host
}

output "bucket_name" {
  value = google_storage_bucket.reconciliation_data.name
}

---
# kubernetes/deployment.yaml - Alternative Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: financial-reconciliation
  namespace: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: financial-reconciliation
  template:
    metadata:
      labels:
        app: financial-reconciliation
    spec:
      serviceAccountName: reconciliation-sa
      containers:
      - name: reconciliation-agent
        image: gcr.io/PROJECT_ID/financial-reconciliation:latest
        ports:
        - containerPort: 8080
          name: http
        - containerPort: 8000
          name: metrics
        env:
        - name: GOOGLE_CLOUD_PROJECT
          value: "PROJECT_ID"
        - name: REDIS_HOST
          valueFrom:
            configMapKeyRef:
              name: redis-config
              key: host
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 10
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: financial-reconciliation
  namespace: default
spec:
  type: LoadBalancer
  selector:
    app: financial-reconciliation
  ports:
  - name: http
    port: 80
    targetPort: 8080
  - name: metrics
    port: 8000
    targetPort: 8000

---
# scripts/deploy.sh - Deployment Script
#!/bin/bash

set -e

PROJECT_ID="your-project-id"
REGION="us-central1"
SERVICE_NAME="financial-reconciliation"

echo "üöÄ Deploying Financial Reconciliation Agent System"

# Set project
gcloud config set project $PROJECT_ID

# Enable APIs
echo "üì° Enabling required APIs..."
gcloud services enable \
  run.googleapis.com \
  cloudbuild.googleapis.com \
  aiplatform.googleapis.com \
  storage.googleapis.com \
  bigquery.googleapis.com \
  firestore.googleapis.com

# Build and deploy
echo "üèóÔ∏è  Building container..."
gcloud builds submit --tag gcr.io/$PROJECT_ID/$SERVICE_NAME

echo "üö¢ Deploying to Cloud Run..."
gcloud run deploy $SERVICE_NAME \
  --image gcr.io/$PROJECT_ID/$SERVICE_NAME \
  --region $REGION \
  --platform managed \
  --allow-unauthenticated \
  --memory 4Gi \
  --cpu 2 \
  --timeout 900 \
  --concurrency 10

# Get service URL
SERVICE_URL=$(gcloud run services describe $SERVICE_NAME --region $REGION --format 'value(status.url)')

echo "‚úÖ Deployment complete!"
echo "üåê Service URL: $SERVICE_URL"
echo "üìä Metrics: $SERVICE_URL/metrics"
echo "üíì Health: $SERVICE_URL/health"

In [None]:
# main.py
"""Main application orchestrating the financial reconciliation agent system"""

import asyncio
import uuid
from datetime import datetime
from typing import Dict, Any, List
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from loguru import logger

# Import all modules
from config import config
from memory_management import (
    InMemorySessionService,
    MemoryBank,
    FirestoreStateManager,
    ContextCompactor
)
from tools import create_langchain_tools
from agents import build_reconciliation_workflow, AgentState
from observability import ObservabilityManager, DashboardExporter
from evaluation import AgentEvaluator, ContinuousEvaluationMonitor
from a2a_protocol import app as a2a_app, agent_registry, a2a_client
from google.cloud import bigquery

# ============================================================================
# APPLICATION SETUP
# ============================================================================

app = FastAPI(
    title="Financial Reconciliation Multi-Agent System",
    description="Complete AI-powered financial reconciliation pipeline",
    version="1.0.0"
)

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount A2A Protocol API
app.mount("/a2a", a2a_app)

# ============================================================================
# GLOBAL INSTANCES
# ============================================================================

# Initialize core services
session_service = InMemorySessionService()
memory_bank = MemoryBank(config.redis)
firestore_manager = FirestoreStateManager(config.gcp.project_id)
context_compactor = ContextCompactor(max_tokens=100000)
observability = ObservabilityManager(config)
evaluator = AgentEvaluator()
monitor = ContinuousEvaluationMonitor(threshold_config={
    'min_match_rate': 80.0,
    'min_precision': 0.85,
    'max_anomaly_rate': 0.1
})

# Create tools
tools = create_langchain_tools(config)

# Build workflow
reconciliation_workflow = build_reconciliation_workflow(config, tools)

logger.info("üöÄ Financial Reconciliation System Initialized")

# ============================================================================
# REQUEST/RESPONSE MODELS
# ============================================================================

class ReconciliationRequest(BaseModel):
    """Request to start reconciliation"""
    source_type: str  # bank, erp, invoice
    target_type: str
    source_data: List[Dict[str, Any]]
    target_data: List[Dict[str, Any]]
    options: Dict[str, Any] = {}

class ReconciliationResponse(BaseModel):
    """Response from reconciliation"""
    session_id: str
    status: str
    message: str
    estimated_time: int = 300

class ReconciliationStatus(BaseModel):
    """Status of ongoing reconciliation"""
    session_id: str
    status: str
    current_agent: str
    progress: float
    metrics: Dict[str, Any]

# ============================================================================
# MAIN API ENDPOINTS
# ============================================================================

@app.post("/reconcile", response_model=ReconciliationResponse)
async def start_reconciliation(
    request: ReconciliationRequest,
    background_tasks: BackgroundTasks
):
    """Start a new reconciliation process"""

    # Generate session ID
    session_id = str(uuid.uuid4())

    # Create session
    initial_context = {
        "source_type": request.source_type,
        "target_type": request.target_type,
        "source_count": len(request.source_data),
        "target_count": len(request.target_data),
        "options": request.options
    }

    session = session_service.create_session(session_id, initial_context)

    # Store in Firestore for persistence
    firestore_manager.save_session(session)

    # Log event
    observability.logger.log_reconciliation_event(
        session_id=session_id,
        event="reconciliation_initiated",
        metrics=initial_context
    )

    # Start reconciliation in background
    background_tasks.add_task(
        execute_reconciliation,
        session_id,
        request.source_data,
        request.target_data
    )

    logger.info(f"‚úÖ Reconciliation initiated: {session_id}")

    return ReconciliationResponse(
        session_id=session_id,
        status="initiated",
        message="Reconciliation process started",
        estimated_time=300
    )


@app.get("/reconcile/{session_id}/status", response_model=ReconciliationStatus)
async def get_reconciliation_status(session_id: str):
    """Get the status of a reconciliation session"""

    # Get from session service
    session = session_service.get_session(session_id)

    if not session:
        # Try loading from Firestore
        session = firestore_manager.load_session(session_id)

    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    # Calculate progress
    context = session.context
    total_steps = 5  # data_extraction, matching, anomaly, resolution, audit
    current_step_map = {
        "data_extraction_agent": 1,
        "matching_agent": 2,
        "anomaly_detection_agent": 3,
        "resolution_agent": 4,
        "audit_agent": 5,
        "complete": 5
    }

    current_step = current_step_map.get(context.get("current_agent", ""), 0)
    progress = (current_step / total_steps) * 100

    return ReconciliationStatus(
        session_id=session_id,
        status=session.status,
        current_agent=context.get("current_agent", "unknown"),
        progress=progress,
        metrics={
            "total_transactions": context.get("source_count", 0),
            "matched": len(context.get("matched_transactions", [])),
            "anomalies": len(context.get("anomalies", []))
        }
    )


@app.get("/reconcile/{session_id}/report")
async def get_reconciliation_report(session_id: str):
    """Get the complete reconciliation report"""

    session = session_service.get_session(session_id)

    if not session:
        session = firestore_manager.load_session(session_id)

    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    if session.status != "completed":
        raise HTTPException(status_code=400, detail="Reconciliation not complete")

    report = session.context.get("reconciliation_report", {})

    return {
        "session_id": session_id,
        "completed_at": session.updated_at,
        "report": report
    }


@app.post("/reconcile/{session_id}/pause")
async def pause_reconciliation(session_id: str):
    """Pause an ongoing reconciliation"""

    session = session_service.get_session(session_id)

    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    # Create checkpoint
    checkpoint_data = {
        "context": session.context,
        "timestamp": datetime.now().isoformat()
    }

    session_service.checkpoint(session_id, checkpoint_data)
    firestore_manager.save_checkpoint(session_id, checkpoint_data)

    session_service.update_session(session_id, {}, status="paused")

    logger.info(f"‚è∏Ô∏è  Reconciliation paused: {session_id}")

    return {"session_id": session_id, "status": "paused", "message": "Reconciliation paused successfully"}


@app.post("/reconcile/{session_id}/resume")
async def resume_reconciliation(session_id: str, background_tasks: BackgroundTasks):
    """Resume a paused reconciliation"""

    # Get latest checkpoint
    checkpoint = firestore_manager.get_latest_checkpoint(session_id)

    if not checkpoint:
        raise HTTPException(status_code=404, detail="No checkpoint found")

    # Restore context
    context = checkpoint.get("data", {}).get("context", {})
    session_service.update_session(session_id, context, status="active")

    # Resume execution
    background_tasks.add_task(
        resume_reconciliation_execution,
        session_id,
        checkpoint
    )

    logger.info(f"‚ñ∂Ô∏è  Reconciliation resumed: {session_id}")

    return {"session_id": session_id, "status": "resumed", "message": "Reconciliation resumed successfully"}


@app.delete("/reconcile/{session_id}")
async def cancel_reconciliation(session_id: str):
    """Cancel a reconciliation session"""

    session_service.update_session(session_id, {}, status="cancelled")
    session_service.delete_session(session_id)

    logger.info(f"‚ùå Reconciliation cancelled: {session_id}")

    return {"session_id": session_id, "status": "cancelled", "message": "Reconciliation cancelled"}


# ============================================================================
# EVALUATION ENDPOINTS
# ============================================================================

@app.get("/evaluation/{session_id}")
async def get_evaluation(session_id: str):
    """Get evaluation metrics for a reconciliation session"""

    session = session_service.get_session(session_id) or firestore_manager.load_session(session_id)

    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    # Evaluate
    evaluation = evaluator.evaluate_reconciliation(
        session_id=session_id,
        agent_results=session.context
    )

    # Check performance
    alerts = monitor.check_performance(evaluation)

    return {
        "session_id": session_id,
        "evaluation": evaluation,
        "alerts": alerts
    }


@app.get("/evaluation/report")
async def get_evaluation_report():
    """Get comprehensive evaluation report"""

    report = evaluator.generate_evaluation_report()
    return {"report": report}


# ============================================================================
# BACKGROUND EXECUTION FUNCTIONS
# ============================================================================

async def execute_reconciliation(
    session_id: str,
    source_data: List[Dict[str, Any]],
    target_data: List[Dict[str, Any]]
):
    """Execute the reconciliation workflow"""

    try:
        # Update metrics
        observability.metrics.update_active_sessions(len(session_service._sessions))

        # Initialize state
        initial_state: AgentState = {
            "messages": [],
            "session_id": session_id,
            "current_agent": "data_extraction_agent",
            "transactions_source": source_data,
            "transactions_target": target_data,
            "matched_transactions": [],
            "unmatched_transactions": [],
            "anomalies": [],
            "reconciliation_report": {},
            "next_action": "",
            "iteration_count": 0,
            "status": "active"
        }

        # Configure for checkpointing
        config_dict = {
            "configurable": {
                "thread_id": session_id
            }
        }

        # Execute workflow
        logger.info(f"üîÑ Executing reconciliation workflow for {session_id}")

        final_state = None
        async for state in reconciliation_workflow.astream(initial_state, config_dict):
            # Update session with current state
            if state:
                final_state = state
                for agent_name, agent_state in state.items():
                    session_service.update_session(
                        session_id,
                        {"current_agent": agent_name, **agent_state}
                    )

                # Create checkpoint every 5 steps
                if agent_state.get("iteration_count", 0) % 5 == 0:
                    checkpoint_data = {"state": agent_state}
                    firestore_manager.save_checkpoint(session_id, checkpoint_data)

        # Mark as complete
        session_service.update_session(session_id, {}, status="completed")

        # Record metrics
        if final_state:
            last_agent_state = list(final_state.values())[-1]
            metrics = {
                "total": len(source_data),
                "matched": len(last_agent_state.get("matched_transactions", [])),
                "unmatched": len(last_agent_state.get("unmatched_transactions", [])),
                "anomalies": last_agent_state.get("anomalies", []),
                "anomalies_count": len(last_agent_state.get("anomalies", [])),
                "match_rate": (len(last_agent_state.get("matched_transactions", [])) / len(source_data) * 100) if source_data else 0,
                "duration": 0  # Would track actual time
            }

            observability.log_reconciliation_metrics(session_id, metrics)
            observability.metrics.record_reconciliation("success")

        logger.info(f"‚úÖ Reconciliation completed: {session_id}")

    except Exception as e:
        logger.error(f"‚ùå Reconciliation failed for {session_id}: {e}")
        session_service.update_session(session_id, {}, status="failed")
        observability.metrics.record_reconciliation("failed")
        raise


async def resume_reconciliation_execution(session_id: str, checkpoint: Dict[str, Any]):
    """Resume reconciliation from checkpoint"""

    try:
        # Restore state from checkpoint
        checkpoint_state = checkpoint.get("data", {}).get("state", {})

        # Continue workflow from checkpoint
        config_dict = {
            "configurable": {
                "thread_id": session_id
            }
        }

        async for state in reconciliation_workflow.astream(checkpoint_state, config_dict):
            if state:
                for agent_name, agent_state in state.items():
                    session_service.update_session(
                        session_id,
                        {"current_agent": agent_name, **agent_state}
                    )

        session_service.update_session(session_id, {}, status="completed")
        logger.info(f"‚úÖ Reconciliation resumed and completed: {session_id}")

    except Exception as e:
        logger.error(f"‚ùå Resume failed for {session_id}: {e}")
        session_service.update_session(session_id, {}, status="failed")


# ============================================================================
# SYSTEM ENDPOINTS
# ============================================================================

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "active_sessions": len(session_service._sessions),
        "version": "1.0.0"
    }


@app.get("/metrics")
async def get_metrics():
    """Get system metrics"""
    return {
        "active_sessions": len(session_service._sessions),
        "total_reconciliations": evaluator.evaluation_history,
        "timestamp": datetime.now().isoformat()
    }


@app.get("/")
async def root():
    """Root endpoint"""
    return {
        "service": "Financial Reconciliation Multi-Agent System",
        "version": "1.0.0",
        "endpoints": {
            "reconcile": "/reconcile",
            "status": "/reconcile/{session_id}/status",
            "report": "/reconcile/{session_id}/report",
            "a2a": "/a2a",
            "health": "/health",
            "docs": "/docs"
        }
    }


# ============================================================================
# STARTUP/SHUTDOWN EVENTS
# ============================================================================

@app.on_event("startup")
async def startup_event():
    """Startup tasks"""
    logger.info("üöÄ Starting Financial Reconciliation System")

    # Register agents with A2A protocol
    agents_to_register = [
        ("data_extraction_agent", "http://localhost:8080", ["data_extraction", "normalization"]),
        ("matching_agent", "http://localhost:8080", ["fuzzy_matching", "transaction_pairing"]),
        ("anomaly_detection_agent", "http://localhost:8080", ["anomaly_detection", "fraud_detection"]),
        ("resolution_agent", "http://localhost:8080", ["resolution", "reporting"]),
        ("audit_agent", "http://localhost:8080", ["audit", "compliance"]),
        ("supervisor", "http://localhost:8080", ["orchestration", "routing"])
    ]

    for agent_name, endpoint, capabilities in agents_to_register:
        agent_registry.register_agent(agent_name, endpoint, capabilities)

    logger.info("‚úÖ All agents registered")


@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup tasks"""
    logger.info("üõë Shutting down Financial Reconciliation System")


# ============================================================================
# RUN APPLICATION
# ============================================================================

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8080,
        reload=True,
        log_level="info"
    )

In [None]:
# main.py
"""Main application orchestrating the financial reconciliation agent system"""

import asyncio
import uuid
from datetime import datetime
from typing import Dict, Any, List
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from loguru import logger

# Import all modules
from config import config
from memory_management import (
    InMemorySessionService,
    MemoryBank,
    FirestoreStateManager,
    ContextCompactor
)
from tools import create_langchain_tools
from agents import build_reconciliation_workflow, AgentState
from observability import ObservabilityManager, DashboardExporter
from evaluation import AgentEvaluator, ContinuousEvaluationMonitor
from a2a_protocol import app as a2a_app, agent_registry, a2a_client
from google.cloud import bigquery

# ============================================================================
# APPLICATION SETUP
# ============================================================================

app = FastAPI(
    title="Financial Reconciliation Multi-Agent System",
    description="Complete AI-powered financial reconciliation pipeline",
    version="1.0.0"
)

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount A2A Protocol API
app.mount("/a2a", a2a_app)

# ============================================================================
# GLOBAL INSTANCES
# ============================================================================

# Initialize core services
session_service = InMemorySessionService()
memory_bank = MemoryBank(config.redis)
firestore_manager = FirestoreStateManager(config.gcp.project_id)
context_compactor = ContextCompactor(max_tokens=100000)
observability = ObservabilityManager(config)
evaluator = AgentEvaluator()
monitor = ContinuousEvaluationMonitor(threshold_config={
    'min_match_rate': 80.0,
    'min_precision': 0.85,
    'max_anomaly_rate': 0.1
})

# Create tools
tools = create_langchain_tools(config)

# Build workflow
reconciliation_workflow = build_reconciliation_workflow(config, tools)

logger.info("üöÄ Financial Reconciliation System Initialized")

# ============================================================================
# REQUEST/RESPONSE MODELS
# ============================================================================

class ReconciliationRequest(BaseModel):
    """Request to start reconciliation"""
    source_type: str  # bank, erp, invoice
    target_type: str
    source_data: List[Dict[str, Any]]
    target_data: List[Dict[str, Any]]
    options: Dict[str, Any] = {}

class ReconciliationResponse(BaseModel):
    """Response from reconciliation"""
    session_id: str
    status: str
    message: str
    estimated_time: int = 300

class ReconciliationStatus(BaseModel):
    """Status of ongoing reconciliation"""
    session_id: str
    status: str
    current_agent: str
    progress: float
    metrics: Dict[str, Any]

# ============================================================================
# MAIN API ENDPOINTS
# ============================================================================

@app.post("/reconcile", response_model=ReconciliationResponse)
async def start_reconciliation(
    request: ReconciliationRequest,
    background_tasks: BackgroundTasks
):
    """Start a new reconciliation process"""

    # Generate session ID
    session_id = str(uuid.uuid4())

    # Create session
    initial_context = {
        "source_type": request.source_type,
        "target_type": request.target_type,
        "source_count": len(request.source_data),
        "target_count": len(request.target_data),
        "options": request.options
    }

    session = session_service.create_session(session_id, initial_context)

    # Store in Firestore for persistence
    firestore_manager.save_session(session)

    # Log event
    observability.logger.log_reconciliation_event(
        session_id=session_id,
        event="reconciliation_initiated",
        metrics=initial_context
    )

    # Start reconciliation in background
    background_tasks.add_task(
        execute_reconciliation,
        session_id,
        request.source_data,
        request.target_data
    )

    logger.info(f"‚úÖ Reconciliation initiated: {session_id}")

    return ReconciliationResponse(
        session_id=session_id,
        status="initiated",
        message="Reconciliation process started",
        estimated_time=300
    )


@app.get("/reconcile/{session_id}/status", response_model=ReconciliationStatus)
async def get_reconciliation_status(session_id: str):
    """Get the status of a reconciliation session"""

    # Get from session service
    session = session_service.get_session(session_id)

    if not session:
        # Try loading from Firestore
        session = firestore_manager.load_session(session_id)

    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    # Calculate progress
    context = session.context
    total_steps = 5  # data_extraction, matching, anomaly, resolution, audit
    current_step_map = {
        "data_extraction_agent": 1,
        "matching_agent": 2,
        "anomaly_detection_agent": 3,
        "resolution_agent": 4,
        "audit_agent": 5,
        "complete": 5
    }

    current_step = current_step_map.get(context.get("current_agent", ""), 0)
    progress = (current_step / total_steps) * 100

    return ReconciliationStatus(
        session_id=session_id,
        status=session.status,
        current_agent=context.get("current_agent", "unknown"),
        progress=progress,
        metrics={
            "total_transactions": context.get("source_count", 0),
            "matched": len(context.get("matched_transactions", [])),
            "anomalies": len(context.get("anomalies", []))
        }
    )


@app.get("/reconcile/{session_id}/report")
async def get_reconciliation_report(session_id: str):
    """Get the complete reconciliation report"""

    session = session_service.get_session(session_id)

    if not session:
        session = firestore_manager.load_session(session_id)

    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    if session.status != "completed":
        raise HTTPException(status_code=400, detail="Reconciliation not complete")

    report = session.context.get("reconciliation_report", {})

    return {
        "session_id": session_id,
        "completed_at": session.updated_at,
        "report": report
    }


@app.post("/reconcile/{session_id}/pause")
async def pause_reconciliation(session_id: str):
    """Pause an ongoing reconciliation"""

    session = session_service.get_session(session_id)

    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    # Create checkpoint
    checkpoint_data = {
        "context": session.context,
        "timestamp": datetime.now().isoformat()
    }

    session_service.checkpoint(session_id, checkpoint_data)
    firestore_manager.save_checkpoint(session_id, checkpoint_data)

    session_service.update_session(session_id, {}, status="paused")

    logger.info(f"‚è∏Ô∏è  Reconciliation paused: {session_id}")

    return {"session_id": session_id, "status": "paused", "message": "Reconciliation paused successfully"}


@app.post("/reconcile/{session_id}/resume")
async def resume_reconciliation(session_id: str, background_tasks: BackgroundTasks):
    """Resume a paused reconciliation"""

    # Get latest checkpoint
    checkpoint = firestore_manager.get_latest_checkpoint(session_id)

    if not checkpoint:
        raise HTTPException(status_code=404, detail="No checkpoint found")

    # Restore context
    context = checkpoint.get("data", {}).get("context", {})
    session_service.update_session(session_id, context, status="active")

    # Resume execution
    background_tasks.add_task(
        resume_reconciliation_execution,
        session_id,
        checkpoint
    )

    logger.info(f"‚ñ∂Ô∏è  Reconciliation resumed: {session_id}")

    return {"session_id": session_id, "status": "resumed", "message": "Reconciliation resumed successfully"}


@app.delete("/reconcile/{session_id}")
async def cancel_reconciliation(session_id: str):
    """Cancel a reconciliation session"""

    session_service.update_session(session_id, {}, status="cancelled")
    session_service.delete_session(session_id)

    logger.info(f"‚ùå Reconciliation cancelled: {session_id}")

    return {"session_id": session_id, "status": "cancelled", "message": "Reconciliation cancelled"}


# ============================================================================
# EVALUATION ENDPOINTS
# ============================================================================

@app.get("/evaluation/{session_id}")
async def get_evaluation(session_id: str):
    """Get evaluation metrics for a reconciliation session"""

    session = session_service.get_session(session_id) or firestore_manager.load_session(session_id)

    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    # Evaluate
    evaluation = evaluator.evaluate_reconciliation(
        session_id=session_id,
        agent_results=session.context
    )

    # Check performance
    alerts = monitor.check_performance(evaluation)

    return {
        "session_id": session_id,
        "evaluation": evaluation,
        "alerts": alerts
    }


@app.get("/evaluation/report")
async def get_evaluation_report():
    """Get comprehensive evaluation report"""

    report = evaluator.generate_evaluation_report()
    return {"report": report}


# ============================================================================
# BACKGROUND EXECUTION FUNCTIONS
# ============================================================================

async def execute_reconciliation(
    session_id: str,
    source_data: List[Dict[str, Any]],
    target_data: List[Dict[str, Any]]
):
    """Execute the reconciliation workflow"""

    try:
        # Update metrics
        observability.metrics.update_active_sessions(len(session_service._sessions))

        # Initialize state
        initial_state: AgentState = {
            "messages": [],
            "session_id": session_id,
            "current_agent": "data_extraction_agent",
            "transactions_source": source_data,
            "transactions_target": target_data,
            "matched_transactions": [],
            "unmatched_transactions": [],
            "anomalies": [],
            "reconciliation_report": {},
            "next_action": "",
            "iteration_count": 0,
            "status": "active"
        }

        # Configure for checkpointing
        config_dict = {
            "configurable": {
                "thread_id": session_id
            }
        }

        # Execute workflow
        logger.info(f"üîÑ Executing reconciliation workflow for {session_id}")

        final_state = None
        async for state in reconciliation_workflow.astream(initial_state, config_dict):
            # Update session with current state
            if state:
                final_state = state
                for agent_name, agent_state in state.items():
                    session_service.update_session(
                        session_id,
                        {"current_agent": agent_name, **agent_state}
                    )

                # Create checkpoint every 5 steps
                if agent_state.get("iteration_count", 0) % 5 == 0:
                    checkpoint_data = {"state": agent_state}
                    firestore_manager.save_checkpoint(session_id, checkpoint_data)

        # Mark as complete
        session_service.update_session(session_id, {}, status="completed")

        # Record metrics
        if final_state:
            last_agent_state = list(final_state.values())[-1]
            metrics = {
                "total": len(source_data),
                "matched": len(last_agent_state.get("matched_transactions", [])),
                "unmatched": len(last_agent_state.get("unmatched_transactions", [])),
                "anomalies": last_agent_state.get("anomalies", []),
                "anomalies_count": len(last_agent_state.get("anomalies", [])),
                "match_rate": (len(last_agent_state.get("matched_transactions", [])) / len(source_data) * 100) if source_data else 0,
                "duration": 0  # Would track actual time
            }

            observability.log_reconciliation_metrics(session_id, metrics)
            observability.metrics.record_reconciliation("success")

        logger.info(f"‚úÖ Reconciliation completed: {session_id}")

    except Exception as e:
        logger.error(f"‚ùå Reconciliation failed for {session_id}: {e}")
        session_service.update_session(session_id, {}, status="failed")
        observability.metrics.record_reconciliation("failed")
        raise


async def resume_reconciliation_execution(session_id: str, checkpoint: Dict[str, Any]):
    """Resume reconciliation from checkpoint"""

    try:
        # Restore state from checkpoint
        checkpoint_state = checkpoint.get("data", {}).get("state", {})

        # Continue workflow from checkpoint
        config_dict = {
            "configurable": {
                "thread_id": session_id
            }
        }

        async for state in reconciliation_workflow.astream(checkpoint_state, config_dict):
            if state:
                for agent_name, agent_state in state.items():
                    session_service.update_session(
                        session_id,
                        {"current_agent": agent_name, **agent_state}
                    )

        session_service.update_session(session_id, {}, status="completed")
        logger.info(f"‚úÖ Reconciliation resumed and completed: {session_id}")

    except Exception as e:
        logger.error(f"‚ùå Resume failed for {session_id}: {e}")
        session_service.update_session(session_id, {}, status="failed")


# ============================================================================
# SYSTEM ENDPOINTS
# ============================================================================

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "active_sessions": len(session_service._sessions),
        "version": "1.0.0"
    }


@app.get("/metrics")
async def get_metrics():
    """Get system metrics"""
    return {
        "active_sessions": len(session_service._sessions),
        "total_reconciliations": evaluator.evaluation_history,
        "timestamp": datetime.now().isoformat()
    }


@app.get("/")
async def root():
    """Root endpoint"""
    return {
        "service": "Financial Reconciliation Multi-Agent System",
        "version": "1.0.0",
        "endpoints": {
            "reconcile": "/reconcile",
            "status": "/reconcile/{session_id}/status",
            "report": "/reconcile/{session_id}/report",
            "a2a": "/a2a",
            "health": "/health",
            "docs": "/docs"
        }
    }


# ============================================================================
# STARTUP/SHUTDOWN EVENTS
# ============================================================================

@app.on_event("startup")
async def startup_event():
    """Startup tasks"""
    logger.info("üöÄ Starting Financial Reconciliation System")

    # Register agents with A2A protocol
    agents_to_register = [
        ("data_extraction_agent", "http://localhost:8080", ["data_extraction", "normalization"]),
        ("matching_agent", "http://localhost:8080", ["fuzzy_matching", "transaction_pairing"]),
        ("anomaly_detection_agent", "http://localhost:8080", ["anomaly_detection", "fraud_detection"]),
        ("resolution_agent", "http://localhost:8080", ["resolution", "reporting"]),
        ("audit_agent", "http://localhost:8080", ["audit", "compliance"]),
        ("supervisor", "http://localhost:8080", ["orchestration", "routing"])
    ]

    for agent_name, endpoint, capabilities in agents_to_register:
        agent_registry.register_agent(agent_name, endpoint, capabilities)

    logger.info("‚úÖ All agents registered")


@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup tasks"""
    logger.info("üõë Shutting down Financial Reconciliation System")


# ============================================================================
# RUN APPLICATION
# ============================================================================

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8080,
        reload=True,
        log_level="info"
    )

In [None]:
# requirements.txt
"""
langchain==0.1.0
langchain-google-vertexai==1.0.0
langgraph==0.0.20
langsmith==0.0.87
google-cloud-storage==2.14.0
google-cloud-bigquery==3.17.0
google-cloud-firestore==2.14.0
google-cloud-logging==3.9.0
google-cloud-trace==1.11.3
vertexai==1.38.0
redis==5.0.1
pandas==2.1.4
numpy==1.26.3
pydantic==2.5.3
fastapi==0.109.0
uvicorn==0.27.0
python-multipart==0.0.6
anthropic==0.18.1
openai==1.10.0
httpx==0.26.0
python-dotenv==1.0.0
loguru==0.7.2
prometheus-client==0.19.0
opentelemetry-api==1.22.0
opentelemetry-sdk==1.22.0
opentelemetry-exporter-gcp-trace==1.6.0
"""

# .env
"""
GOOGLE_CLOUD_PROJECT=your-project-id
GOOGLE_APPLICATION_CREDENTIALS=path/to/service-account.json
VERTEX_AI_LOCATION=us-central1
REDIS_HOST=localhost
REDIS_PORT=6379
LANGSMITH_API_KEY=your-langsmith-key
LANGSMITH_PROJECT=financial-reconciliation
ANTHROPIC_API_KEY=your-anthropic-key
"""

# config.py
"""Configuration management for the reconciliation system"""

import os
from typing import Optional
from pydantic import BaseModel, Field
from dotenv import load_dotenv

load_dotenv()

class GCPConfig(BaseModel):
    """Google Cloud Platform configuration"""
    project_id: str = Field(default_factory=lambda: os.getenv("GOOGLE_CLOUD_PROJECT"))
    location: str = Field(default_factory=lambda: os.getenv("VERTEX_AI_LOCATION", "us-central1"))
    bucket_name: str = Field(default="financial-reconciliation-data")
    dataset_id: str = Field(default="reconciliation")
    credentials_path: Optional[str] = Field(default_factory=lambda: os.getenv("GOOGLE_APPLICATION_CREDENTIALS"))

class LLMConfig(BaseModel):
    """LLM configuration"""
    model_name: str = Field(default="gemini-1.5-pro")
    temperature: float = Field(default=0.1)
    max_tokens: int = Field(default=4096)
    timeout: int = Field(default=120)

class RedisConfig(BaseModel):
    """Redis configuration for memory"""
    host: str = Field(default_factory=lambda: os.getenv("REDIS_HOST", "localhost"))
    port: int = Field(default_factory=lambda: int(os.getenv("REDIS_PORT", "6379")))
    db: int = Field(default=0)
    password: Optional[str] = Field(default=None)

class ObservabilityConfig(BaseModel):
    """Observability configuration"""
    enable_langsmith: bool = Field(default=True)
    enable_cloud_trace: bool = Field(default=True)
    enable_cloud_logging: bool = Field(default=True)
    langsmith_project: str = Field(default_factory=lambda: os.getenv("LANGSMITH_PROJECT", "financial-reconciliation"))

class SystemConfig(BaseModel):
    """Main system configuration"""
    gcp: GCPConfig = Field(default_factory=GCPConfig)
    llm: LLMConfig = Field(default_factory=LLMConfig)
    redis: RedisConfig = Field(default_factory=RedisConfig)
    observability: ObservabilityConfig = Field(default_factory=ObservabilityConfig)

    # Agent settings
    max_iterations: int = Field(default=10)
    session_timeout: int = Field(default=3600)
    checkpoint_interval: int = Field(default=5)

# Global config instance
config = SystemConfig()

In [None]:
from google.adk.agents import LlmAgent, BaseAgent

# Define individual agents
greeter = LlmAgent(name="greeter", model="gemini-2.5-flash")
task_executor = LlmAgent(name="task_executor", model="gemini-2.5-flash")

# Create parent agent and assign children via sub_agents
coordinator = LlmAgent(
    name="Coordinator",
    model="gemini-2.5-flash",
    description="I coordinate greetings and tasks.",
    sub_agents=[ # Assign sub_agents here
        greeter,
        task_executor
    ]
)

In [None]:
%%writefile sample_agent/.agent_engine_config.json
{
    "min_instances": 0,
    "max_instances": 1,
    "resource_limits": {"cpu": "1", "memory": "1Gi"}
}

In [None]:
!adk deploy agent_engine --project=$PROJECT_ID --region=$deployed_region sample_agent --agent_engine_config_file=sample_agent/.agent_engine_config.json

In [None]:
# Initialize Vertex AI
vertexai.init(project=PROJECT_ID, location=deployed_region)

# Get the most recently deployed agent
agents_list = list(agent_engines.list())
if agents_list:
    remote_agent = agents_list[0]  # Get the first (most recent) agent
    client = agent_engines
    print(f"‚úÖ Connected to deployed agent: {remote_agent.resource_name}")
else:
    print("‚ùå No agents found. Please deploy first.")

In [None]:
!pip install ucimlrepo

Collecting ucimlrepo
  Downloading ucimlrepo-0.0.7-py3-none-any.whl.metadata (5.5 kB)
Downloading ucimlrepo-0.0.7-py3-none-any.whl (8.0 kB)
Installing collected packages: ucimlrepo
Successfully installed ucimlrepo-0.0.7


In [None]:
from ucimlrepo import fetch_ucirepo

# fetch dataset
bank_marketing = fetch_ucirepo(id=222)

# data (as pandas dataframes)
X = bank_marketing.data.features
y = bank_marketing.data.targets

# metadata
print(bank_marketing.metadata)

# variable information
print(bank_marketing.variables)

{'uci_id': 222, 'name': 'Bank Marketing', 'repository_url': 'https://archive.ics.uci.edu/dataset/222/bank+marketing', 'data_url': 'https://archive.ics.uci.edu/static/public/222/data.csv', 'abstract': 'The data is related with direct marketing campaigns (phone calls) of a Portuguese banking institution. The classification goal is to predict if the client will subscribe a term deposit (variable y).', 'area': 'Business', 'tasks': ['Classification'], 'characteristics': ['Multivariate'], 'num_instances': 45211, 'num_features': 16, 'feature_types': ['Categorical', 'Integer'], 'demographics': ['Age', 'Occupation', 'Marital Status', 'Education Level'], 'target_col': ['y'], 'index_col': None, 'has_missing_values': 'yes', 'missing_values_symbol': 'NaN', 'year_of_dataset_creation': 2014, 'last_updated': 'Fri Aug 18 2023', 'dataset_doi': '10.24432/C5K306', 'creators': ['S. Moro', 'P. Rita', 'P. Cortez'], 'intro_paper': {'ID': 277, 'type': 'NATIVE', 'title': 'A data-driven approach to predict the s

In [None]:
!pip install google-adk



In [None]:
!pip install git+https://github.com/google/adk-python

In [None]:
from google.adk.agents import Agent
from google.adk.tools import google_search

root_agent = Agent(
    name="search_assistant",
    model="gemini-2.5-flash", # Or your preferred Gemini model
    instruction="You are a helpful assistant. Answer user questions using Google Search when needed.",
    description="An assistant that can search the web.",
    tools=[google_search]
)

In [None]:
# Initialize Vertex AI
vertexai.init(project=PROJECT_ID, location=deployed_region)

# Get the most recently deployed agent
agents_list = list(agent_engines.list())
if agents_list:
    remote_agent = agents_list[0]  # Get the first (most recent) agent
    client = agent_engines
    print(f"‚úÖ Connected to deployed agent: {remote_agent.resource_name}")
else:
    print("‚ùå No agents found. Please deploy first.")

In [None]:
import os
import random
import time
import vertexai
from kaggle_secrets import UserSecretsClient
from vertexai import agent_engines

print("‚úÖ Imports completed successfully")

ModuleNotFoundError: No module named 'kaggle_secrets'

In [None]:
## Set your PROJECT_ID
PROJECT_ID = "steady-petal-478110-e9"  # TODO: Replace with your project ID
os.environ["GOOGLE_CLOUD_PROJECT"] = steady-petal-478110-e9

if PROJECT_ID == "your-project-id" or not PROJECT_ID:
    raise ValueError("‚ö†Ô∏è Please replace 'your-project-id' with your actual Google Cloud Project ID.")

print(f"‚úÖ Project ID set to: {PROJECT_ID}")

In [None]:
## Create simple agent - all code for the agent will live in this directory
!mkdir -p sample_agent

print(f"‚úÖ Sample Agent directory created")

In [None]:
%%writefile sample_agent/.env

# https://cloud.google.com/vertex-ai/generative-ai/docs/learn/locations#global-endpoint
GOOGLE_CLOUD_LOCATION="global"

# Set to 1 to use Vertex AI, or 0 to use Google AI Studio
GOOGLE_GENAI_USE_VERTEXAI=1

In [None]:
%%writefile sample_agent/.agent_engine_config.json
{
    "min_instances": 0,
    "max_instances": 1,
    "resource_limits": {"cpu": "1", "memory": "1Gi"}
}

In [None]:
regions_list = ["europe-west1", "europe-west4", "us-east4", "us-west1"]
deployed_region = random.choice(regions_list)

print(f"‚úÖ Selected deployment region: {deployed_region}")

In [None]:
!adk deploy agent_engine --project=$PROJECT_ID --region=$deployed_region sample_agent --agent_engine_config_file=sample_agent/.agent_engine_config.json

In [None]:
# Initialize Vertex AI
vertexai.init(project=PROJECT_ID, location=deployed_region)

# Get the most recently deployed agent
agents_list = list(agent_engines.list())
if agents_list:
    remote_agent = agents_list[0]  # Get the first (most recent) agent
    client = agent_engines
    print(f"‚úÖ Connected to deployed agent: {remote_agent.resource_name}")
else:
    print("‚ùå No agents found. Please deploy first.")

In [None]:
async for item in remote_agent.async_stream_query(
    message="What is the weather in Tokyo?",
    user_id="user_42",
):
    print(item)

In [None]:
////   Cloud Run microservice (FastAPI)

Below is a minimal example that shows core endpoints and calling Vertex AI (Python)
# app/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from google.cloud import aiplatform
import os, uuid, json

app = FastAPI()
PROJECT = os.getenv("PROJECT_ID", "<steady-petal-478110-e9>")
LOCATION = os.getenv("LOCATION", "us-central1")
MODEL = os.getenv("MODEL", "projects/<PROJECT>/locations/us-central1/models/gemini-1.5-pro")

aiplatform.init(project=PROJECT, location=LOCATION)

class RawTxn(BaseModel):
    raw_description: str
    amount: float
    currency: str
    date: str

@app.post("/v1/normalize")
async def normalize(raw: RawTxn):
    # quick local extraction fallback + Gemini call for tough cases
    prompt = [
        {"role":"system","content":"You are a financial extractor. Output JSON."},
        {"role":"user","content": json.dumps(raw.dict())}
    ]
    # Use Vertex AI text generation call (pseudo)
    client = aiplatform.gapic.PredictionServiceClient()
    response = client.predict(
        endpoint=MODEL,
        instances=[{"content": json.dumps(prompt)}],
    )
    # parse response - simplified
    output_text = response.predictions[0].get("content", "{}")
    return json.loads(output_text)

@app.post("/v1/reconcile")
async def reconcile(req: dict):
    # 1. fetch txns & GL (from BigQuery) - omitted
    # 2. pre-match rules (SQL) - omitted
    # 3. vector search for candidates - omitted
    # 4. call Gemini matcher
    matcher_prompt = [
      {"role":"system","content":"You are a reconciliation assistant..."},
      {"role":"user","content": json.dumps(req)}
    ]
    client = aiplatform.gapic.PredictionServiceClient()
    response = client.predict(endpoint=MODEL, instances=[{"content": json.dumps(matcher_prompt)}])
    # parse & return
    return response.predictions

In [None]:
Synthetic data generator (Python + Faker)

Use this to create bank transactions + GL ledgers with controlled exceptions.

# synthetic_data_gen.py
from faker import Faker
import random, uuid, csv, datetime, json

fake = Faker()
FREQ = 1000

def gen_transaction():
    txn_id = str(uuid.uuid4())
    amount = round(random.choice([50,100,250,1000,random.uniform(10,10000)]),2)
    vendor = fake.company().upper()
    invoice = str(random.randint(1000,9999)) if random.random() < 0.6 else ""
    narr = f"{vendor} INV {invoice} REF {random.randint(10000,99999)}"
    date = fake.date_between(start_date='-60d', end_date='today').isoformat()
    return {
        "transaction_id": txn_id,
        "posted_date": date,
        "amount": amount,
        "currency": "USD",
        "raw_description": narr,
        "normalized_payee": vendor,
        "invoice_number": invoice or None
    }

def gen_gl_entry(txn):
    # Create matching GL entry 70% of time
    if random.random() < 0.7 and txn["invoice_number"]:
        return {
            "entry_id": str(uuid.uuid4()),
            "ledger_date": txn["posted_date"],
            "account_code": "2000",
            "invoice_id": txn["invoice_number"],
            "vendor": txn["normalized_payee"],
            "amount": txn["amount"],
            "currency": txn["currency"]
        }
    else:
        # unrelated entry
        return {
            "entry_id": str(uuid.uuid4()),
            "ledger_date": fake.date_between(start_date='-80d', end_date='today').isoformat(),
            "account_code": "2000",
            "invoice_id": str(random.randint(1000,9999)),
            "vendor": fake.company().upper(),
            "amount": round(random.uniform(10,20000),2),
            "currency": "USD"
        }

if __name__ == "__main__":
    txns = [gen_transaction() for _ in range(FREQ)]
    gls = [gen_gl_entry(t) for t in txns]
    with open("transactions.json","w") as f:
        json.dump(txns,f,indent=2)
    with open("gl_entries.json","w") as f:
        json.dump(gls,f,indent=2)
    print("Wrote transactions.json and gl_entries.json")

In [None]:
df1 = pd.read_json("transactions.json")
df2 = pd.read_json("gl_entries.json")


In [1]:
df11=pd.read_csv("/content/sample_data/Bank Reconciliation Sample.csv")


NameError: name 'pd' is not defined

In [2]:

import pandas as pd

# Load the dataset

df11 = pd.read_csv("/content/sample_data/Bank Reconciliation Sample.csv")



In [12]:
df11.head()

Unnamed: 0,Company Cashbook,Unnamed: 1,Unnamed: 2,Unnamed: 3,Unnamed: 4,Unnamed: 5,Unnamed: 6
0,,,,,,,
1,Debit - Dr,,,,Credit - Cr,,
2,Date,Details,Amount ($),,Date,Details,Amount ($)
3,01-Jun-23,Balance b/d,186200,,01-Jun-23,Electricity - 7864,24300
4,04-Jun-23,Mr. Ali - 7981,21200,,02-Jun-23,Mr. Usman - 7865,30700


In [13]:
df11.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15 entries, 0 to 14
Data columns (total 7 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   Company Cashbook  10 non-null     object 
 1   Unnamed: 1        10 non-null     object 
 2   Unnamed: 2        10 non-null     object 
 3   Unnamed: 3        0 non-null      float64
 4   Unnamed: 4        13 non-null     object 
 5   Unnamed: 5        13 non-null     object 
 6   Unnamed: 6        13 non-null     object 
dtypes: float64(1), object(6)
memory usage: 972.0+ bytes


In [3]:
df12=pd.read_csv("/content/sample_data/Bank_Personal_Loan_Modelling.csv")

In [14]:
df12.head()

Unnamed: 0,ID,Age,Experience,Income,ZIP Code,Family,CCAvg,Education,Mortgage,Personal Loan,Securities Account,CD Account,Online,CreditCard
0,1,25,1,49,91107,4,1.6,1,0,0,1,0,0,0
1,2,45,19,34,90089,3,1.5,1,0,0,1,0,0,0
2,3,39,15,11,94720,1,1.0,1,0,0,0,0,0,0
3,4,35,9,100,94112,1,2.7,2,0,0,0,0,0,0
4,5,35,8,45,91330,4,1.0,2,0,0,0,0,0,1


In [15]:
df12.describe()

Unnamed: 0,ID,Age,Experience,Income,ZIP Code,Family,CCAvg,Education,Mortgage,Personal Loan,Securities Account,CD Account,Online,CreditCard
count,5000.0,5000.0,5000.0,5000.0,5000.0,5000.0,5000.0,5000.0,5000.0,5000.0,5000.0,5000.0,5000.0,5000.0
mean,2500.5,45.3384,20.1046,73.7742,93152.503,2.3964,1.937938,1.881,56.4988,0.096,0.1044,0.0604,0.5968,0.294
std,1443.520003,11.463166,11.467954,46.033729,2121.852197,1.147663,1.747659,0.839869,101.713802,0.294621,0.305809,0.23825,0.490589,0.455637
min,1.0,23.0,-3.0,8.0,9307.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,1250.75,35.0,10.0,39.0,91911.0,1.0,0.7,1.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,2500.5,45.0,20.0,64.0,93437.0,2.0,1.5,2.0,0.0,0.0,0.0,0.0,1.0,0.0
75%,3750.25,55.0,30.0,98.0,94608.0,3.0,2.5,3.0,101.0,0.0,0.0,0.0,1.0,1.0
max,5000.0,67.0,43.0,224.0,96651.0,4.0,10.0,3.0,635.0,1.0,1.0,1.0,1.0,1.0


In [4]:
df13=pd.read_csv("/content/sample_data/Bank_Personal_Loan_Modelling.csv")

In [5]:
df14=pd.read_csv("/content/sample_data/Comprehensive_Banking_Database.csv")

In [6]:
df15=pd.read_csv("/content/sample_data/Customer_Data.csv")

In [11]:
df16=pd.read_csv("//content/sample_data/all-data.csv", encoding='latin1')