In [3]:
# !pip install --upgrade pandas pandasai

In [4]:
# !pip install langchain

In [6]:
# !pip install python-dotenv

<hr>

<b style = 'font-size:28px;font-family:Arial;color:#1b8fb8'>1. Import libraries</b>

In [1]:
from dotenv import load_dotenv

# load keys
load_dotenv()

import json
import boto3


import sqlalchemy
from sqlalchemy import create_engine
# from snowflake.sqlalchemy import URL

from langchain.docstore.document import Document
from langchain import PromptTemplate,SagemakerEndpoint,SQLDatabase, SQLDatabaseChain, LLMChain
from langchain.llms.sagemaker_endpoint import LLMContentHandler
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts.prompt import PromptTemplate
from langchain.chains import SQLDatabaseSequentialChain

from langchain.chains.api.prompt import API_RESPONSE_PROMPT
from langchain.chains import APIChain
from langchain.prompts.prompt import PromptTemplate
from langchain.chat_models import ChatAnthropic
from langchain.chains.api import open_meteo_docs

from pandasai import PandasAI
from pandasai.llm.starcoder import Starcoder
from pandasai.llm.open_assistant import OpenAssistant
from pandasai.llm.falcon import Falcon

from typing import Dict
import time

<hr>

<b style = 'font-size:28px;font-family:Arial;color:#1b8fb8'>2. IAM - create role and policies</b>

In [2]:
# User-defined parameters shared by the IAM, Glue and Athena cells below.
glue_databucket_name = "demo-data-1706"  # S3 bucket that holds the source data
glue_db_name = "flight_db4"  # Glue database to create and query
glue_role = "AWSGlueServiceRole-123-456"  # IAM role handed to the crawler
glue_crawler_name = "crawler-2006_v3"  # Glue crawler to create and run

In [3]:
import boto3
import json
import os

# Retrieve the AWS account number
sts_client = boto3.client('sts')
account_number = sts_client.get_caller_identity().get('Account')
# Retrieve the AWS region
#region = os.environ['AWS_REGION']
region = boto3.session.Session().region_name
print("AWS Account Number:", account_number)
print("AWS Region:", region)

# Fail early with a clear message: the ARNs below cannot be built without a region.
if region is None:
    raise RuntimeError("No default AWS region is configured for this session")

# Trust policy: lets the Glue service assume the role.
trust_policy = json.dumps(
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"Service": "glue.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    },
    indent=2,
)

# Managed policy: Glue catalog access, crawler logging, and access to the data bucket.
# Built from Python dicts so the result is always valid JSON.
glue_arn_prefix = f"arn:aws:glue:{region}:{account_number}"
managed_policy = json.dumps(
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": ["glue:*"],
                "Resource": [
                    f"{glue_arn_prefix}:catalog",
                    f"{glue_arn_prefix}:database/*",
                    f"{glue_arn_prefix}:table/*",
                ],
                "Effect": "Allow",
                "Sid": "Readcrawlerresources",
            },
            {
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                "Resource": [
                    # Log groups live in the `logs` service namespace, not `glue`.
                    f"arn:aws:logs:{region}:{account_number}:log-group:/aws-glue/crawlers*",
                    "arn:aws:logs:*:*:/aws-glue/*",
                    "arn:aws:logs:*:*:/customlogs/*",
                ],
                "Effect": "Allow",
                "Sid": "ReadlogResources",
            },
            {
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:PutBucketLogging",
                    "s3:ListBucket",
                    "s3:PutBucketVersioning",
                ],
                "Resource": [
                    f"arn:aws:s3:::{glue_databucket_name}",
                    f"arn:aws:s3:::{glue_databucket_name}/*",
                ],
                "Effect": "Allow",
                "Sid": "ReadS3Resources",
            },
        ],
    },
    indent=4,
)

# Write both documents for the `aws iam` commands in the next cell; the context
# managers make sure the files are flushed and closed before the shell reads them.
with open('managed-policy.json', 'w') as policy_file:
    policy_file.write(managed_policy + "\n")
with open('trust-policy.json', 'w') as policy_file:
    policy_file.write(trust_policy + "\n")

AWS Account Number: 23810181234567890
AWS Region: us-east-1


<h3>Create Glue service role and policy</h3>

In [4]:
%%sh -s "$glue_role" 
# Create the Glue service role, create its managed policy, and attach the policy to the role.
# $1 is the role name passed in from the notebook variable `glue_role`.
echo "$1"
glue_role="$1"
managed_policy_name="managed-policy-$glue_role"
echo "$managed_policy_name"
aws iam create-role --role-name "$glue_role" --assume-role-policy-document file://trust-policy.json
# Ask the CLI for the ARN directly rather than scraping the JSON output (grep -P is GNU-only).
arn=$(aws iam create-policy --policy-document file://managed-policy.json --policy-name "$managed_policy_name" --query 'Policy.Arn' --output text)
echo "$arn"
# Without an ARN (for example when create-policy failed) attaching cannot succeed; stop with a clear message.
if [ -z "$arn" ]; then
    echo "Could not create policy $managed_policy_name; skipping attach-role-policy" >&2
    exit 1
fi
aws iam attach-role-policy --policy-arn "$arn" --role-name "$glue_role"

AWSGlueServiceRole-123-456
managed-policy-AWSGlueServiceRole-123-456
{
    "Role": {
        "Path": "/",
        "RoleName": "AWSGlueServiceRole-123-456",
        "RoleId": "AROATO37ZHZHPKZ5TG2AP",
        "Arn": "arn:aws:iam::238101806670:role/AWSGlueServiceRole-123-456",
        "CreateDate": "2023-06-23T06:53:01Z",
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "",
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "glue.amazonaws.com"
                    },
                    "Action": "sts:AssumeRole"
                }
            ]
        }
    }
}
arn:aws:iam::238101806670:policy/managed-policy-AWSGlueServiceRole-123-456


<hr>

<b style = 'font-size:28px;font-family:Arial;color:#1b8fb8'>3. Glue</b>

<h3>Create database</h3>

In [15]:
# Create the Glue database named by `glue_db_name`
import boto3

client = boto3.client("glue")

try:
    response = client.create_database(
        DatabaseInput=dict(
            Name=glue_db_name,
            Description='This database is created using Python boto3',
        )
    )
except Exception as err:
    # Best effort: report the failure and let the notebook continue.
    print(f"error in creating database. Check if the database: {glue_db_name} already exists")
    print(err)
else:
    print(f"Successfully created database: {glue_db_name}")

# Wait a little so the IAM role created earlier has time to become usable
time.sleep(20)

Successfully created database: flight_db4


<h3>create glue crawler</h3>

In [16]:
import boto3

client = boto3.client('glue')

# Create the Glue crawler that scans the flight data under the configured bucket
try:
    response = client.create_crawler(
        Name=glue_crawler_name,
        Role=glue_role,
        DatabaseName=glue_db_name,
        Targets={
            'S3Targets': [
                {
                    'Path': f's3://{glue_databucket_name}/flight/'
                }
            ]
        },
        TablePrefix=''
    )
    print("Successfully created crawler")
except Exception as e:
    # Catch Exception (not a bare except) so Ctrl-C still interrupts the cell, and
    # print the underlying error so a real failure is not mistaken for "already exists".
    print("error in creating crawler. However, if the crawler already exists, the crawler will run.")
    print(e)

Successfully created crawler


<h3>Run the Crawler</h3>

In [17]:
# Run the crawler and poll until it returns to the READY state.
crawler_poll_seconds = 10          # pause between status checks
crawler_timeout_seconds = 15 * 60  # give up eventually instead of polling forever

try:
    response = client.start_crawler(Name=glue_crawler_name)
    print("Successfully started crawler. The crawler may take 2-5 mins to detect the schema.")
    deadline = time.monotonic() + crawler_timeout_seconds
    while True:
        # Get the crawler status
        response = client.get_crawler(Name=glue_crawler_name)
        # Extract the crawler state
        status = response['Crawler']['State']
        # Print the crawler status
        print(f"Crawler '{glue_crawler_name}' status: {status}")
        if status == 'READY':  # Replace 'READY' with the desired completed state
            break  # Exit the loop if the desired state is reached
        if time.monotonic() >= deadline:
            # Surface a stuck crawler rather than hanging the notebook indefinitely.
            raise TimeoutError(
                f"Crawler '{glue_crawler_name}' did not reach READY within "
                f"{crawler_timeout_seconds} seconds (last status: {status})"
            )

        time.sleep(crawler_poll_seconds)  # Sleep before checking the status again

except Exception as e:
    print("error in starting crawler. Check the logs for the error details.")
    print(e)

Successfully started crawler. The crawler may take 2-5 mins to detect the schema.
Crawler 'crawler-2006_v3' status: RUNNING
Crawler 'crawler-2006_v3' status: RUNNING
Crawler 'crawler-2006_v3' status: RUNNING
Crawler 'crawler-2006_v3' status: RUNNING
Crawler 'crawler-2006_v3' status: RUNNING
Crawler 'crawler-2006_v3' status: STOPPING
Crawler 'crawler-2006_v3' status: STOPPING
Crawler 'crawler-2006_v3' status: STOPPING
Crawler 'crawler-2006_v3' status: STOPPING
Crawler 'crawler-2006_v3' status: STOPPING
Crawler 'crawler-2006_v3' status: STOPPING
Crawler 'crawler-2006_v3' status: STOPPING
Crawler 'crawler-2006_v3' status: STOPPING
Crawler 'crawler-2006_v3' status: STOPPING
Crawler 'crawler-2006_v3' status: STOPPING
Crawler 'crawler-2006_v3' status: READY


<hr>

<b style = 'font-size:28px;font-family:Arial;color:#1b8fb8'>4. Step 1 - Connect to databases using SQL Alchemy</b>

In [3]:
# Define connections: take the region from a boto3 client; it is used to build
# the Athena endpoint in the next cell.
client = boto3.client("secretsmanager")
region = client.meta.region_name

In [4]:
from urllib.parse import quote_plus

#S3
# connect to s3 using athena
## athena variables
connathena = f"athena.{region}.amazonaws.com"
portathena = '443'  # Update, if port is different
schemaathena = glue_db_name  # from user defined params
s3stagingathena = f's3://{glue_databucket_name}/athenaresults/'  # from cfn params
wkgrpathena = 'primary'  # Update, if workgroup is different

# tablesathena=['dataset']#[<table name>]
##  Create the athena connection string
# The staging dir already ends with "/", so no extra slash is appended (the old
# string produced "athenaresults//"). Query values are URL-encoded because they
# travel inside the connection URL.
connection_string = (
    f"awsathena+rest://@{connathena}:{portathena}/{schemaathena}"
    f"?s3_staging_dir={quote_plus(s3stagingathena)}"
    f"&work_group={quote_plus(wkgrpathena)}"
)
##  Create the athena  SQLAlchemy engine
engine_athena = create_engine(connection_string, echo=False)

dbathena = SQLDatabase(engine_athena)
# List of Glue databases whose metadata is harvested later by parse_catalog()
gdc = [schemaathena]

In [5]:
# Display the list of Glue database names that parse_catalog() iterates over
gdc

['flight_db4']

<hr>

<b style = 'font-size:28px;font-family:Arial;color:#1b8fb8'>5. Step 2 - Generate Dynamic Prompt Templates</b>

In [6]:
# Generate Dynamic prompts to populate the Glue Data Catalog
# harvest aws crawler metadata

def parse_catalog():
    """Build the unified catalog string from the Glue Data Catalog.

    For every database named in the module-level list ``gdc``, each column of
    each table is emitted as one line of the form
    ``classification|database|table|column``. A fixed line describing the
    weather API is appended last.

    Returns:
        str: All catalog lines, each one preceded by a newline character.
    """
    # define glue client
    glue_client = boto3.client("glue")
    # get_tables returns its results in pages; the paginator follows NextToken so
    # tables beyond the first page are not silently dropped.
    paginator = glue_client.get_paginator("get_tables")

    rows = []
    for db in gdc:
        for page in paginator.paginate(DatabaseName=db):
            for table in page["TableList"]:
                # classification in the response for s3 and other databases is different.
                # Set classification based on the response location
                if table["StorageDescriptor"]["Location"].startswith("s3"):
                    classification = "s3"
                else:
                    classification = table["Parameters"]["classification"]
                dbname, tblname = table["DatabaseName"], table["Name"]
                for column in table["StorageDescriptor"]["Columns"]:
                    rows.append(f"{classification}|{dbname}|{tblname}|{column['Name']}")
    # API
    ## Append the metadata of the API to the unified glue data catalog
    rows.append("api|meteo|weather|weather")
    # Every line is prefixed (not separated) by a newline, so the result starts with "\n".
    return "".join(f"\n{row}" for row in rows)


glue_catalog = parse_catalog()

# Show only the tail of the catalog: its last ten lines, one per output line
print(*glue_catalog.splitlines()[-10:], sep="\n")


s3|flight_db4|flight|source_city
s3|flight_db4|flight|departure_time
s3|flight_db4|flight|stops
s3|flight_db4|flight|arrival_time
s3|flight_db4|flight|destination_city
s3|flight_db4|flight|class
s3|flight_db4|flight|duration
s3|flight_db4|flight|days_left
s3|flight_db4|flight|price
api|meteo|weather|weather


<h3>Define LLM</h3>

In [7]:
#LLM
# The API key is expected to come from the environment (load_dotenv() runs in the
# import cell); never paste keys or tokens into the notebook, not even in comments.

from langchain.llms import OpenAI

# LLM -Falcon (alternative) - read the token from the environment
# llm = Falcon(os.environ["HUGGINGFACE_API_KEY"])

# os.environ["OPENAI_API_KEY"] = "<your key>"
# temperature=0: channel routing and SQL generation should be as repeatable as
# possible; a high temperature makes the model prone to inventing database names.
llm = OpenAI(temperature=0)

<hr>

<b style = 'font-size:28px;font-family:Arial;color:#1b8fb8'>6. Step 3 - Define Functions to determine</b>

<ol style = 'font-size:16px;font-family:Arial'> <li>The best data channel to answer the user query, </li> <li> Generate response to user query</li></ol>

In [8]:
# Function 1 'Infer Channel'
# define a function that infers the channel/database/table and sets the database for querying

def identify_channel(query):
    """Ask the LLM which data channel in the unified catalog can answer ``query``.

    The module-level ``glue_catalog`` text is embedded in the prompt, the prompt
    is run through the module-level ``llm``, and the generated text is inspected
    for the markers ``"s3"`` and ``"api"`` (checked in that order).

    Args:
        query: The user's natural-language question.

    Returns:
        tuple: ``(channel, db)`` where ``channel`` is ``"db"`` or ``"api"`` and
        ``db`` is ``dbathena`` for the ``"db"`` channel or ``None`` for ``"api"``.

    Raises:
        Exception: If the generated text names neither channel.
    """
    # Prompt 1 'Infer Channel'
    ##set prompt template. It instructs the llm on how to evaluate and respond to the user query.
    ## It is referred to as dynamic since the glue data catalog is generated first and appended to the prompt.
    prompt_template = (
        """
     From the table below, find the database (in column database) which will contain the data (in corresponding column_names) to answer the question 
     {query} \n
     """
        + glue_catalog
        + """ 
     Give your answer as database == 
     Also,give your answer as database.table == 
     """
    )
    ##define prompt 1
    PROMPT_channel = PromptTemplate(template=prompt_template, input_variables=["query"])

    # define llm chain
    llm_chain = LLMChain(prompt=PROMPT_channel, llm=llm)

    generated_texts = llm_chain.run(query)
    print(generated_texts)

    # set the channel from where the query can be answered
    if "s3" in generated_texts:
        channel = "db"
        db = dbathena
        print("SET database to athena")
    elif "api" in generated_texts:
        channel = "api"
        # The API channel has no database object; bind db explicitly so the
        # return statement below does not raise UnboundLocalError.
        db = None
        print("SET database to weather api")
    else:
        raise Exception(
            "User question cannot be answered by any of the channels mentioned in the catalog"
        )
    print("Step complete. Channel is: ", channel)

    return channel, db

In [9]:
# Function 2 'Run Query'
# define a function that calls identify_channel and then answers the user query through that channel (text-to-SQL for the database, API chain for the weather API)
def run_query(query):
    """Answer ``query`` through whichever channel ``identify_channel`` selects.

    For the ``"db"`` channel the question is turned into SQL and executed by a
    LangChain SQL database chain; for the ``"api"`` channel it is passed to an
    API chain built from the Open-Meteo docs.

    Args:
        query: The user's natural-language question.

    Returns:
        The value returned by the selected chain's ``run`` call.

    Raises:
        Exception: If the channel is neither ``"db"`` nor ``"api"``.
    """
    # Routing comes first: which channel, and which database object goes with it
    channel, db = identify_channel(query)

    # Prompt 2 'Run Query': rules the LLM must follow when writing and reporting SQL
    sql_template = """Given an input question, first create a syntactically correct {dialect} query to run, then look at the results of the query and return the answer.

    Do not append 'Query:' to SQLQuery.

    Display SQLResult after the query is run in plain english that users can understand. 

    Provide answer in simple english statement.

    Only use the following tables:

    {table_info}
    If someone asks for the sales, they really mean the tickit.sales table.
    If someone asks for the sales date, they really mean the column tickit.sales.saletime.

    Question: {input}"""

    sql_prompt = PromptTemplate(
        template=sql_template, input_variables=["input", "table_info", "dialect"]
    )

    # Database channel: text-to-SQL against the selected database
    if channel == "db":
        sql_chain = SQLDatabaseChain.from_llm(
            llm, db, prompt=sql_prompt, verbose=True, return_intermediate_steps=False
        )
        return sql_chain.run(query)

    # API channel: let the LLM build the request from the Open-Meteo docs
    if channel == "api":
        weather_chain = APIChain.from_llm_and_api_docs(
            llm, open_meteo_docs.OPEN_METEO_DOCS, verbose=True
        )
        return weather_chain.run(query)

    raise Exception("Unlisted channel. Check your unified catalog")

<hr>

<b style = 'font-size:28px;font-family:Arial;color:#1b8fb8'>7. Step 4 - Run the run_query function</b>

<p style = 'font-size:16px;font-family:Arial'>Run the run_query function that in turn calls the Langchain SQL Database chain to convert 'text to sql' and runs the query against the source data channel</p>

In [11]:
# Enter the query
## Few queries to try out -
# athena - Flight data
query = """What it the mean price of Flight SpiceJet?"""

# Response from Langchain, printed below a separator line
response = run_query(query)
print(
    "----------------------------------------------------------------------",
    f'SQL and response from user query {query}  \n  {response}',
    sep="\n",
)


Database: s3|flight_db4
Database.table: s3|flight_db4.flight
SET database to athena
Step complete. Channel is:  db


[1m> Entering new SQLDatabaseChain chain...[0m
What it the mean price of Flight SpiceJet?
SQLQuery:[32;1m[1;3mSELECT AVG(price) FROM flight WHERE airline = 'SpiceJet';[0m
SQLResult: [33;1m[1;3m[(6179.278881367218,)][0m
Answer:[32;1m[1;3mThe mean price of Flight SpiceJet is 6179.[0m
[1m> Finished chain.[0m
----------------------------------------------------------------------
SQL and response from user query What it the mean price of Flight SpiceJet?  
  The mean price of Flight SpiceJet is 6179.


In [12]:
# Retrieve the list of existing buckets
s3 = boto3.client("s3")
response = s3.list_buckets()

# Output a header followed by one indented bucket name per line
print(
    "Existing buckets:",
    *(f'  {entry["Name"]}' for entry in response["Buckets"]),
    sep="\n",
)

Existing buckets:
  demo-data-1706
  sagemaker-studio-238101806670-eyhmv28jpbq
  sagemaker-us-east-1-238101806670


In [19]:
import boto3

# Download the flight CSV from the configured data bucket (the same bucket the
# crawler scans) instead of repeating the bucket name as a literal.
s3 = boto3.client('s3')
s3.download_file(glue_databucket_name, 'flight/flight_price.csv', 'flight_price.csv')

In [21]:
import pandas as pd

# The first CSV column is a saved row index; read it as the index so it does not
# show up as a spurious "Unnamed: 0" data column.
df = pd.read_csv('flight_price.csv', index_col=0)

In [22]:
# Preview the first rows of the downloaded flight data
df.head()

Unnamed: 0,airline,flight,source_city,departure_time,stops,arrival_time,destination_city,class,duration,days_left,price
0,SpiceJet,SG-8709,Delhi,Evening,zero,Night,Mumbai,Economy,2.17,1,5953
1,SpiceJet,SG-8157,Delhi,Early_Morning,zero,Morning,Mumbai,Economy,2.33,1,5953
2,AirAsia,I5-764,Delhi,Early_Morning,zero,Early_Morning,Mumbai,Economy,2.17,1,5956
3,Vistara,UK-995,Delhi,Morning,zero,Afternoon,Mumbai,Economy,2.25,1,5955
4,Vistara,UK-963,Delhi,Morning,zero,Morning,Mumbai,Economy,2.33,1,5955


In [23]:
# Cross-check the LLM answer: mean price over the SpiceJet rows, computed with pandas
df.loc[df["airline"] == "SpiceJet", "price"].mean()

6179.278881367218

In [25]:
# Enter the query
# Few queries to try out -
query = "What is the mean price of flight SG-8709?"

# Response from Langchain, printed below a separator line
response = run_query(query)
print(
    "----------------------------------------------------------------------",
    f"SQL and response from user query {query}  \n  {response}",
    sep="\n",
)



Database: s3|flight_db2
Database.table: s3|flight_db2|flight
SET database to athena
Step complete. Channel is:  db


[1m> Entering new  chain...[0m
What is the mean price of flight SG-8709?
SQLQuery:[32;1m[1;3mSELECT AVG(price) FROM flight WHERE flight = 'SG-8709';[0m
SQLResult: [33;1m[1;3m[(4740.022556390977,)][0m
Answer:[32;1m[1;3mThe mean price of flight SG-8709 is 4740.02.[0m
[1m> Finished chain.[0m
----------------------------------------------------------------------
SQL and response from user query What is the mean price of flight SG-8709?  
  The mean price of flight SG-8709 is 4740.02.


<b style = 'font-size:28px;font-family:Arial;color:#1b8fb8'>8. Clean up</b>

<p style = 'font-size:16px;font-family:Arial'>After you run the modern data architecture with Generative AI, make sure to clean up any resources that won’t be utilized. Delete the data in Amazon S3 and make sure to stop any SageMaker Studio notebook instances to not incur any further charges.</p>

<hr>
<center><b style = 'font-size:28px;font-family:Arial;color:#873e23'>Thank you</b></center>