# Enable or Disable Debug Logging

In [1]:
# Debug switch: when True, identify_channel and run_query print their prompt
# templates, and identify_channel also prints the raw LLM reply.
logdebug = True

In [2]:
# Debug printing off: the `if logdebug:` blocks in identify_channel and
# run_query are skipped while this is False.
logdebug = False

# Imports

In [3]:
import json
import boto3

import sqlalchemy
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

from langchain.docstore.document import Document

from langchain_community.llms import SagemakerEndpoint
from langchain.chains import LLMChain
from langchain_community.utilities import SQLDatabase
from langchain.llms.sagemaker_endpoint import LLMContentHandler
from langchain.chains.question_answering import load_qa_chain

from langchain_experimental.sql import SQLDatabaseChain


from langchain.chains.api.prompt import API_RESPONSE_PROMPT
from langchain.chains import APIChain
from langchain_core.prompts import PromptTemplate
from langchain_anthropic import ChatAnthropic
from langchain.chains.api import open_meteo_docs

from typing import Dict
import time

import boto3
import os

import inspect


In [4]:
# Diagnostic only: show which installed module provides ChatAnthropic.
inspect.getmodule(ChatAnthropic)

<module 'langchain_anthropic.chat_models' from '/home/ubuntu/anaconda3/lib/python3.11/site-packages/langchain_anthropic/chat_models.py'>

In [5]:
# User-provided settings for the S3 / Glue resources referenced by this notebook.
glue_databucket_name = 'genai-payment-processed'  # create this bucket in S3 first
glue_db_name = 'genai-payment100'
glue_role = 'AWSGlueServiceRole-gluepayment100'
glue_crawler_name = f'{glue_db_name}-crawler100'  # derived from the database name

In [6]:
# Region used to build the Athena endpoint below. Honour AWS_REGION when the
# environment provides it; otherwise fall back to us-east-1.
region = os.environ.get("AWS_REGION", "us-east-1")
print(region)

us-east-1


# Initialize Claude and Athena Glue

In [7]:

# Anthropic credentials: prefer the ANTHROPIC_API_KEY environment variable so a
# real key never has to be typed into (and saved with) the notebook. The
# placeholder remains the fallback when the variable is not set.
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "REPLACE_ME_WITH_ACTUAL_KEY")
# Large language model shared by every chain in this notebook.
llm = ChatAnthropic(temperature=0, anthropic_api_key=ANTHROPIC_API_KEY, model_name="claude-3-sonnet-20240229", max_tokens_to_sample = 512)

#S3
# connect to s3 using athena
## athena variables
connathena=f"athena.{region}.amazonaws.com"
portathena='443' #Update, if port is different
schemaathena=glue_db_name #from user defined params
s3stagingathena=f's3://{glue_databucket_name}/athenaresults' #Athena query results are staged under this S3 prefix
wkgrpathena='primary'#Update, if workgroup is different
##  Create the athena connection string
connection_string = f"awsathena+rest://@{connathena}:{portathena}/{schemaathena}?s3_staging_dir={s3stagingathena}/&work_group={wkgrpathena}"
##  Create the athena  SQLAlchemy engine
engine_athena = create_engine(connection_string, echo=False)
dbathena = SQLDatabase(engine_athena)

# Glue databases whose tables parse_catalog() harvests into the prompt catalog.
gdc = [schemaathena]


# Generate dynamic prompts from the Glue Data Catalog (harvest AWS crawler metadata)

In [8]:
#Generate Dynamic prompts to populate the Glue Data Catalog
#harvest aws crawler metadata

def parse_catalog():
    """Flatten the Glue Data Catalog into pipe-delimited text for the LLM prompt.

    Every column of every table in the databases listed in the module-level
    ``gdc`` becomes one row of the form
    ``classification|database|table|column``. A fixed row describing the
    weather API (``api|meteo|weather|weather``) is appended last.

    Returns:
        str: the catalog rows, each one preceded by a newline character.

    Raises:
        KeyError: if a table entry lacks an expected key, e.g. a non-S3 table
            without a ``classification`` parameter.
    """
    glue_client = boto3.client('glue')
    # get_tables returns its results in pages; walk every page so databases
    # with many tables are not silently cut off after the first response.
    paginator = glue_client.get_paginator('get_tables')

    rows = []
    for db in gdc:
        for page in paginator.paginate(DatabaseName=db):
            for table in page['TableList']:
                storage = table['StorageDescriptor']
                # S3-backed tables are labelled 's3'; any other source uses the
                # classification recorded in the table parameters. A missing
                # Location is treated as "not S3" instead of raising.
                if storage.get('Location', '').startswith('s3'):
                    classification = 's3'
                else:
                    classification = table['Parameters']['classification']
                dbname, tblname = table['DatabaseName'], table['Name']
                rows.extend(
                    f'{classification}|{dbname}|{tblname}|{column["Name"]}'
                    for column in storage['Columns']
                )
    # API: append the weather API's metadata to the unified catalog.
    rows.append('api|meteo|weather|weather')
    return ''.join(f'\n{row}' for row in rows)

# Build the unified catalog text once; identify_channel splices it into its prompt.
glue_catalog = parse_catalog()

# Preview: print only the last 20 catalog rows.
print('\n'.join(glue_catalog.splitlines()[-20:]))

s3|genai-payment100|payment_dataset|orientation
s3|genai-payment100|payment_dataset|instructionid
s3|genai-payment100|payment_dataset|endtoendid
s3|genai-payment100|payment_dataset|networkinstructionid
s3|genai-payment100|payment_dataset|creditoraccountname
s3|genai-payment100|payment_dataset|paymentmethod
s3|genai-payment100|payment_dataset|transactiontype
s3|genai-payment100|payment_dataset|instructedamount
s3|genai-payment100|payment_dataset|product
s3|genai-payment100|payment_dataset|channel
s3|genai-payment100|payment_dataset|clientname
s3|genai-payment100|payment_dataset|sendername
s3|genai-payment100|payment_dataset|financialinstitutionname
s3|genai-payment100|payment_dataset|sendertype
s3|genai-payment100|payment_dataset|recipienttype
s3|genai-payment100|payment_dataset|clientkey
s3|genai-payment100|payment_dataset|externalpaymentreasoncode
s3|genai-payment100|payment_dataset|addtionalerrorinfo
s3|genai-payment100|payment_dataset|underlyingresponsecode
api|meteo|weather|weather

# Function 1 'Infer Channel'
# define a function that infers the channel/database/table and sets the database for querying

In [9]:

#Function 1 'Infer Channel'
def identify_channel(query):
    """Ask the LLM which data channel in the catalog can answer ``query``.

    The prompt is "dynamic": the text held in the module-level ``glue_catalog``
    is spliced into the template before the user's question is filled in.

    Args:
        query: the user's natural-language question.

    Returns:
        tuple: ``(channel, db)``. ``channel`` is ``'db'`` with ``db`` set to
        ``dbathena`` when the LLM reply mentions ``s3``; it is ``'api'`` with
        ``db`` set to ``None`` when the reply mentions ``api``.

    Raises:
        Exception: if the reply mentions neither ``s3`` nor ``api``.
    """
    #Prompt 1 'Infer Channel'
    ## The template tells the LLM how to pick a database from the catalog rows.
    prompt_template = """
     From the table below, find the database (in column database) which will contain the data (in corresponding column_names) to answer the question
     {query} \n
         If someone asks for the payment, they really mean the s3|genai-payment100|payment_dataset table.
     """+glue_catalog +"""
     Give your answer as database ==
     Also,give your answer as database.table ==
     """
    if logdebug:
        print("prompt_template ", prompt_template)
    ##define prompt 1
    PROMPT_channel = PromptTemplate( template=prompt_template, input_variables=["query"]  )
    if logdebug:
        print("PROMPT_channel ", PROMPT_channel)
    # define llm chain
    llm_chain = LLMChain(prompt=PROMPT_channel, llm=llm)
    # run the query and keep the chain's response
    generated_texts = llm_chain.invoke(query)
    if logdebug:
        print("generated_texts ", generated_texts)
        print(type(generated_texts))

    reply_text = generated_texts["text"]
    # Only the Athena channel has a database handle. Binding db up front keeps
    # the 'api' branch from hitting an UnboundLocalError at the return below.
    db = None
    #set the channel from where the query can be answered
    if 's3' in reply_text:
        channel = 'db'
        db = dbathena
        print("SET database to athena")
    elif 'api' in reply_text:
        channel = 'api'
        print("SET database to weather api")
    else:
        raise Exception("User question cannot be answered by any of the channels mentioned in the catalog")
    print("Step complete. Channel is: ", channel)

    return channel, db

In [10]:

#Function 2 'Run Query'
#define a function that routes the question to its channel and runs it there
def run_query(query):
    """Answer ``query`` through whichever channel ``identify_channel`` selects.

    For the ``'db'`` channel the question is handed to a ``SQLDatabaseChain``
    built with a custom text-to-SQL prompt. For the ``'api'`` channel it is
    handed to an ``APIChain`` built from the Open-Meteo API docs.

    Args:
        query: the user's natural-language question.

    Returns:
        The value returned by the selected chain's ``invoke`` call. The query
        cells in this notebook read ``response["result"]`` from it.
        NOTE(review): confirm the API chain exposes the same key.

    Raises:
        Exception: propagated from ``identify_channel``, or raised here when
            the channel is neither ``'db'`` nor ``'api'``.
    """
    channel, db = identify_channel(query) #call the identify channel function first

    ##Prompt 2 'Run Query'
    # Rules the LLM must follow when turning the question into SQL and
    # reporting the result; {table_info} is filled in by the SQL chain.
    _DEFAULT_TEMPLATE = """You are a SQL expert. Given an input question, first create a syntactically correct {dialect} query to run, then look at the results of the query and return the answer.

    Do not append 'Here is the SQL query' to SQLQuery.

    Display SQLResult after the query is run in plain english that users can understand.

    Provide answer in simple english statement.

    Only use the following tables:

    {table_info}

    Question: {input}"""

    if logdebug:
        print("_DEFAULT_TEMPLATE ", _DEFAULT_TEMPLATE)

    PROMPT_sql = PromptTemplate(
        input_variables=["input", "table_info", "dialect"], template=_DEFAULT_TEMPLATE
    )

    if channel=='db':
        db_chain = SQLDatabaseChain.from_llm(llm, db, prompt=PROMPT_sql, verbose=True, return_intermediate_steps=False)
        response=db_chain.invoke(query)
    elif channel=='api':
        # APIChain refuses to be built without an explicit allow-list of
        # domains; restrict it to the Open-Meteo host the bundled docs describe.
        allowed_domains = ["https://api.open-meteo.com/"]
        chain_api = APIChain.from_llm_and_api_docs(
            llm,
            open_meteo_docs.OPEN_METEO_DOCS,
            verbose=True,
            limit_to_domains=allowed_domains,
        )
        response=chain_api.invoke(query)
    else:
        raise Exception("Unlisted channel. Check your unified catalog")
    return response


# Sample Prompts

In [11]:
# Ad-hoc example questions. `query` is rebound on every line, so only the last
# assignment is still in effect once this cell has run.
query = "Please provide daily payments count for last 5 days by each ClientKey?"
query = "How many payments have payment method rtp and sender name Jameel Robinson?"
query = "How many payments with sender name CAMERON MCGRUDER?"
query = "How many payments with payment rail as rtp?"
query = "Please provide daily payments count for last 5 days on creation time?"

# Named example questions: counts
countrecords = "How many number of payments?"
countfilteronnameandrail = "How many payments have payment method rtp and sender name Alex?"
countbyclientandrail = "Provide the counts of payments group by client with payment method rtp?"
countbyclient = "Provide the counts of payments group by client?"
countbychannel = "Provide the counts of payments group by channel?"
countbyclientproductchannel = "Provide the counts of payments group by client, product and channel?"

# Named example questions: listings
listidbyfilteronname = "Provide the instruction id of payments with payment method rtp and sender name Alex?"
listidbymaxamount = "Provide the instruction id of payment that has maximum instructed amount?"
listdatasample = "Provide the few payment instruction id and it's amount, channel and product?"


# Counts By client

In [12]:
# Question to ask (athena - Payments - payment dataset)
countbyclient = "Provide the counts of payments group by client?"
query = countbyclient

# Send the question through run_query and print the LangChain response
response = run_query(query)
print("----------------------------------------------------------------------")
print(f'SQL and response from user query {query}  \n  {response["result"]}')

SET database to athena
Step complete. Channel is:  db


[1m> Entering new SQLDatabaseChain chain...[0m
Provide the counts of payments group by client?
SQLQuery:[32;1m[1;3mSELECT clientname, COUNT(*) AS payment_count
FROM payment_dataset
GROUP BY clientname;[0m
SQLResult: [33;1m[1;3m[('Kinecta Federal Credit Union Cosmos', 303), ('Addition Financial Credit Union Cosmos', 227), ('Veridian Credit Union', 1079), ('Northwest Federal Credit Union Cosmos', 70), ('Royal Credit Union Cosmos', 347), ('Apple Federal Credit Union Cosmos', 174), ('Silver State Schools Credit Union Cosmos', 80), ('Navy Federal Credit Union Cosmos', 40782), ('Consumers CU Cosmos', 293), ('Avidia Bank Cosmos', 87), ('First Internet Bank Cosmos', 20)][0m
Answer:[32;1m[1;3mThe query groups the payment records by the client name and counts the number of payments for each client. The result shows that Navy Federal Credit Union Cosmos has the highest number of payments with 40,782, followed by Veridian Credit Uni

# Counts By client and payment method

In [14]:
# Question to ask (athena - Payments - payment dataset)
query = "Provide the counts of payments group by client with payment method rtp?"

# Send the question through run_query and print the LangChain response
response = run_query(query)
print("----------------------------------------------------------------------")
print(f'SQL and response from user query {query}  \n  {response["result"]}')

SET database to athena
Step complete. Channel is:  db


[1m> Entering new SQLDatabaseChain chain...[0m
Provide the counts of payments group by client with payment method rtp?
SQLQuery:[32;1m[1;3mSELECT clientname, paymentmethod, COUNT(*) AS payment_count
FROM payment_dataset
WHERE paymentmethod = 'rtp'
GROUP BY clientname, paymentmethod;[0m
SQLResult: [33;1m[1;3m[('Apple Federal Credit Union Cosmos', 'rtp', 174), ('Kinecta Federal Credit Union Cosmos', 'rtp', 302), ('Addition Financial Credit Union Cosmos', 'rtp', 227), ('Northwest Federal Credit Union Cosmos', 'rtp', 70), ('Royal Credit Union Cosmos', 'rtp', 342), ('Silver State Schools Credit Union Cosmos', 'rtp', 80), ('Navy Federal Credit Union Cosmos', 'rtp', 40782), ('Veridian Credit Union', 'rtp', 1078), ('Consumers CU Cosmos', 'rtp', 293), ('First Internet Bank Cosmos', 'rtp', 20), ('Avidia Bank Cosmos', 'rtp', 87)][0m
Answer:[32;1m[1;3mThe query groups the payment records by client name and payment method, and counts

# List Instructions matching payment method and name

In [15]:
# Question to ask (athena - Payments - payment dataset)
query = "Provide the instruction id of payments with payment method rtp and sender name Alex?"

# Send the question through run_query and print the LangChain response
response = run_query(query)
print("----------------------------------------------------------------------")
print(f'SQL and response from user query {query}  \n  {response["result"]}')

SET database to athena
Step complete. Channel is:  db


[1m> Entering new SQLDatabaseChain chain...[0m
Provide the instruction id of payments with payment method rtp and sender name Alex?
SQLQuery:[32;1m[1;3mSELECT instructionid
FROM payment_dataset
WHERE paymentmethod = 'rtp' AND sendername = 'Alex';[0m
SQLResult: [33;1m[1;3m[('20240301021000021P1BRJPM00500026717',), ('20240301021000021P1BRJPM00520012144',)][0m
Answer:[32;1m[1;3mThe SQL query filters the `payment_dataset` table to find rows where the `paymentmethod` is 'rtp' and the `sendername` is 'Alex'. The `instructionid` values for those rows are returned.

Based on the result, the instruction IDs of payments with payment method 'rtp' and sender name 'Alex' are '20240301021000021P1BRJPM00500026717' and '20240301021000021P1BRJPM00520012144'.[0m
[1m> Finished chain.[0m
----------------------------------------------------------------------
SQL and response from user query Provide the instruction id of payments with paym

# Counts By Channel

In [16]:
# Question to ask (athena - Payments - payment dataset)
countbychannel = "Provide the counts of payments group by channel?"
query = countbychannel

# Send the question through run_query and print the LangChain response
response = run_query(query)
print("----------------------------------------------------------------------")
print(f'SQL and response from user query {query}  \n  {response["result"]}')

SET database to athena
Step complete. Channel is:  db


[1m> Entering new SQLDatabaseChain chain...[0m
Provide the counts of payments group by channel?
SQLQuery:[32;1m[1;3mSELECT channel, COUNT(*) AS payment_count
FROM payment_dataset
GROUP BY channel;[0m
SQLResult: [33;1m[1;3m[('CSM', 43462)][0m
Answer:[32;1m[1;3mThe query groups the payments by the 'channel' column and counts the number of payments for each channel. The result shows that there is one channel 'CSM' with 43,462 payments.[0m
[1m> Finished chain.[0m
----------------------------------------------------------------------
SQL and response from user query Provide the counts of payments group by channel?  
  The query groups the payments by the 'channel' column and counts the number of payments for each channel. The result shows that there is one channel 'CSM' with 43,462 payments.


# Counts By Client, Product and Channel

In [17]:
# Question to ask (athena - Payments - payment dataset)
countbyclientproductchannel = "Provide the counts of payments group by client, product and channel?"
query = countbyclientproductchannel

# Send the question through run_query and print the LangChain response
response = run_query(query)
print("----------------------------------------------------------------------")
print(f'SQL and response from user query {query}  \n  {response["result"]}')

SET database to athena
Step complete. Channel is:  db


[1m> Entering new SQLDatabaseChain chain...[0m
Provide the counts of payments group by client, product and channel?
SQLQuery:[32;1m[1;3mSELECT clientname, product, channel, COUNT(*) AS payment_count
FROM payment_dataset
GROUP BY clientname, product, channel;[0m
SQLResult: [33;1m[1;3m[('Navy Federal Credit Union Cosmos', 'cosmos', 'CSM', 40782), ('Veridian Credit Union', 'cosmos', 'CSM', 1079), ('Avidia Bank Cosmos', 'cosmos', 'CSM', 87), ('Consumers CU Cosmos', 'cosmos', 'CSM', 293), ('Northwest Federal Credit Union Cosmos', 'cosmos', 'CSM', 70), ('Kinecta Federal Credit Union Cosmos', 'cosmos', 'CSM', 303), ('First Internet Bank Cosmos', 'cosmos', 'CSM', 20), ('Addition Financial Credit Union Cosmos', 'cosmos', 'CSM', 227), ('Royal Credit Union Cosmos', 'cosmos', 'CSM', 347), ('Apple Federal Credit Union Cosmos', 'cosmos', 'CSM', 174), ('Silver State Schools Credit Union Cosmos', 'cosmos', 'CSM', 80)][0m
Answer:[32;1m[1

# List Instruction with maximum amount

In [20]:
# Question to ask (athena - Payments - payment dataset)
listidbymaxamount = "Provide the instruction id and amount of payment that has maximum instructed amount?"
query = listidbymaxamount

# Send the question through run_query and print the LangChain response
response = run_query(query)
print("----------------------------------------------------------------------")
print(f'SQL and response from user query {query}  \n  {response["result"]}')

SET database to athena
Step complete. Channel is:  db


[1m> Entering new SQLDatabaseChain chain...[0m
Provide the instruction id and amount of payment that has maximum instructed amount?
SQLQuery:[32;1m[1;3mSELECT instructionid, instructedamount
FROM payment_dataset
ORDER BY instructedamount DESC
LIMIT 1;[0m
SQLResult: [33;1m[1;3m[('20240301053000196P1BGPO710003387961', 117518.22)][0m
Answer:[32;1m[1;3mThe instruction ID with the maximum instructed amount is '20240301053000196P1BGPO710003387961' and the corresponding instructed amount is 117518.22.[0m
[1m> Finished chain.[0m
----------------------------------------------------------------------
SQL and response from user query Provide the instruction id and amount of payment that has maximum instructed amount?  
  The instruction ID with the maximum instructed amount is '20240301053000196P1BGPO710003387961' and the corresponding instructed amount is 117518.22.


# List Few Sample Instructions

In [21]:
# Question to ask (athena - Payments - payment dataset)
listdatasample = "Provide the few payment instruction id and it's amount, channel and product?"
query = listdatasample

# Send the question through run_query and print the LangChain response
response = run_query(query)
print("----------------------------------------------------------------------")
print(f'SQL and response from user query {query}  \n  {response["result"]}')

SET database to athena
Step complete. Channel is:  db


[1m> Entering new SQLDatabaseChain chain...[0m
Provide the few payment instruction id and it's amount, channel and product?
SQLQuery:[32;1m[1;3mSELECT instructionid, instructedamount, channel, product
FROM payment_dataset
LIMIT 5;[0m
SQLResult: [33;1m[1;3m[('20240229021000021P1BRJPM00520074136', 200.0, 'CSM', 'cosmos'), ('20240229121000248P1BZWFA28669673104', 70.46, 'CSM', 'cosmos'), ('20240229043000096P1BAAAA00093771541', 17.01, 'CSM', 'cosmos'), ('20240229101115315P1BCLED00000114414', 50.0, 'CSM', 'cosmos'), ('20240229121000248P1BZWFA28669673115', 0.01, 'CSM', 'cosmos')][0m
Answer:[32;1m[1;3mThe query provided the first 5 rows from the payment_dataset table, displaying the instructionid, instructedamount, channel, and product columns. Here are a few payment instruction IDs and their corresponding amounts, channels, and products:

1. Instruction ID: 20240229021000021P1BRJPM00520074136, Amount: $200.0, Channel: CSM, Prod

# Environment Configuration

In [23]:
!python --version
# Recorded output when this notebook was last run: Python 3.11.7


Python 3.11.7


In [24]:
# Use the %pip magic so the listing comes from the environment the kernel runs in.
%pip list

Package                           Version
--------------------------------- ------------
aiobotocore                       2.7.0
aiohttp                           3.9.3
aioitertools                      0.7.1
aiosignal                         1.2.0
alabaster                         0.7.12
altair                            5.0.1
anaconda-anon-usage               0.4.3
anaconda-catalogs                 0.2.0
anaconda-client                   1.12.3
anaconda-cloud-auth               0.1.4
anaconda-navigator                2.5.2
anaconda-project                  0.11.1
anthropic                         0.21.3
anyio                             4.2.0
appdirs                           1.4.4
archspec                          0.2.1
argon2-cffi                       21.3.0
argon2-cffi-bindings              21.2.0
arrow                             1.2.3
asn1crypto                        1.5.1
astroid                           2.14.2
astropy                           5.3.4
asttokens               