In [18]:
%%writefile spanner_connector1.py
"""
Spanner Database Connector for Agent Builder
This module provides functions to connect to a Spanner database and retrieve customer information.
"""

from google.cloud import spanner
import logging
import os

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Spanner configuration.
# Each identifier may be overridden with an environment variable so the module
# can be pointed at another project/instance/database without editing code.
# An unset or empty variable falls back to the value that used to be hard-coded.
PROJECT_ID = os.environ.get("SPANNER_PROJECT_ID") or "resolute-winter-447814-t5"
INSTANCE_ID = os.environ.get("SPANNER_INSTANCE_ID") or "bank-customer-instance"
DATABASE_ID = os.environ.get("SPANNER_DATABASE_ID") or "bank-customer-db"  # Replace with your actual database name

def initialize_spanner_client():
    """
    Create a Spanner client bound to the configured project (PROJECT_ID).

    A new client object is constructed on every call.

    Returns:
        A Spanner client instance.

    Raises:
        Exception: any error raised while constructing the client is logged
            and then re-raised unchanged.
    """
    try:
        spanner_client = spanner.Client(project=PROJECT_ID)
        logger.info(f"Successfully initialized Spanner client for project {PROJECT_ID}")
    except Exception as exc:
        logger.error(f"Error initializing Spanner client: {str(exc)}")
        raise
    return spanner_client

def get_database():
    """
    Build a handle to the configured Spanner database.

    A new client is created through initialize_spanner_client() on each call,
    then narrowed to INSTANCE_ID and DATABASE_ID.

    Returns:
        A Spanner database instance.

    Raises:
        Exception: any error raised while building the handle is logged and
            then re-raised unchanged.
    """
    try:
        db_handle = (
            initialize_spanner_client()
            .instance(INSTANCE_ID)
            .database(DATABASE_ID)
        )
        logger.info(f"Successfully connected to database {DATABASE_ID}")
        return db_handle
    except Exception as exc:
        logger.error(f"Error connecting to database: {str(exc)}")
        raise

def authenticate_customer(customer_id, pin):
    """
    Authenticate a customer by verifying their customer ID and PIN.

    Looks up the row(s) in ``customers`` matching ``customer_id`` and compares
    the stored ``pin`` column with the supplied PIN. String PINs are compared
    in constant time so the comparison itself does not reveal how many leading
    characters matched.

    NOTE(review): the stored value is compared directly with the supplied PIN,
    which implies PINs are stored unhashed -- confirm and consider hashing.

    Args:
        customer_id (str): The customer's ID.
        pin (str): The customer's PIN.

    Returns:
        bool: True if authentication is successful, False otherwise. False is
        also returned (after logging the error) when the lookup itself fails.
    """
    # Local import keeps this function self-contained; hmac is standard library.
    import hmac

    try:
        database = get_database()

        with database.snapshot() as snapshot:
            query = """
                SELECT customer_id, pin 
                FROM customers 
                WHERE customer_id = @customer_id
            """
            params = {"customer_id": customer_id}
            param_types = {"customer_id": spanner.param_types.STRING}

            results = snapshot.execute_sql(
                query, params=params, param_types=param_types
            )

            for row in results:
                stored_pin = row[1]
                if isinstance(stored_pin, str) and isinstance(pin, str):
                    # Encode to bytes: compare_digest only accepts str when it
                    # is pure ASCII, bytes work for any content.
                    pin_matches = hmac.compare_digest(
                        stored_pin.encode("utf-8"), pin.encode("utf-8")
                    )
                else:
                    # Non-string values keep the original equality semantics.
                    pin_matches = stored_pin == pin

                if pin_matches:
                    logger.info(f"Customer {customer_id} authenticated successfully")
                    return True

            logger.warning(f"Authentication failed for customer {customer_id}")
            return False
    except Exception as e:
        logger.error(f"Error during authentication: {str(e)}")
        return False

def get_customer_details(customer_id):
    """
    Fetch the profile fields of one customer.

    Args:
        customer_id (str): The customer's ID.

    Returns:
        dict: The first matching row as a dictionary with the keys
        ``customer_id``, ``first_name``, ``last_name``, ``email`` and
        ``phone``. None is returned when no row matches, and also (after
        logging the error) when the lookup fails.
    """
    # Output keys, in the same order as the columns of the SELECT below.
    field_names = ("customer_id", "first_name", "last_name", "email", "phone")
    no_row = object()

    try:
        with get_database().snapshot() as snapshot:
            query = """
                SELECT customer_id, first_name, last_name, email, phone
                FROM customers 
                WHERE customer_id = @customer_id
            """
            results = snapshot.execute_sql(
                query,
                params={"customer_id": customer_id},
                param_types={"customer_id": spanner.param_types.STRING},
            )

            # Only the first row is ever used.
            first_row = next(iter(results), no_row)
            if first_row is no_row:
                logger.warning(f"Customer {customer_id} not found")
                return None

            return {name: first_row[position] for position, name in enumerate(field_names)}
    except Exception as exc:
        logger.error(f"Error retrieving customer details: {str(exc)}")
        return None
def update_customer_address(customer_id, new_address):
    """
    Set the ``address`` column of one customer inside a Spanner transaction.

    Args:
        customer_id (str): The customer's ID.
        new_address (str): The address to store.

    Returns:
        bool: True when the UPDATE affected at least one row. False when no
        row matched ``customer_id``, and also (after logging the error) when
        the transaction fails.
    """
    try:
        def _apply_update(txn):
            # Parameterised DML; values are bound by name, never interpolated.
            statement = """
                UPDATE customers
                SET address = @new_address
                WHERE customer_id = @customer_id
            """
            affected_rows = txn.execute_update(
                statement,
                params={
                    'new_address': new_address,
                    'customer_id': customer_id,
                },
                param_types={
                    'new_address': spanner.param_types.STRING,
                    'customer_id': spanner.param_types.STRING,
                },
            )
            if affected_rows != 0:
                return True
            logger.warning(f"Customer {customer_id} not found.")
            return False

        # The callback's return value is handed back by run_in_transaction.
        updated = get_database().run_in_transaction(_apply_update)
        if updated:
            logger.info(f"Address updated successfully for customer {customer_id}.")
        return updated

    except Exception as exc:
        logger.error(f"Error updating customer address: {str(exc)}")
        return False
    


Overwriting spanner_connector1.py


In [5]:
# %pip (instead of !pip) installs into the environment of the running kernel.
%pip install google-cloud-functions


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [19]:
%%writefile main.py
import functions_framework
from flask import jsonify
import logging
import os
import sys

# Add the current directory to the path to import the spanner_connector module
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
from spanner_connector1 import authenticate_customer, get_customer_details, update_customer_address

# Configure logging: INFO and above, each record rendered as
# "<timestamp> - <logger name> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

@functions_framework.http
def bank_agent_webhook1(request):
    """
    HTTP Cloud Function that integrates with Agent Builder.

    The request body must be a JSON object whose ``action`` field selects the
    operation:

    * ``authenticate`` -- requires ``customer_id`` and ``pin``. Responds 200
      with ``authenticated`` true (plus the customer's details) or false.
    * ``update_address`` -- requires ``customer_id`` and ``new_address``.
      Responds 200 on success and 500 when the update did not succeed.

    A missing/empty body, a body that is not a JSON object, missing fields or
    an unknown action produce a 400. Unexpected errors produce a generic 500;
    the details are written to the log only, never returned to the caller.

    Args:
        request (flask.Request): The request object.

    Returns:
        A ``(body, status, headers)`` tuple that can be turned into a
        Response object using `make_response`.
    """
    # Set CORS headers for the preflight request
    if request.method == 'OPTIONS':
        headers = {
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'POST',
            'Access-Control-Allow-Headers': 'Content-Type',
            'Access-Control-Max-Age': '3600'
        }
        return ('', 204, headers)

    # Set CORS headers for the main request
    headers = {
        'Access-Control-Allow-Origin': '*'
    }

    try:
        # Parse the request JSON (silent=True yields None instead of raising)
        request_json = request.get_json(silent=True)
        if not request_json:
            logger.error("No JSON data in request")
            return jsonify({"error": "No JSON data in request"}), 400, headers

        # Valid JSON that is not an object (e.g. a list or a string) has no
        # .get(); reject it as a client error instead of crashing into a 500.
        if not isinstance(request_json, dict):
            logger.error("JSON body is not an object")
            return jsonify({"error": "Request body must be a JSON object"}), 400, headers

        # Log the request action
        logger.info(f"Received request with action: {request_json.get('action', 'unknown')}")

        # Get the action from the request
        action = request_json.get('action')

        if action == 'authenticate':
            # Handle authentication request
            customer_id = request_json.get('customer_id')
            pin = request_json.get('pin')
            if pin is not None:
                # JSON clients may send the PIN as a number; compare as text.
                pin = str(pin)

            if not customer_id or not pin:
                logger.error("Missing customer_id or pin in authentication request")
                return jsonify({"error": "Missing customer_id or pin"}), 400, headers

            # Authenticate the customer
            auth_result = authenticate_customer(customer_id, pin)

            if auth_result:
                # Get customer details
                customer = get_customer_details(customer_id)
                return jsonify({
                    "authenticated": True,
                    "customer": customer
                }), 200, headers
            else:
                return jsonify({
                    "authenticated": False,
                    "message": "Invalid customer ID or PIN"
                }), 200, headers

        elif action == 'update_address':
            # Handle address update request
            # NOTE(review): this branch does not verify a PIN or any other
            # credential, so any caller that knows a customer_id can change
            # that customer's address. Requiring proof of authentication here
            # would change the request contract -- confirm with the
            # Agent Builder integration and add it.
            customer_id = request_json.get('customer_id')
            new_address = request_json.get('new_address')

            if not customer_id or not new_address:
                logger.error("Missing customer_id or new_address in address update request")
                return jsonify({"error": "Missing customer_id or new_address"}), 400, headers

            # Update the customer's address
            update_successful = update_customer_address(customer_id, new_address)

            if update_successful:
                return jsonify({
                    "success": True,
                    "message": f"Address updated successfully for customer {customer_id}."
                }), 200, headers
            else:
                return jsonify({
                    "success": False,
                    "message": f"Failed to update address for customer {customer_id}."
                }), 500, headers

        else:
            logger.error(f"Unknown action: {action}")
            return jsonify({"error": f"Unknown action: {action}"}), 400, headers

    except Exception:
        # Keep the traceback in the logs, but do not echo internal error text
        # (which may expose implementation details) back to the caller.
        logger.exception("Error processing request")
        return jsonify({"error": "Internal server error"}), 500, headers


Overwriting main.py


In [20]:
# Deploy the HTTP-triggered function whose entry point is bank_agent_webhook1.
# NOTE(review): --allow-unauthenticated lets callers invoke the function without
# IAM credentials -- confirm this is intended for this endpoint.
!gcloud functions deploy bank-agent-webhook1 \
  --project=resolute-winter-447814-t5 \
  --region=us-central1 \
  --runtime=python310 \
  --entry-point=bank_agent_webhook1 \
  --service-account=232486347340-compute@developer.gserviceaccount.com \
  --trigger-http \
  --allow-unauthenticated

Preparing function...done.                                                     
Updating function (may take a while)...                                        
  . [Build]                                                                    
  . [Service]                                                                  
  . [ArtifactRegistry]                                                         
  . [Healthcheck]                                                              
  . [Triggercheck]                                                             
  Updating function (may take a while)...                                      





⠛ Updating function (may take a while)...                                      
  ⠛ [Build]                                                                    




⠹ Updating function (may take a while)...                                      
  ⠹ [Build] Build in progress... Logs are available at [https://console.cloud.g
  oogle.com/cloud-build/builds;