## Test Function Calling via REST API


In [None]:
# 1. install dependencies
!pip install requests dotenv pandas boto3

In [2]:
# 2. In the .env file, set the COGNITO_PASSWORD environment variable to your Cognito user's password


In [None]:
import logging
import os
import requests
import json
import logging
import uuid
import boto3
from dotenv import load_dotenv
from pathlib import Path
import time
from botocore.exceptions import ClientError

# Configure logging for the notebook: INFO and above, each record prefixed
# with a time-of-day stamp and its level.
logging.basicConfig(
    datefmt='%H:%M:%S',
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
)

# Module-level logger used by the helpers and classes defined below.
logger = logging.getLogger(__name__)

def find_dotenv():
    """
    Locate a .env file by probing a fixed list of candidate locations.

    Candidates are checked in this order and the first one that exists wins:
    ./.env, ../.env, ../deployment/.env, <cwd>/.env, <parent of cwd>/.env.
    Probing several places keeps the lookup working when the code runs from
    different directories (for example inside a Jupyter notebook).

    Returns:
        Path | None: The first existing candidate, or None when none of them
        exists (a warning is logged in that case).
    """
    working_dir = Path(os.getcwd())
    candidates = (
        Path('.env'),                        # current directory
        Path('..') / '.env',                 # parent directory
        Path('..') / 'deployment' / '.env',  # sibling deployment directory
        working_dir / '.env',                # explicit current working directory
        working_dir.parent / '.env',         # parent of the working directory
    )

    match = next((candidate for candidate in candidates if candidate.exists()), None)
    if match is None:
        logger.warning("No .env file found in any of the expected locations")
        return None

    logger.info(f"Found .env file at: {match.absolute()}")
    return match


def update_cognito_client():
    """Enable the auth flows this notebook needs on the Cognito App Client.

    Loads the .env file located by find_dotenv() (when one exists), then reads
    COGNITO_USER_POOL_ID, COGNITO_APP_CLIENT_ID and AWS_REGION (default
    'us-east-1') from the environment and calls UpdateUserPoolClient with the
    required ExplicitAuthFlows.

    Returns:
        bool: True when the update call succeeded; False when the IDs are not
        configured or the call failed. Failures are logged, never raised.
    """
    # Load environment variables (skipped when no .env file was located)
    env_path = find_dotenv()
    if env_path:
        load_dotenv(dotenv_path=env_path)

    user_pool_id = os.getenv('COGNITO_USER_POOL_ID')
    client_id = os.getenv('COGNITO_APP_CLIENT_ID')
    region = os.getenv('AWS_REGION', 'us-east-1')

    # Fail early with a readable message instead of a boto3 parameter
    # validation error about None values.
    if not user_pool_id or not client_id:
        logger.error(
            "Failed to update Cognito client: COGNITO_USER_POOL_ID and "
            "COGNITO_APP_CLIENT_ID must be set"
        )
        return False

    # Single source of truth for both the API call and the log line below,
    # so the two cannot drift apart.
    auth_flows = [
        'ALLOW_USER_PASSWORD_AUTH',
        'ALLOW_ADMIN_USER_PASSWORD_AUTH',
        'ALLOW_REFRESH_TOKEN_AUTH',
        'ALLOW_USER_SRP_AUTH',
    ]

    try:
        # Uses boto3's default credential/profile resolution
        session = boto3.Session()
        cognito_client = session.client('cognito-idp', region_name=region)

        logger.info("=== Updating Cognito App Client ===")

        # NOTE(review): the UpdateUserPoolClient API reference says attributes
        # that are not supplied are reset to their defaults. Only the auth
        # flows are sent here - confirm the app client has no other custom
        # settings (OAuth flows, callback URLs, token validity) to preserve.
        cognito_client.update_user_pool_client(
            UserPoolId=user_pool_id,
            ClientId=client_id,
            ExplicitAuthFlows=auth_flows,
        )
    except ClientError as e:
        error_code = e.response['Error']['Code']
        error_message = e.response['Error']['Message']
        logger.error(f"Failed to update Cognito client ({error_code}): {error_message}")
        return False
    except Exception as e:
        # Best-effort notebook helper: report any other failure (for example
        # missing AWS credentials) as False rather than aborting the cell.
        logger.error(f"Failed to update Cognito client: {e}")
        return False

    logger.info("✅ Successfully updated Cognito App Client!")
    logger.info("Enabled auth flows: " + ", ".join(auth_flows))
    return True
# 3. Enable the required auth flows on the Cognito App Client
#    (returns True/False; failures are logged rather than raised)
update_cognito_client()

In [4]:
# 4. Define the RetailRestApiClient that handles the authentication and sends the query to the REST API
import os
import requests
import json
import logging
import uuid
import boto3
from dotenv import load_dotenv
from pathlib import Path
import time
from botocore.exceptions import ClientError


class CognitoAuthenticator:
    """
    Handles AWS Cognito authentication and JWT token management.

    Tokens are obtained through the admin_initiate_auth API and cached on the
    instance together with their expiry time, so get_valid_token() can refresh
    them shortly before they expire.
    """

    # get_valid_token() refreshes this many seconds before the cached expiry.
    REFRESH_MARGIN_SECONDS = 300
    # Lifetime assumed when an auth result carries no 'ExpiresIn' (1 hour).
    DEFAULT_EXPIRES_IN = 3600

    # ClientError codes that get a dedicated message prefix on sign-in.
    _AUTH_ERROR_PREFIXES = {
        'NotAuthorizedException': 'Authentication failed',
        'UserNotFoundException': 'User not found',
        'UserNotConfirmedException': 'User not confirmed',
    }

    def __init__(self, user_pool_id=None, client_id=None, region=None):
        """
        Initialize the Cognito authenticator.

        Explicit arguments win; otherwise values come from the environment
        (COGNITO_USER_POOL_ID, COGNITO_APP_CLIENT_ID, AWS_REGION), after the
        .env file has been loaded.

        Args:
            user_pool_id (str, optional): Cognito User Pool ID
            client_id (str, optional): Cognito App Client ID
            region (str, optional): AWS region (default 'us-east-1')

        Raises:
            ValueError: If the user pool ID or the client ID cannot be resolved.
        """
        # Load environment variables
        env_path = find_dotenv()
        load_dotenv(dotenv_path=env_path)

        self.user_pool_id = user_pool_id or os.getenv('COGNITO_USER_POOL_ID')
        self.client_id = client_id or os.getenv('COGNITO_APP_CLIENT_ID')
        self.region = region or os.getenv('AWS_REGION', 'us-east-1')

        if not self.user_pool_id or not self.client_id:
            raise ValueError("Cognito User Pool ID and Client ID are required. Set COGNITO_USER_POOL_ID and COGNITO_APP_CLIENT_ID environment variables.")

        # Initialize Cognito client
        self.cognito_client = boto3.client('cognito-idp', region_name=self.region)

        # Token storage
        self.access_token = None
        self.id_token = None
        self.refresh_token = None
        self.token_expiry = None

        logger.info(f"Initialized Cognito authenticator for region: {self.region}")
        logger.info(f"User Pool ID: {self.user_pool_id}")
        logger.info(f"Client ID: {self.client_id}")

    def _store_tokens(self, auth_result):
        """Cache the access/ID tokens and expiry from an auth result; return its lifetime in seconds."""
        self.access_token = auth_result['AccessToken']
        self.id_token = auth_result['IdToken']
        expires_in = auth_result.get('ExpiresIn', self.DEFAULT_EXPIRES_IN)
        self.token_expiry = time.time() + expires_in
        return expires_in

    def authenticate_with_password(self, username, password):
        """
        Authenticate with username and password to get JWT tokens.

        Args:
            username (str): Username (email)
            password (str): Password

        Returns:
            dict: 'access_token', 'id_token', 'refresh_token' and 'expires_in'

        Raises:
            ValueError: If Cognito rejects the request, or answers with an
                auth challenge instead of tokens.
        """
        # Keep the try body to the one call that can raise ClientError.
        try:
            response = self.cognito_client.admin_initiate_auth(
                UserPoolId=self.user_pool_id,
                ClientId=self.client_id,
                AuthFlow='ADMIN_NO_SRP_AUTH',
                AuthParameters={
                    'USERNAME': username,
                    'PASSWORD': password
                }
            )
        except ClientError as e:
            error_code = e.response['Error']['Code']
            error_message = e.response['Error']['Message']
            prefix = self._AUTH_ERROR_PREFIXES.get(error_code)
            if prefix is not None:
                raise ValueError(f"{prefix}: {error_message}") from e
            raise ValueError(f"Authentication error ({error_code}): {error_message}") from e

        # When Cognito demands a challenge (e.g. NEW_PASSWORD_REQUIRED) the
        # response carries 'ChallengeName' and no 'AuthenticationResult'.
        # Report that explicitly instead of failing with a bare KeyError.
        auth_result = response.get('AuthenticationResult')
        if not auth_result:
            challenge = response.get('ChallengeName', 'unknown')
            raise ValueError(
                f"Authentication incomplete: Cognito returned challenge "
                f"'{challenge}' instead of tokens. Resolve the challenge for "
                f"this user and authenticate again."
            )

        expires_in = self._store_tokens(auth_result)
        self.refresh_token = auth_result.get('RefreshToken')

        logger.info(f"Successfully authenticated user: {username}")
        logger.info(f"Tokens will expire in {expires_in} seconds")

        return {
            'access_token': self.access_token,
            'id_token': self.id_token,
            'refresh_token': self.refresh_token,
            'expires_in': expires_in
        }

    def refresh_tokens(self):
        """
        Refresh the access and ID tokens using the refresh token.

        Returns:
            dict: 'access_token', 'id_token' and 'expires_in' of the new tokens

        Raises:
            ValueError: If no refresh token is stored, or Cognito rejects the
                refresh request or returns no tokens.
        """
        if not self.refresh_token:
            raise ValueError("No refresh token available. Please authenticate again.")

        try:
            response = self.cognito_client.admin_initiate_auth(
                UserPoolId=self.user_pool_id,
                ClientId=self.client_id,
                AuthFlow='REFRESH_TOKEN_AUTH',
                AuthParameters={
                    'REFRESH_TOKEN': self.refresh_token
                }
            )
        except ClientError as e:
            error_code = e.response['Error']['Code']
            error_message = e.response['Error']['Message']
            raise ValueError(f"Token refresh failed ({error_code}): {error_message}") from e

        auth_result = response.get('AuthenticationResult')
        if not auth_result:
            raise ValueError("Token refresh failed: Cognito returned no tokens. Please authenticate again.")

        expires_in = self._store_tokens(auth_result)

        # The refresh response usually omits the refresh token; when one IS
        # returned, keep it so the next refresh uses the newest token.
        new_refresh_token = auth_result.get('RefreshToken')
        if new_refresh_token:
            self.refresh_token = new_refresh_token

        logger.info("Successfully refreshed tokens")

        return {
            'access_token': self.access_token,
            'id_token': self.id_token,
            'expires_in': expires_in
        }

    def get_valid_token(self, token_type='id'):
        """
        Get a valid token, refreshing if necessary.

        Args:
            token_type (str): Type of token to return ('id' or 'access')

        Returns:
            str: Valid JWT token

        Raises:
            ValueError: If token_type is invalid, no token of that type is
                stored, or a needed refresh fails.
        """
        # Reject a bad argument before doing any network work.
        if token_type not in ('id', 'access'):
            raise ValueError("token_type must be 'id' or 'access'")

        # Refresh shortly before expiry rather than after it
        if self.token_expiry and time.time() > (self.token_expiry - self.REFRESH_MARGIN_SECONDS):
            logger.info("Tokens are expiring soon, refreshing...")
            self.refresh_tokens()

        if token_type == 'id':
            if not self.id_token:
                raise ValueError("No ID token available. Please authenticate first.")
            return self.id_token

        if not self.access_token:
            raise ValueError("No access token available. Please authenticate first.")
        return self.access_token


class RetailRestApiClient:
    """
    Client for interacting with the Retail Store Assistant REST API with Cognito authentication.
    This client handles JWT token authentication automatically.
    """

    # Seconds to wait for the API before giving up (used when no timeout is passed).
    DEFAULT_TIMEOUT_SECONDS = 120

    def __init__(self, username=None, password=None, user_id=None, api_url=None, debug=False,
                 cognito_user_pool_id=None, cognito_client_id=None, region=None,
                 timeout=DEFAULT_TIMEOUT_SECONDS):
        """
        Initialize the REST API client with Cognito authentication.

        Explicit arguments win; anything omitted is read from the environment
        after the .env file has been loaded. If both a username and a password
        can be resolved, the client authenticates immediately.

        Args:
            username (str, optional): Cognito username (email) for authentication.
                If None, the user ID is used to authenticate
            password (str, optional): Cognito password for authentication.
                If None, read from COGNITO_PASSWORD
            user_id (str, optional): User ID for the session. If None, will use
                username, then the EMAIL environment variable
            api_url (str, optional): REST API URL. If None, it is derived from the
                DOMAIN_NAME environment variable
            debug (bool, optional): Enable debug logging. Default is False
            cognito_user_pool_id (str, optional): Cognito User Pool ID
            cognito_client_id (str, optional): Cognito App Client ID
            region (str, optional): AWS region
            timeout (float or tuple, optional): Timeout in seconds handed to
                requests for every API call

        Raises:
            ValueError: If no API URL can be resolved, the Cognito IDs are
                missing, or the initial authentication fails.
        """
        # Load environment variables from .env file
        env_path = find_dotenv()
        load_dotenv(dotenv_path=env_path)

        self.password = password or os.getenv('COGNITO_PASSWORD')
        self.user_id = user_id or username or os.getenv('EMAIL')
        # Name used to sign in; falls back to the user ID when not given.
        self.username = username or self.user_id
        self.cognito_user_pool_id = cognito_user_pool_id or os.getenv('COGNITO_USER_POOL_ID')
        self.cognito_client_id = cognito_client_id or os.getenv('COGNITO_APP_CLIENT_ID')
        self.region = region or os.getenv('AWS_REGION', 'us-east-1')

        self.api_url = api_url or self._default_api_url(os.getenv('DOMAIN_NAME'))
        self.debug = debug
        self.timeout = timeout

        if self.debug:
            logger.setLevel(logging.DEBUG)

        if not self.api_url:
            raise ValueError("API URL not provided and not found in .env file")

        # Initialize Cognito authenticator
        self.authenticator = CognitoAuthenticator(
            user_pool_id=self.cognito_user_pool_id,
            client_id=self.cognito_client_id,
            region=self.region
        )

        # Authenticate if credentials are provided
        if self.username and self.password:
            self.authenticate()

        logger.info(f"Initialized REST API client with URL: {self.api_url}")
        logger.info(f"Using user ID: {self.user_id}")

    @staticmethod
    def _default_api_url(domain_name):
        """Return the backend URL for domain_name, or None when no domain is configured."""
        # Without this guard an unset DOMAIN_NAME yields the truthy string
        # 'https://backend.None/api' and the missing-URL check never fires.
        if not domain_name:
            return None
        return f'https://backend.{domain_name}/api'

    @staticmethod
    def _failed_response(error):
        """Return the HTTP response attached to a request error, or None if there is none."""
        # Callers must compare the result with `is not None`: a requests
        # Response is falsy for every 4xx/5xx status, so a truthiness test
        # would discard exactly the error responses of interest.
        return getattr(error, 'response', None)

    def _build_payload(self, query, session_id=None):
        """Build the request parameters; a fresh UUID session ID is generated when none is given."""
        return {
            'userId': self.user_id,
            'query': query,
            'sessionId': session_id or str(uuid.uuid4())
        }

    def _timed_get(self, endpoint, headers, payload):
        """Send one GET request and return its JSON body, annotated with the elapsed time."""
        start_time = time.perf_counter()

        response = requests.get(endpoint, headers=headers, params=payload, timeout=self.timeout)
        response.raise_for_status()
        response_data = response.json()

        duration_ms = (time.perf_counter() - start_time) * 1000  # Convert to milliseconds

        logger.debug(f"Received response with status code: {response.status_code}")
        logger.debug(f"Response data: {json.dumps(response_data, indent=2)}")
        logger.info(f"Request duration: {duration_ms:.2f} ms")

        # Only a JSON object can carry the extra key
        if isinstance(response_data, dict):
            response_data['request_duration_ms'] = round(duration_ms, 2)

        return response_data

    def authenticate(self, user_id=None, password=None):
        """
        Authenticate with Cognito to get JWT tokens.

        Args:
            user_id (str, optional): Username to authenticate with. If None,
                the username resolved at construction is used
            password (str, optional): Password to authenticate with. If None,
                the password resolved at construction is used

        Returns:
            dict: The token dictionary returned by the authenticator

        Raises:
            ValueError: If no username or password is available, or Cognito
                rejects the credentials.
        """
        auth_username = user_id or self.username
        auth_password = password or self.password

        if not auth_username or not auth_password:
            raise ValueError("Username and password are required for authentication")

        logger.info(f"Authenticating with Cognito for user: {auth_username}")

        try:
            result = self.authenticator.authenticate_with_password(auth_username, auth_password)
            logger.info("Successfully authenticated with Cognito")
            return result
        except Exception as e:
            logger.error(f"Authentication failed: {e}")
            raise

    def send_query(self, query, session_id=None):
        """
        Send a text query to the REST API with JWT authentication.

        On a 401 response the tokens are refreshed and the request is retried
        once.

        Args:
            query (str): The text query to send
            session_id (str, optional): Session ID for continuing a conversation.
                                       If None, a new conversation will be started.

        Returns:
            dict: The API response as a dictionary, with 'request_duration_ms' added

        Raises:
            ValueError: If query is empty, no valid token is available, or the
                refresh-and-retry after a 401 fails.
            requests.exceptions.RequestException: For any other request failure.
        """
        if not query:
            raise ValueError("Query cannot be empty")

        # Ensure the API URL doesn't end with a slash before appending the path
        endpoint = f"{self.api_url.rstrip('/')}/chat"

        # Get valid JWT token
        try:
            jwt_token = self.authenticator.get_valid_token('id')
        except ValueError as e:
            logger.error(f"Failed to get valid token: {e}")
            raise ValueError("Authentication required. Please call authenticate() first.") from e

        # Prepare headers with JWT token
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {jwt_token}'
        }

        payload = self._build_payload(query, session_id)

        logger.debug(f"Sending query to {endpoint}: {query}")
        logger.debug(f"Request payload: {json.dumps(payload, indent=2)}")

        try:
            return self._timed_get(endpoint, headers, payload)
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed: {e}")

            failed_response = self._failed_response(e)
            if failed_response is None:
                # No HTTP response at all (connection error, timeout, ...)
                raise

            logger.error(f"Response status code: {failed_response.status_code}")
            logger.error(f"Response text: {failed_response.text}")

            if failed_response.status_code != 401:
                raise

            # Handle authentication errors: refresh once, then retry once
            logger.info("Authentication failed, attempting to refresh tokens...")
            try:
                self.authenticator.refresh_tokens()
                logger.info("Tokens refreshed, retrying request...")
                jwt_token = self.authenticator.get_valid_token('id')
                headers['Authorization'] = f'Bearer {jwt_token}'
                return self._timed_get(endpoint, headers, payload)
            except Exception as refresh_error:
                logger.error(f"Token refresh failed: {refresh_error}")
                raise ValueError("Authentication failed and token refresh unsuccessful. Please authenticate again.") from refresh_error
    


In [None]:
# Create a client instance. With no other arguments, credentials and endpoint
# settings are read from the environment/.env file; debug=True switches the
# logger to DEBUG so requests and responses are logged in full.
client = RetailRestApiClient(debug=True)

## Test case with Nova Pro as the reasoning model for function calling

In [6]:
# Label stored in every result row and used in the output path below;
# the request payload built by send_query does not include it.
model = 'nova_pro'

In [None]:
import pandas as pd
# Session ID passed with every query in this test run
session_id = '1234'
query = 'What is my schedule?'

# Send the first query
prompt1_response = client.send_query(query, session_id)

# Start the results table with this query/response pair
first_row = {
    'query': query,
    'response': prompt1_response.get('chat_response', 'N/A'),
    'model': model,
    'request_duration_ms': prompt1_response.get('request_duration_ms', 'N/A'),
}
df_2 = pd.DataFrame([first_row])

In [None]:
query = 'What is the store schedule?'
prompt2_response = client.send_query(query, session_id)

# Append this query/response pair. ignore_index=True renumbers the rows so
# the index stays unique instead of repeating 0 for every appended row.
df_2 = pd.concat(
    [
        df_2,
        pd.DataFrame([{
            'query': query,
            'response': prompt2_response.get('chat_response', 'N/A'),
            'model': model,
            'request_duration_ms': prompt2_response.get('request_duration_ms', 'N/A'),
        }]),
    ],
    ignore_index=True,
)


In [None]:
query = 'What are the past three customer transactions for customer id 2?'
prompt3_response = client.send_query(query, session_id)

# Append this query/response pair. ignore_index=True renumbers the rows so
# the index stays unique instead of repeating 0 for every appended row.
df_2 = pd.concat(
    [
        df_2,
        pd.DataFrame([{
            'query': query,
            'response': prompt3_response.get('chat_response', 'N/A'),
            'model': model,
            'request_duration_ms': prompt3_response.get('request_duration_ms', 'N/A'),
        }]),
    ],
    ignore_index=True,
)

In [None]:
query = 'What are some product recommendations based on the past customer transactions for customer id 2?'
prompt4_response = client.send_query(query, session_id)

# Append this query/response pair. ignore_index=True renumbers the rows so
# the index stays unique instead of repeating 0 for every appended row.
df_2 = pd.concat(
    [
        df_2,
        pd.DataFrame([{
            'query': query,
            'response': prompt4_response.get('chat_response', 'N/A'),
            'model': model,
            'request_duration_ms': prompt4_response.get('request_duration_ms', 'N/A'),
        }]),
    ],
    ignore_index=True,
)

In [None]:
query = 'What is my timeoff?'
prompt5_response = client.send_query(query, session_id)

# Append this query/response pair. ignore_index=True renumbers the rows so
# the index stays unique instead of repeating 0 for every appended row.
df_2 = pd.concat(
    [
        df_2,
        pd.DataFrame([{
            'query': query,
            'response': prompt5_response.get('chat_response', 'N/A'),
            'model': model,
            'request_duration_ms': prompt5_response.get('request_duration_ms', 'N/A'),
        }]),
    ],
    ignore_index=True,
)

In [None]:
query = 'What items are low in our product catalog?'
prompt6_response = client.send_query(query, session_id)

# Append this query/response pair. ignore_index=True renumbers the rows so
# the index stays unique instead of repeating 0 for every appended row.
df_2 = pd.concat(
    [
        df_2,
        pd.DataFrame([{
            'query': query,
            'response': prompt6_response.get('chat_response', 'N/A'),
            'model': model,
            'request_duration_ms': prompt6_response.get('request_duration_ms', 'N/A'),
        }]),
    ],
    ignore_index=True,
)


In [23]:
# Persist the collected responses as CSV under a per-model directory,
# creating that directory first when it is missing.
responses_dir = f"./responses/model_{model}"
csv_path = f"./responses/model_{model}/{model}_function_calling_responses.csv"

os.makedirs(responses_dir, exist_ok=True)
df_2.to_csv(csv_path, index=False)