# Integrate Data Mesh Architecture with Generative AI and interact using prompts for querying SQL databases & APIs

This notebook demonstrates how large language models (LLMs), such as Anthropic's Claude, interact with AWS databases, data stores, and third-party data warehouses such as Snowflake. We show this interaction by generating and running SQL queries and by making requests to API endpoints. All of this is built with the LangChain framework, which lets a language model interact with its environment and connect to other data sources. LangChain is built around three principles: calling out to a language model, being data-aware, and being agentic. This notebook focuses on establishing connections to various data sources, consolidating their metadata, and using LLMs with LangChain to return fact-based answers to user queries.


<img src='./images/GenAI+SQL-Prompt Engineering Steps V6.drawio.png' width="800" height="600">


Step 1. Connect to the channels through which the LLM can talk to your data. These channels include:

- Amazon Redshift Serverless: connect to the `tickit` data store (the sample data spells "ticket" as "tickit") to retrieve ticket sales information.
- Amazon Aurora MySQL Serverless: connect to the data store that holds employee information.
- Amazon S3/Amazon Athena: connect to the SageMaker Feature Store offline store that holds claims information.
- Snowflake: connect to the stock market data in the finance database of this third-party data warehouse.
- APIs: connect to the Open-Meteo weather API (using the sample Open-Meteo API documentation bundled with LangChain) to retrieve weather information.
    
Step 2. Dynamically generate the prompt template, using table metadata from the AWS Glue Data Catalog (GDC) as context. The GDC is populated by running a Glue crawler on each database; refer to the AWS Glue documentation to create and run a crawler. Because a crawler cannot catalog the API, a line item for it is appended manually to the extracted GDC metadata.

Step 3. Pass the user query, together with the catalog, to the LLM through LangChain to determine which data channel can answer it.

Step 4. After the data channel is determined, run the LangChain SQL Database chain to convert the text to SQL and run the query against that channel (for the API channel, LangChain's API chain calls the API instead).

Finally, display the results.
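For the API channel from Step 1, no SQL is involved: LangChain's `APIChain` gives the LLM the Open-Meteo API documentation (`open_meteo_docs.OPEN_METEO_DOCS`), asks it to compose a request URL for the user's question, calls that URL, and summarizes the response. As a rough picture of the kind of URL the model is expected to produce for the New York City sample question, here is a hand-written sketch using only the standard library. `weather_url` is a hypothetical helper, the coordinates are approximate, the parameter names follow the public Open-Meteo forecast API, and no request is sent.

```python
from urllib.parse import urlencode

# Open-Meteo forecast endpoint
OPEN_METEO_FORECAST = "https://api.open-meteo.com/v1/forecast"


def weather_url(latitude: float, longitude: float, fahrenheit: bool = True) -> str:
    """Build a current-weather request URL (hypothetical helper, not part of LangChain)."""
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "current_weather": "true",  # ask only for current conditions
    }
    if fahrenheit:
        params["temperature_unit"] = "fahrenheit"  # the API reports Celsius by default
    return f"{OPEN_METEO_FORECAST}?{urlencode(params)}"


# Approximate coordinates for New York City
print(weather_url(40.71, -74.01))
```

In the notebook the LLM writes this URL itself from the documentation, which is why the sample weather question can be answered without any entry in a SQL database.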


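### Pre-requisites:
1. Use the Base Python 3.0 kernel.
2. Set up an Aurora MySQL Serverless database with the Data API enabled (the notebook connects through it) and load the employee dataset. A separate notebook is provided for loading the data into Aurora MySQL.
3. Set up Redshift Serverless and load the `sample_data_dev` sample database, which contains the tickit dataset.
4. Set up a Snowflake account and populate it with the stocks data. A separate notebook is provided for loading the data into Snowflake.
5. Run a Glue crawler on each of the databases above.
6. Store your Anthropic API key, Snowflake credentials, and Redshift credentials in AWS Secrets Manager. The code below reads the secrets `anthropic`, `snowflake-credentials-secret`, and `redshift-serverless`.
7. Install the required libraries listed in requirements.txt with pip.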

In [None]:
%pip install -r requirements.txt

In [2]:
import json
import boto3

import sqlalchemy
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

# LangChain building blocks: prompts, SQL database wrapper and chains
from langchain import PromptTemplate, SQLDatabase, SQLDatabaseChain, LLMChain
# API chain and the bundled Open-Meteo API documentation used for the weather channel
from langchain.chains import APIChain
from langchain.chains.api import open_meteo_docs
from langchain.llms import Anthropic

### Step 1 - Connect to databases using SQLAlchemy

Under the hood, LangChain uses SQLAlchemy to connect to SQL databases. The SQLDatabaseChain can therefore be used with any SQL dialect supported by SQLAlchemy, such as MS SQL, MySQL, MariaDB, PostgreSQL, Oracle SQL, and SQLite. The Snowflake, Athena, Redshift, and Aurora Data API dialects used in this notebook come from third-party SQLAlchemy dialect packages rather than SQLAlchemy itself. Refer to the SQLAlchemy documentation for the requirements for connecting to your database.
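Each SQLAlchemy connection string in the next cell follows the pattern `dialect+driver://user:password@host:port/database?options`. One practical consequence: a password containing characters such as `@`, `:` or `/` must be percent-encoded, or the URL is split in the wrong place. A minimal standard-library sketch; `build_sqlalchemy_url` is a hypothetical helper, and the host and credentials are placeholders.

```python
from urllib.parse import quote_plus, unquote_plus


def build_sqlalchemy_url(dialect, driver, user, password, host, port, database, **options):
    """Assemble dialect+driver://user:password@host:port/database?options.

    quote_plus percent-encodes characters such as '@', ':' and '/' so that a
    password containing them cannot be mistaken for URL delimiters.
    """
    url = f"{dialect}+{driver}://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{database}"
    if options:
        url += "?" + "&".join(f"{key}={quote_plus(str(value))}" for key, value in options.items())
    return url


url = build_sqlalchemy_url("redshift", "redshift_connector", "admin", "p@ss:w/rd",
                           "example-host", 5439, "sample_data_dev")
print(url)  # redshift+redshift_connector://admin:p%40ss%3Aw%2Frd@example-host:5439/sample_data_dev

# The encoded password decodes back to the original value
print(unquote_plus("p%40ss%3Aw%2Frd"))  # p@ss:w/rd
```

With SQLAlchemy 1.4 or later, `sqlalchemy.engine.URL.create(...)` takes the components separately and can be passed straight to `create_engine`, which avoids manual escaping; for Snowflake, `snowflake.sqlalchemy.URL` builds the connection string from keyword arguments.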


**Important**: The cell below defines the LLM and a database connection for each data source. The solution works only for the sources whose connections are defined in that cell; see the Pre-requisites section. If your use case needs data from Aurora MySQL alone, comment out the other data sources and remove their entries from the `gdc` list at the end of the cell, because Step 2 reads catalog metadata for every database in that list. Also update the Aurora MySQL cluster details and variables to match your environment.

In [130]:
#define connections

# client used to collect credentials from AWS Secrets Manager
client = boto3.client('secretsmanager')

#LLM
## llm variables
anthropic_secret_id = "anthropic"
## get the Anthropic API key from Secrets Manager
response = client.get_secret_value(SecretId=anthropic_secret_id)
secrets_credentials = json.loads(response['SecretString'])
ANTHROPIC_API_KEY = secrets_credentials['ANTHROPIC_API_KEY']
## define the large language model; temperature=0 minimizes randomness in the routing and SQL answers
llm = Anthropic(temperature=0, anthropic_api_key=ANTHROPIC_API_KEY, max_tokens_to_sample=512)

#SNOWFLAKE
# connect to snowflake database
## snowflake variables
sf_account_id ="bfb23557.us-east-1"
sf_secret_id = "snowflake-credentials-secret"
dwh = "compute_wh"
db = "finance"
schema = "stockmarket_schema"
table = "all_stocks_5yr"
## snowflake get credentials from secrets manager
response = client.get_secret_value(SecretId=sf_secret_id)
secrets_credentials = json.loads(response['SecretString'])
sf_password = secrets_credentials['password']
sf_username = secrets_credentials['username']
##  Create the snowflake SQLAlchemy engine. snowflake.sqlalchemy.URL builds the connection string
##  from keyword arguments, so special characters in the password do not break it
engine_snowflake = create_engine(
    URL(account=sf_account_id, user=sf_username, password=sf_password,
        database=db, schema=schema, warehouse=dwh),
    echo=False,
)
dbsnowflake = SQLDatabase(engine_snowflake)

#S3
# connect to s3 using athena
## athena variables
connathena='athena.us-east-1.amazonaws.com'
portathena='443'
schemaathena='sagemaker_featurestore'
s3stagingathena='s3://researchanddevelopmentnt'
wkgrpathena='primary'
tablesathena=['claims']
##  Create the athena connection string
connection_string = f"awsathena+rest://@{connathena}:{portathena}/{schemaathena}?s3_staging_dir={s3stagingathena}/&work_group={wkgrpathena}"
##  Create the athena  SQLAlchemy engine
engine_athena = create_engine(connection_string, echo=False)
dbathena = SQLDatabase(engine_athena, include_tables=tablesathena)

#AURORA MYSQL
##connect to aurora mysql
##aurora mysql cluster details/variables
cluster_arn = 'arn:aws:rds:us-east-1:673015835385:cluster:database-3'
secret_arn = 'arn:aws:secretsmanager:us-east-1:673015835385:secret:rds-db-credentials/cluster-JKRI452JBDML53IJRZTFC7DBTU/admin/1683665443623-aAR0hN'
rdsdb='employees'
rdsdb_tbl = ['employees']
##  Create the aurora connection string
connection_string = f"mysql+auroradataapi://:@/{rdsdb}"
##  Create the aurora  SQLAlchemy engine
engine_rds = create_engine(connection_string, echo=False,connect_args=dict(aurora_cluster_arn=cluster_arn, secret_arn=secret_arn))
dbrds = SQLDatabase(engine_rds, include_tables=rdsdb_tbl)

#REDSHIFT
# connect to redshift database
## redshift variables
rs_secret_id = "redshift-serverless"
rs_endpoint='sagemaker.673015835385.us-east-1.redshift-serverless.amazonaws.com'
rs_port='5439'
rs_db='sample_data_dev'
rs_schema='tickit'
## redshift get credentials from secrets manager
response = client.get_secret_value(SecretId=rs_secret_id)
secrets_credentials = json.loads(response['SecretString'])
rs_password = secrets_credentials['password']
rs_username = secrets_credentials['username']
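##  Create the redshift connection URL. URL.create (SQLAlchemy 1.4+) takes the credentials as separate
##  components, so special characters in the password do not need to be escaped
connection_string = sqlalchemy.engine.URL.create(
    "redshift+redshift_connector", username=rs_username, password=rs_password,
    host=rs_endpoint, port=int(rs_port), database=rs_db)
##  Create the redshift SQLAlchemy engine, restricted to the tickit schema
engine_redshift = create_engine(connection_string, echo=False)
dbredshift = SQLDatabase(engine_redshift, schema=rs_schema)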

#Glue Data Catalog
## List of the Glue Data Catalog databases that hold the table metadata after the crawlers run successfully
gdc = ['redshift-sagemaker-sample-data-dev', 'snowflake','rds-aurora-mysql-employees','sagemaker_featurestore']

### Step 2 - Generate Dynamic Prompt Templates
Build a consolidated view of the Glue Data Catalog by combining the metadata stored for all the databases into a pipe-delimited format (`classification|database|table|column`).
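The catalog built in the next cell has one line per column, so it grows quickly and repeats the database and table names on every line. As an optional, hypothetical refinement (not part of the original solution), the lines can be grouped by database and table and rendered as one line per table with comma-separated column names. The sketch assumes only the four-field format above; `group_catalog` and `render_compact` are illustrative names, and whether the LLM routes as accurately with the compact form is something to verify on your own queries.

```python
from collections import defaultdict


def group_catalog(catalog_text):
    """Group 'classification|database|table|column' lines by database and table."""
    grouped = {}
    for line in catalog_text.strip().splitlines():
        fields = line.split("|")
        if len(fields) != 4:
            continue  # skip blank, malformed, or truncated lines
        classification, database, table, column = fields
        entry = grouped.setdefault(
            database, {"classification": classification, "tables": defaultdict(list)}
        )
        entry["tables"][table].append(column)
    return grouped


def render_compact(grouped):
    """Render one 'classification|database|table|col1,col2,...' line per table."""
    lines = []
    for database, entry in grouped.items():
        for table, columns in entry["tables"].items():
            lines.append(f"{entry['classification']}|{database}|{table}|{','.join(columns)}")
    return "\n".join(lines)


sample = """
redshift|redshift-sagemaker-sample-data-dev|sample_data_dev_tickit_sales|listid
redshift|redshift-sagemaker-sample-data-dev|sample_data_dev_tickit_sales|saletime
api|meteo|weather|weather"""
print(render_compact(group_catalog(sample)))
```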

In [143]:
#Generate dynamic prompts from the Glue Data Catalog
#harvest AWS Glue crawler metadata

# Glue client used to read the table metadata written by the crawlers
glue_client = boto3.client('glue')

def parse_catalog():
    # Build one pipe-delimited line per column: classification|database|table|column
    columns_str = ''

    for db in gdc:
        # get_tables returns results in pages, so iterate over every page
        paginator = glue_client.get_paginator('get_tables')
        for page in paginator.paginate(DatabaseName=db):
            for tbl in page['TableList']:
                # The classification for S3-backed tables differs from that of the other databases,
                # so set it based on the table's storage location
                if tbl['StorageDescriptor']['Location'].startswith('s3'):
                    classification = 's3'
                else:
                    classification = tbl['Parameters']['classification']
                for column in tbl['StorageDescriptor']['Columns']:
                    dbname, tblname, colname = tbl['DatabaseName'], tbl['Name'], column['Name']
                    columns_str = columns_str + f'\n{classification}|{dbname}|{tblname}|{colname}'
    #API
    ## A crawler cannot catalog the weather API, so append its metadata to the unified catalog manually
    columns_str = columns_str + '\n' + 'api|meteo|weather|weather'
    return columns_str

glue_catalog = parse_catalog()
print(glue_catalog)


redshift|redshift-sagemaker-sample-data-dev|sample_data_dev_tickit_sales|listid
redshift|redshift-sagemaker-sample-data-dev|sample_data_dev_tickit_sales|saletime
redshift|redshift-sagemaker-sample-data-dev|sample_data_dev_tickit_sales|eventid
redshift|redshift-sagemaker-sample-data-dev|sample_data_dev_tickit_sales|salesid
redshift|redshift-sagemaker-sample-data-dev|sample_data_dev_tickit_sales|sellerid
redshift|redshift-sagemaker-sample-data-dev|sample_data_dev_tickit_sales|dateid
redshift|redshift-sagemaker-sample-data-dev|sample_data_dev_tickit_sales|commission
redshift|redshift-sagemaker-sample-data-dev|sample_data_dev_tickit_sales|qtysold
redshift|redshift-sagemaker-sample-data-dev|sample_data_dev_tickit_sales|buyerid
redshift|redshift-sagemaker-sample-data-dev|sample_data_dev_tickit_sales|pricepaid
snowflake|snowflake|finance_stockmarket_schema_all_stocks_5yr|date
snowflake|snowflake|finance_stockmarket_schema_all_stocks_5yr|high
snowflake|snowflake|finance_stockmarket_schema_all
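The catalog printed above is concatenated directly into the routing prompt template in Step 3. LangChain's `PromptTemplate` uses Python format-string syntax by default, so a `{` or `}` in a table or column name would be read as a placeholder and break formatting. A minimal sketch of guarding against that by doubling literal braces; `escape_braces` is a hypothetical helper, and the second catalog line is invented for illustration.

```python
def escape_braces(text):
    """Double literal braces so format-string templates treat them as plain text."""
    return text.replace("{", "{{").replace("}", "}}")


# The first line comes from the catalog above; the second is hypothetical
catalog = ("redshift|redshift-sagemaker-sample-data-dev|sample_data_dev_tickit_sales|listid\n"
           "example|demo_db|demo_table|col_{raw}")

unsafe = "Question: {query}\n" + catalog
safe = "Question: {query}\n" + escape_braces(catalog)  # only the catalog is escaped

try:
    unsafe.format(query="How many tickit sales are there?")
except KeyError as err:
    print("unescaped template fails on placeholder:", err)

print(safe.format(query="How many tickit sales are there?"))
```

Only the catalog text is escaped, so `{query}` remains a real placeholder while the braces inside catalog entries come through unchanged.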

### Step 3 - Enter the user query and determine the best data channel to answer it

Some sample queries are provided below for test runs. Uncomment the query you want to run and comment out the others.

In [167]:
# Enter the query
#snowflake - Finance and Investments
query = """Which stock performed the best and the worst in May of 2013?"""
# query = """What is the average volume of stocks traded in July of 2013?"""

#rds - Human Resources
# query = """Name all employees with birth date this month""" 
# query = """Combien d'employés sont des femmes? """ #Ask question in French - How many employees are women?
# query = """How many employees were hired before 1990?"""  

#athena - Legal - SageMaker offline featurestore
# query = """How many frauds happened in the year 2023?"""  
# query = """How many policies were claimed this year?""" 

#redshift - Sales & Marketing
# query = """How many tickit sales are there?""" 
# query = "What was the total commission for the tickit sales in the year 2008?" 

#api - product - weather
# query = """What is the weather like right now in New York City in degrees Fahrenheit?"""

In [168]:
#Set the prompt template. It instructs the LLM how to evaluate and respond to the query. It is called dynamic because the Glue Data Catalog is generated first and then appended to the prompt.
prompt_template = """
     From the table below, find the database (in column database) which will contain the data (in corresponding column_names) to answer the question 
     {query} \n
     """+glue_catalog +""" 
Give your answer as database == 
Also, give your answer as database.table == 
"""

PROMPT = PromptTemplate(
      template=prompt_template, input_variables=["query"]
  )

# define llm chain
llm_chain = LLMChain(prompt=PROMPT, llm=llm)

#run the query and save to generated texts
generated_texts = llm_chain.run(query)

print(generated_texts)

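#set the best channel from which the query can be answered
#the checks match the Glue Data Catalog database names and classifications the LLM is asked to return
if 'snowflake' in generated_texts:
    channel = 'db'
    db = dbsnowflake
    print("SET database to snowflake")
elif 'redshift' in generated_texts:
    channel = 'db'
    db = dbredshift
    print("SET database to redshift")
elif 's3' in generated_texts or 'sagemaker_featurestore' in generated_texts:
    channel = 'db'
    db = dbathena
    print("SET database to athena")
elif 'mysql' in generated_texts:
    # matches both the 'rds-aurora-mysql-employees' catalog database and a MySQL classification
    channel = 'db'
    db = dbrds
    print("SET database to rds")
elif 'api' in generated_texts or 'meteo' in generated_texts:
    channel = 'api'
    print("SET database to weather api")
else:
    raise Exception("User question cannot be answered by any of the channels mentioned in the catalog")
print("Step complete. Channel is: ", channel)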
print("Step complete. Channel is: ", channel)


database == snowflake
database.table == snowflake.finance_stockmarket_schema_all_stocks_5yr
SET database to snowflake
Step complete. Channel is:  db
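The routing cell above relies on substring checks such as `'s3' in generated_texts`, which can match unrelated words in the model's reply. A stricter alternative, sketched below under the assumption that the model answers in the requested `database == <name>` form, is to extract the database name and look it up in an explicit table keyed by the Glue Data Catalog database names from the `gdc` list, plus the manually added `meteo` entry. `CHANNELS` and `route` are hypothetical names, and the second element of each tuple stands in for the matching `SQLDatabase` object.

```python
import re

# Glue Data Catalog database name -> (channel, source label).
# In the notebook, the source label would map to dbsnowflake, dbredshift, and so on.
CHANNELS = {
    "snowflake": ("db", "snowflake"),
    "redshift-sagemaker-sample-data-dev": ("db", "redshift"),
    "rds-aurora-mysql-employees": ("db", "rds"),
    "sagemaker_featurestore": ("db", "athena"),
    "meteo": ("api", "weather"),
}


def route(generated_text, channels=CHANNELS):
    """Return (channel, source) for the name on the 'database ==' line of the LLM reply."""
    # Anchored per line, so the 'database.table ==' line is not matched
    match = re.search(r"^[ \t]*database[ \t]*==[ \t]*(\S+)", generated_text, flags=re.MULTILINE)
    if match is None:
        raise ValueError("LLM reply has no 'database ==' line")
    name = match.group(1).strip("'\"`.,")
    if name not in channels:
        raise ValueError(f"Database {name!r} is not in the catalog")
    return channels[name]


reply = """database == snowflake
database.table == snowflake.finance_stockmarket_schema_all_stocks_5yr"""
print(route(reply))  # ('db', 'snowflake')
```

An unknown or missing name raises an error instead of silently falling through to whichever substring happens to match first.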


### Step 4 - Run the LangChain SQL Database chain to convert text to SQL and run the query against the source data channel
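Before running the chain, it may help to see the two-pass pattern it follows, in simplified form. Roughly speaking, in the LangChain versions this notebook targets, `SQLDatabaseChain` appends `SQLQuery:` to the question, lets the model write SQL until it would emit `SQLResult:`, executes that SQL itself, and then calls the model again with the real result appended, ending in `Answer:`. The sketch reproduces that loop with Python's built-in `sqlite3` and a hypothetical `fake_llm` stub that returns canned text, so it runs without a model or a cloud database; it is an illustration, not LangChain's code.

```python
import sqlite3

# Trimmed-down version of the notebook's custom prompt
TEMPLATE = """Given an input question, first create a syntactically correct {dialect} query to run, then look at the results of the query and return the answer.

Only use the following tables:

{table_info}

Question: {input}"""


def fake_llm(prompt, stop=None):
    """Hypothetical stand-in for the LLM; a real model would honor `stop`."""
    if prompt.rstrip().endswith("SQLQuery:"):
        return " SELECT COUNT(*) FROM sales"
    return " There are 3 sales."


def text_to_sql(question, conn, llm=fake_llm):
    # table_info: the CREATE TABLE statements for the tables the model may use
    table_info = "\n".join(
        row[0] for row in conn.execute("SELECT sql FROM sqlite_master WHERE type = 'table'")
    )
    prompt = TEMPLATE.format(dialect="sqlite", table_info=table_info,
                             input=question + "\nSQLQuery:")
    # Pass 1: the model writes SQL and should stop before inventing a result
    sql = llm(prompt, stop=["\nSQLResult:"]).strip()
    result = conn.execute(sql).fetchall()  # the query runs against the real database
    # Pass 2: append the SQL and its actual result, then ask for the final answer
    prompt += f" {sql}\nSQLResult: {result}\nAnswer:"
    return sql, result, llm(prompt).strip()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (salesid INTEGER, qtysold INTEGER)")
conn.executemany("INSERT INTO sales VALUES (?, ?)", [(1, 2), (2, 1), (3, 4)])
print(text_to_sql("How many sales are there?", conn))
```

The key design point is that the model never supplies the result itself: the database does, and the model only phrases the final answer from it.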

In [169]:
#after determining the data channel, run the LangChain SQL Database chain to convert 'text to sql' and run the query against the source data channel.
#the custom prompt adds rules for generating and presenting the SQL; the chain fills {dialect} and {table_info} from the selected database.

_DEFAULT_TEMPLATE = """Given an input question, first create a syntactically correct {dialect} query to run, then look at the results of the query and return the answer.

Remove 'Query:' from SQLQuery. 
Display SQLResult after the query is run in plain English that users can understand. 

Only use the following tables:

{table_info}
If someone asks for the sales, they really mean the tickit.sales table.
If someone asks for the sales date, they really mean the column tickit.sales.saletime.

Question: {input}"""

PROMPT = PromptTemplate(
    input_variables=["input", "table_info", "dialect"], template=_DEFAULT_TEMPLATE
)
# print(PROMPT)

if channel=='db':
    db_chain = SQLDatabaseChain.from_llm(llm, db, prompt=PROMPT, verbose=True, return_intermediate_steps=False)
    response=db_chain.run(query)
elif channel=='api':
    chain_api = APIChain.from_llm_and_api_docs(llm, open_meteo_docs.OPEN_METEO_DOCS, verbose=True)
    response=chain_api.run(query)
else: raise Exception("Unlisted channel. Check your unified catalog")



> Entering new SQLDatabaseChain chain...
Which stock performed the best and the worst in May of 2013?
SQLQuery:
SELECT name, MAX(close) AS max_close, MIN(close) AS min_close 
FROM all_stocks_5yr
WHERE date BETWEEN '2013-05-01' AND '2013-05-31'
GROUP BY name
ORDER BY max_close DESC, min_close ASC
SQLResult: [('PCLN', 842.5, 694.06), ('GOOGL', 458.4029, 410.6252), ('AZO', 427.84, 407.58), ('CMG', 377.43, 361.0), ('BLK', 291.69, 260.94), ('REGN', 282.8, 237.29), ('AMZN', 269.9, 248.23), ('GWW', 267.34, 240.9), ('BIIB', 240.86, 209.28), ('EQIX', 229.67, 202.64), ('MTD', 228.0, 202.6), ('IBM', 209.36, 199.63), ('SHW', 193.56, 181.1), ('RL', 189.56, 175.09), ('ADS', 183.03, 170.44), ('SPG', 180.34, 166.44), ('ESS', 170.47, 154.77), ('ISRG', 168.5998, 156.7665), ('PSA', 167.72, 151.8), ('AMG', 167.3, 150.72), ('GS', 164.35, 142.61), ('TDG', 154.69, 146.1), ('PXD', 145.24, 118.97), ('WYNN', 143.11, 135.89), ('AVB', 141.46, 129.79), ('RE', 135.97, 126.55

In [151]:
print(response)


SELECT name, MAX(close) AS max_close, MIN(close) AS min_close 
FROM all_stocks_5yr
WHERE date BETWEEN '2013-05-01' AND '2013-05-31'
GROUP BY name
ORDER BY max_close DESC, min_close ASC

The stocks with the highest and lowest closing prices in May 2013 were Priceline (PCLN) at $842.50 and Advanced Micro Devices (AMD) at $3.22, respectively.
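The generated query groups by ticker and orders by each stock's highest closing price, so it ranks stocks by price level rather than by how they moved during the month; the "best" stock in the answer is simply the most expensive one. If performance is taken to mean the percentage change in closing price from the first to the last trading day of May 2013 (an assumption, since the question does not define it), a query along the lines below ranks stocks by that measure. The sketch runs against an in-memory SQLite table with the `all_stocks_5yr` columns `date`, `close`, and `name` and a few hypothetical rows, not real prices. The SQL should need little change for Snowflake, but verify it there.

```python
import sqlite3

# Hypothetical rows in the shape (date, close, name); not real market data
ROWS = [
    ("2013-05-01", 10.0, "AAA"), ("2013-05-31", 12.0, "AAA"),
    ("2013-05-01", 50.0, "BBB"), ("2013-05-31", 45.0, "BBB"),
    ("2013-05-01", 800.0, "CCC"), ("2013-05-31", 820.0, "CCC"),
]

# Percentage change from each stock's first to last trading day in the month
PCT_CHANGE_SQL = """
WITH bounds AS (
    SELECT name, MIN(date) AS first_day, MAX(date) AS last_day
    FROM all_stocks_5yr
    WHERE date BETWEEN '2013-05-01' AND '2013-05-31'
    GROUP BY name
)
SELECT b.name,
       ROUND(100.0 * (l.close - f.close) / f.close, 2) AS pct_change
FROM bounds b
JOIN all_stocks_5yr f ON f.name = b.name AND f.date = b.first_day
JOIN all_stocks_5yr l ON l.name = b.name AND l.date = b.last_day
ORDER BY pct_change DESC
"""

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE all_stocks_5yr (date TEXT, close REAL, name TEXT)")
conn.executemany("INSERT INTO all_stocks_5yr VALUES (?, ?, ?)", ROWS)
ranked = conn.execute(PCT_CHANGE_SQL).fetchall()
best, worst = ranked[0], ranked[-1]
print("best:", best, "worst:", worst)  # best: ('AAA', 20.0) worst: ('BBB', -10.0)
```

With these rows, CCC has by far the highest price but gains only 2.5%, while AAA, the cheapest, performs best; a ranking by maximum close would have put CCC first.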


### Clean-up
After you run the data mesh architecture with generative AI, make sure to clean up any resources you won't use again. Shut down and delete the databases used (Amazon Redshift, Amazon RDS, Snowflake). In addition, delete the data in Amazon S3 and stop any SageMaker Studio notebook instances so that you don't incur further charges. If you used SageMaker JumpStart to deploy a large language model as a SageMaker real-time endpoint, delete the endpoint through either the SageMaker console or Studio.
