# Amazon Kendra Custom Connector

### Create two custom connectors
#### First connector will index AWS Glue Jobs
#### Second connector will index PostGres tables

In [10]:
# 1. Create a python environment

# !conda create -y --name kendra-custom-connector python=3.11.8
# !conda init && activate kendra-custom-connector
# !conda install -n kendra-custom-connector ipykernel --update-deps --force-reinstall -y
# !conda install -c conda-forge ipython-sql

## OR
# !python3 -m venv venv
# !source venv/bin/activate  # On Windows, use `venv\Scripts\activate`

# install ipykernel, which consists of IPython as well
# !pip install ipykernel
# create a kernel that can be used to run notebook commands inside the virtual environment
# !python3 -m ipykernel install --user --name=venv

In [11]:
# 2. Install dependencies

# !pip install -r requirements.txt

In [47]:
# 3. Import necessary libraries and load environment variables

from dotenv import load_dotenv, find_dotenv, set_key
import dotenv
import os

# loading environment variables that are stored in local file
local_env_filename = 'dev.env'
load_dotenv(find_dotenv(local_env_filename),override=True)

os.environ['REGION'] = os.getenv('REGION')
os.environ['KENDRA_INDEX'] = os.getenv('KENDRA_INDEX')
os.environ['KENDRA_ROLE'] = os.getenv('KENDRA_ROLE')
os.environ['CUSTOM_DATA_SOURCE_ID_1'] = os.getenv('CUSTOM_DATA_SOURCE_ID_1')
os.environ['CUSTOM_DATA_SOURCE_ID_2'] = os.getenv('CUSTOM_DATA_SOURCE_ID_2')
os.environ['AMAZON_Q_APP_ID'] = os.getenv('AMAZON_Q_APP_ID')
os.environ['Q_CUSTOM_DATA_SOURCE_ID_1'] = os.getenv('Q_CUSTOM_DATA_SOURCE_ID_1')
os.environ['Q_CUSTOM_DATA_SOURCE_ID_2'] = os.getenv('Q_CUSTOM_DATA_SOURCE_ID_2')
os.environ['DEMO_S3_BUCKET'] = os.getenv('DEMO_S3_BUCKET')
os.environ['DEMO_S3_KEY'] = os.getenv('DEMO_S3_KEY')
os.environ['CLOUDFRONT_URL'] = os.getenv('CLOUDFRONT_URL')

REGION = os.environ['REGION']
KENDRA_INDEX = os.environ['KENDRA_INDEX']
KENDRA_ROLE = os.environ['KENDRA_ROLE']
CUSTOM_DATA_SOURCE_ID_1 = os.environ['CUSTOM_DATA_SOURCE_ID_1']
CUSTOM_DATA_SOURCE_ID_2 = os.environ['CUSTOM_DATA_SOURCE_ID_2']
AMAZON_Q_APP_ID = os.environ['AMAZON_Q_APP_ID']
Q_CUSTOM_DATA_SOURCE_ID_1 = os.environ['Q_CUSTOM_DATA_SOURCE_ID_1']
Q_CUSTOM_DATA_SOURCE_ID_2 = os.environ['Q_CUSTOM_DATA_SOURCE_ID_2']
DEMO_S3_BUCKET = os.environ['DEMO_S3_BUCKET']
DEMO_S3_KEY = os.environ['DEMO_S3_KEY']
CLOUDFRONT_URL = os.environ['CLOUDFRONT_URL']

## Helper methods

In [13]:
# Create a custom data source in Amazon Kendra
import boto3
def create_custom_data_source(index_id, data_source_name, data_source_description):
    kendra = boto3.client('kendra')

    try:
        # Create a data source
        response = kendra.create_data_source(
            IndexId=index_id,
            Name=data_source_name,
            Type="CUSTOM",
            Description=data_source_description

        )
        print(f"Data source created with ID: {response['Id']}")
        return response['Id']
    except Exception as e:
        print(f"Error creating data source: {e}")
        raise      

In [14]:
# parse the dataframe to documents for kendra ingestion

import boto3
import pandas as pd

def parse_to_docs(dataSourceId, jobExecutionId, df):
    documents = []
   
    for index_label, row_series in df.iterrows():
        Text = df.at[index_label , 'Text']
        Title = df.at[index_label , 'Title']
        Url =  df.at[index_label , 'Url']
        CrawledDate = df.at[index_label , 'CrawledDate']
        docID =  df.at[index_label , 'docID']
        doc = {
            "Id": docID,
            "Blob": Text,
            "Title": Title,
            "Attributes": [
                {
                "Key": "_data_source_id",
                "Value": {
                    "StringValue": dataSourceId
                    }
                },
                {
                "Key": "_data_source_sync_job_execution_id",
                "Value": {
                    "StringValue": jobExecutionId
                    }
                },
                {
                "Key": "_source_uri",
                "Value": {
                    "StringValue": Url
                    }    
                },
                {
                "Key": "_created_at",
                "Value": {
                    "DateValue": CrawledDate
                    }    
                }
            ]
        }
        documents.append(doc)
    return documents


In [15]:
# start the data source sync job

def run_data_source_sync_job(data_source_id, index_id, df):
    kendra = boto3.client('kendra')

    #Start a data source sync job
    result = kendra.start_data_source_sync_job(
        Id = data_source_id,
        IndexId = index_id
        )

    print("Start data source sync operation: ")
    print(result)

    #Obtain the job execution ID from the result
    job_execution_id = result['ExecutionId']
    print("Job execution ID: "+job_execution_id)

    #Start ingesting documents
    try:
        #parse docs for ingestion
        docs = parse_to_docs(data_source_id, job_execution_id, df)
        #batchput docs
        result = kendra.batch_put_document(
            IndexId = index_id,
            Documents = docs
            )


    #Stop data source sync job
    finally:
        #Stop data source sync
        result = kendra.stop_data_source_sync_job(
            Id = data_source_id,
            IndexId = index_id
            )
    return result

In [16]:
# CREATE CUSTOM DATA SOURCE 1
if CUSTOM_DATA_SOURCE_ID_1 == "XXX":
    custom_data_source_id_1 = create_custom_data_source(KENDRA_INDEX, 
                                                "custom-Postgres",
                                                "Custom data source for a PostGres database"
                                                )

    print(f'custom_data_source_id_1: {custom_data_source_id_1}')
    os.environ['CUSTOM_DATA_SOURCE_ID_1'] = custom_data_source_id_1
    dotenv.set_key(local_env_filename, "CUSTOM_DATA_SOURCE_ID_1", os.environ["CUSTOM_DATA_SOURCE_ID_1"])
    CUSTOM_DATA_SOURCE_ID_1=os.environ["CUSTOM_DATA_SOURCE_ID_1"]

Data source created with ID: 7ad51f33-e31e-43f1-90aa-7d3fc83641b0
custom_data_source_id_1: 7ad51f33-e31e-43f1-90aa-7d3fc83641b0


In [17]:
# get database details

from datetime import datetime
import pandas as pd
import json

# helper methods to run SQL queries

from sqlalchemy import create_engine, MetaData, text
import boto3

def get_secret(secret_name):
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=REGION)
    get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    return get_secret_value_response

def run_sql_query(statement):
    try:
        # SQLALCHEMY_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{SQL_DATABASE_NAME}"
        get_secret_value_response = get_secret("SQLALCHEMY_URL")
        SQLALCHEMY_URL = get_secret_value_response['SecretString']
        
        engine = create_engine(SQLALCHEMY_URL)
        with engine.connect() as connection:
            result = connection.execute(text(statement))
            return result.fetchall()
    except Exception as e:
        error = f"Error executing statement: {e}"
        raise


def get_database_details():

    sql_query = """
        WITH table_columns AS (
        SELECT 
            table_schema,
            table_name,
            string_agg(column_name || ' ' || data_type || 
                    CASE 
                        WHEN character_maximum_length IS NOT NULL THEN '(' || character_maximum_length || ')'
                        ELSE ''
                    END || 
                    CASE 
                        WHEN is_nullable = 'NO' THEN ' NOT NULL'
                        ELSE ''
                    END,
                    ', ' ORDER BY ordinal_position) AS table_definition
        FROM 
            information_schema.columns
        WHERE 
            table_schema NOT IN ('pg_catalog', 'information_schema')
        GROUP BY 
            table_schema, table_name
    )
    SELECT 
        tc.table_schema,
        tc.table_name,
        tc.table_definition
    FROM 
        table_columns tc
    ORDER BY 
        tc.table_schema, 
        tc.table_name;
        """

    table_details = run_sql_query(sql_query)
    table_summaries = []
    bedrock_client = boto3.client('bedrock-runtime', region_name=REGION)
    s3_client = boto3.client('s3')
    
    for table in table_details:
        table_schema, table_name, table_definition = table

        # get sample values
        sample_values = run_sql_query(f"SELECT * FROM {table_schema}.{table_name} LIMIT 5;")

        prompt = f"""
        You are a helpful assistant that summarizes a Postgres table following the instructions below.
        - The summary should be a short description of the table and what questions it might be used to answer.
        - The summary should include the table name, table columns, and sample values.
        - The summary should be in plain text.  

        Here are the details of the table:
        Table Name: {table_name}
        Table Columns: {table_definition}
        Sample Values: {sample_values}
        """

        body = json.dumps({
            "inputText": f"\n\nHuman: {prompt}\n\nAssistant:",
            "textGenerationConfig": {
                "maxTokenCount": 512,
                "temperature": 0.5,
            },
        })

        bedrock_response = bedrock_client.invoke_model(body=body, modelId="amazon.titan-text-premier-v1:0")
        response_body = json.loads(bedrock_response.get('body').read())

        Text = response_body["results"][0]["outputText"]

        docID = f"{table_schema}.{table_name}"
        Title = table_name
        CrawledDate = datetime.now().isoformat()

        # create a html file for each table and store it in s3
        html = f"<html><body><h1>{docID} - CrawledDate: {CrawledDate}</h1><pre>{Text}</pre></body></html>"

        s3_client.put_object(Bucket=DEMO_S3_BUCKET, Key=f"{DEMO_S3_KEY}{table_schema}.{table_name}.html", Body=html)

        Url = f"{CLOUDFRONT_URL}/{DEMO_S3_KEY}{table_schema}.{table_name}.html"

        table_summaries.append({
            "docID": docID,
            "Title": Title,
            "Text": Text,
            "Url": Url,
            "CrawledDate": CrawledDate,
            "CreatedBy": "huthmac@amazon.com"
        })
    # convert to dataframe
    table_summaries_df = pd.DataFrame(table_summaries)

    return table_summaries_df

table_summaries_df = get_database_details()
table_summaries_df.head()


Unnamed: 0,docID,Title,Text,Url,CrawledDate,CreatedBy
0,public.categories,categories,"The ""categories"" table has 4 columns: ""categor...",https://d3q8adh3y5sxpk.cloudfront.net/rag-demo...,2024-10-11T16:03:29.505270,huthmac@amazon.com
1,public.customer_customer_demo,customer_customer_demo,"The ""customer_customer_demo"" table has two col...",https://d3q8adh3y5sxpk.cloudfront.net/rag-demo...,2024-10-11T16:03:32.083586,huthmac@amazon.com
2,public.customer_demographics,customer_demographics,"The ""customer_demographics"" table contains inf...",https://d3q8adh3y5sxpk.cloudfront.net/rag-demo...,2024-10-11T16:03:35.020689,huthmac@amazon.com
3,public.customers,customers,"The ""customers"" table contains information abo...",https://d3q8adh3y5sxpk.cloudfront.net/rag-demo...,2024-10-11T16:03:58.010916,huthmac@amazon.com
4,public.employee_territories,employee_territories,"The ""employee_territories"" table has 2 columns...",https://d3q8adh3y5sxpk.cloudfront.net/rag-demo...,2024-10-11T16:04:02.856653,huthmac@amazon.com


In [18]:
# Synchronize custom data source 1
# iterate through the table_summaries_df in batches of 10
batch = table_summaries_df.iloc[:10]
result = run_data_source_sync_job(CUSTOM_DATA_SOURCE_ID_1, KENDRA_INDEX, batch)
print(result)

Start data source sync operation: 
{'ExecutionId': '8d9e23cd-1a08-4e2e-a017-d40c7a9a6e54', 'ResponseMetadata': {'RequestId': '3163a166-18de-46cc-9e6d-2e52fd1321fe', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '3163a166-18de-46cc-9e6d-2e52fd1321fe', 'content-type': 'application/x-amz-json-1.1', 'content-length': '54', 'date': 'Fri, 11 Oct 2024 20:04:57 GMT'}, 'RetryAttempts': 0}}
Job execution ID: 8d9e23cd-1a08-4e2e-a017-d40c7a9a6e54
{'ResponseMetadata': {'RequestId': 'c39ae552-34b5-4a97-8d67-bb7fe76e7aae', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'c39ae552-34b5-4a97-8d67-bb7fe76e7aae', 'content-type': 'application/x-amz-json-1.1', 'content-length': '0', 'date': 'Fri, 11 Oct 2024 20:04:57 GMT'}, 'RetryAttempts': 0}}


In [19]:
# CREATE CUSTOM DATA SOURCE 2
if CUSTOM_DATA_SOURCE_ID_2 == "XXX":
    custom_data_source_id_2 = create_custom_data_source(KENDRA_INDEX, 
                                                "custom-AWSGlue",
                                                "Custom data source for AWS Glue Jobs"
                                                )

    print(f'custom_data_source_id_2: {custom_data_source_id_2}')
    os.environ['CUSTOM_DATA_SOURCE_ID_2'] = custom_data_source_id_2
    dotenv.set_key(local_env_filename, "CUSTOM_DATA_SOURCE_ID_2", os.environ["CUSTOM_DATA_SOURCE_ID_2"])
    CUSTOM_DATA_SOURCE_ID_2=os.environ["CUSTOM_DATA_SOURCE_ID_2"]

Data source created with ID: db538731-c786-4711-a5d3-54dc9f57352e
custom_data_source_id_2: db538731-c786-4711-a5d3-54dc9f57352e


In [20]:
# get AWS Glue Job details

# create a function that reads all AWS Glue jobs and extracts the job name, job schedules, job run history, and job script

import boto3
import json
import os
import pandas as pd
from datetime import datetime

BUCKET_NAME = 'felixh-kendra-demo'
KEY_NAME = 'glue-etl-jobs/'

def extract_glue_job_info(glue_client, s3_client):
    jobs = glue_client.list_jobs()
    job_info = []

    for job_name in jobs['JobNames']:
        job_details = glue_client.get_job(JobName=job_name)['Job']
        job_runs = glue_client.get_job_runs(JobName=job_name)['JobRuns']

        script_location = job_details.get('Command', {}).get('ScriptLocation', '')
        print(script_location)
        script = ''
        if script_location:
            # Parse the S3 URL more robustly
            from urllib.parse import urlparse
            parsed_url = urlparse(script_location)
            bucket_name = parsed_url.netloc
            key = parsed_url.path.lstrip('/')
            
            if bucket_name and key:
                try:
                    s3_client.download_file(bucket_name, key, 'script.py')
                    with open('script.py', 'r') as file:
                        script = file.read()
                    os.remove('script.py')
                except Exception as e:
                    print(f"Error downloading script: {e}")
                    script = f"Error: Unable to download script from {script_location}"

        job_info.append({
            'JobName': job_details['Name'],
            'JobDescription': job_details.get('Description', ''),
            'JobSchedule': job_details.get('Schedule', ''),
            'JobRuns': [
                {
                    'Id': run['Id'],
                    'StartedOn': run['StartedOn'].isoformat(),
                    'CompletedOn': run['CompletedOn'].isoformat() if 'CompletedOn' in run else None,
                    'JobRunState': run['JobRunState']
                } for run in job_runs
            ],
            'ScriptLocation': script_location,
            'Script': script

        })

    return job_info

def get_glue_job_info():
    glue_client = boto3.client('glue', region_name=REGION)
    bedrock_client = boto3.client('bedrock-runtime', region_name=REGION)
    s3_client = boto3.client('s3')
    job_info = extract_glue_job_info(glue_client, s3_client)

    #iterate through job_info and create a json file for each job
    job_summaries = []
    for job in job_info:
        print(f"processing job: {job['JobName']}")

        # call amazon bedrock to create a job summary for the job

        prompt = f"""
        You are a helpful assistant that summarizes a AWS Glue ETL job following the instructions below.
        - The summary should be a short description of the job and the script.
        - The summary should include the job name, how often the job was run, and the job schedule if there is one.
        - The summary should be in plain text.  

        Here are the details of the Glue Job:
        Job Name: {job['JobName']}
        Job Description: {job['JobDescription']}
        Job Schedule: {job['JobSchedule']}
        Job Runs: {job['JobRuns']}
        Script Location: {job['ScriptLocation']}
        Script: {job['Script']}
        """

        body = json.dumps({
            "inputText": f"\n\nHuman: {prompt}\n\nAssistant:",
            "textGenerationConfig": {
                "maxTokenCount": 512,
                "temperature": 0.5,
            },
        })

        
        bedrock_response = bedrock_client.invoke_model(body=body, modelId="amazon.titan-text-premier-v1:0")
        response_body = json.loads(bedrock_response.get('body').read())

        Text = response_body["results"][0]["outputText"]
        Title = job['JobName']
        Url =  job['ScriptLocation']
        # get current date and time
        CrawledDate = datetime.now().isoformat()
        docID =  job['JobName']

        # create a html file for each table and store it in s3
        html = f"<html><body><h1>{docID} - CrawledDate: {CrawledDate}</h1><pre>{Text}</pre></body></html>"

        s3_client.put_object(Bucket=DEMO_S3_BUCKET, Key=f"{DEMO_S3_KEY}{docID}.html", Body=html)

        Url = f"{CLOUDFRONT_URL}/{DEMO_S3_KEY}{docID}.html"

        job_summaries.append({
            "docID": docID,
            "Title": Title,
            "Text": Text,
            "Url": Url,
            "CrawledDate": CrawledDate,
            "CreatedBy": "fhuthmacher@gmail.com"
        })

    # convert to dataframe
    df = pd.DataFrame(job_summaries)
    return df

job_summaries_df = get_glue_job_info()
job_summaries_df.head()
        

s3://aws-glue-assets-026459568683-us-east-1/scripts/BQ-ETL.py
s3://aws-glue-assets-026459568683-us-east-1/scripts/GlueUpdateTableColumns.py
s3://aws-glue-assets-026459568683-us-east-1/scripts/PostGresExport.py
processing job: BQ-ETL
processing job: GlueUpdateTableColumns
processing job: PostGresExport


Unnamed: 0,docID,Title,Text,Url,CrawledDate,CreatedBy
0,BQ-ETL,BQ-ETL,The AWS Glue ETL job BQ-ETL was run 5 times in...,https://d3q8adh3y5sxpk.cloudfront.net/rag-demo...,2024-10-11T16:05:05.449783,fhuthmacher@gmail.com
1,GlueUpdateTableColumns,GlueUpdateTableColumns,The Glue job GlueUpdateTableColumns was last r...,https://d3q8adh3y5sxpk.cloudfront.net/rag-demo...,2024-10-11T16:05:11.444746,fhuthmacher@gmail.com
2,PostGresExport,PostGresExport,"The AWS Glue ETL job ""PostGresExport"" was last...",https://d3q8adh3y5sxpk.cloudfront.net/rag-demo...,2024-10-11T16:05:13.554312,fhuthmacher@gmail.com


In [21]:
# Synchronize custom data source 2

result = run_data_source_sync_job(CUSTOM_DATA_SOURCE_ID_2, KENDRA_INDEX, job_summaries_df)
print(result)

Start data source sync operation: 
{'ExecutionId': 'fa2426cf-3a60-461a-8157-8052656a8e57', 'ResponseMetadata': {'RequestId': 'b534c41a-f8fa-460b-bf75-2731360e5edc', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'b534c41a-f8fa-460b-bf75-2731360e5edc', 'content-type': 'application/x-amz-json-1.1', 'content-length': '54', 'date': 'Fri, 11 Oct 2024 20:05:13 GMT'}, 'RetryAttempts': 0}}
Job execution ID: fa2426cf-3a60-461a-8157-8052656a8e57
{'ResponseMetadata': {'RequestId': '3be6b9c0-3be9-4bf5-86be-05318656312d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '3be6b9c0-3be9-4bf5-86be-05318656312d', 'content-type': 'application/x-amz-json-1.1', 'content-length': '0', 'date': 'Fri, 11 Oct 2024 20:05:14 GMT'}, 'RetryAttempts': 0}}


# Create Q custom data sources and sync

In [48]:
# helper method to create a Q custom data source

import boto3
def create_q_custom_data_source(displayName):
    amazonq_client = boto3.client('qbusiness', region_name=REGION)
    response = amazonq_client.list_indices(
        applicationId=AMAZON_Q_APP_ID,
        
    )
    indexId = response['indices'][0]['indexId']
    configuration = {
        "type": "CUSTOM"
    }
    
    response = amazonq_client.create_data_source(
        applicationId=AMAZON_Q_APP_ID,
        indexId=indexId,
        displayName=displayName,
        configuration=configuration)
    return response

In [49]:
# helper method to parse the dataframe to documents for Q ingestion

import boto3
import pandas as pd

def parse_to_q_docs(dataSourceId, jobExecutionId, df):
    documents = []
   
    for index_label, row_series in df.iterrows():
        Text = df.at[index_label , 'Text']
        Title = df.at[index_label , 'Title']
        Url =  df.at[index_label , 'Url']
        CrawledDate = df.at[index_label , 'CrawledDate']
        docID =  df.at[index_label , 'docID']
        CreatedBy = df.at[index_label , 'CreatedBy']
        print(f"docID: {docID}")
        doc = {
            "id": docID,
            "content":{
                "blob": Text,
            },
            "contentType": "PLAIN_TEXT",
            "title": Title,
            "attributes": [
                {
                "name": "_data_source_id",
                "value": {
                    "stringValue": dataSourceId
                    }
                },
                {
                "name": "_data_source_sync_job_execution_id",
                "value": {
                    "stringValue": jobExecutionId
                    }
                },
                {
                "name": "_source_uri",
                "value": {
                    "stringValue": Url
                    }    
                },
                {
                "name": "_created_at",
                "value": {
                    "dateValue": CrawledDate
                    }    
                },
                
            ],
            # "accessConfiguration": {
            #     "accessControls": [
            #         {
            #             "principals": [
            #                 {
            #                     "user": {
            #                         "id": "arn:aws:iam::026459568683:user/huthmac",
            #                         "access": "ALLOW"
            #                     }
            #                 }
            #             ]
            #         }
            #     ]
            # }
        }
        documents.append(doc)
    return documents


In [50]:
# run the Q data source sync job

def run_q_data_source_sync_job(data_source_id, df):
    amazonq_client = boto3.client('qbusiness', region_name=REGION)
    response = amazonq_client.list_indices(
        applicationId=AMAZON_Q_APP_ID,
        
    )
    index_id = response['indices'][0]['indexId']

    #Start a data source sync job
    result = amazonq_client.start_data_source_sync_job(
        applicationId=AMAZON_Q_APP_ID,
        dataSourceId = data_source_id,
        indexId = index_id
        )

    print("Start data source sync operation: ")
    print(result)

    #Obtain the job execution ID from the result
    job_execution_id = result['executionId']
    print("Job execution ID: "+job_execution_id)

    #Start ingesting documents
    try:
        #parse docs for ingestion
        docs = parse_to_q_docs(data_source_id, job_execution_id, df)
        #batchput docs
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/qbusiness/client/batch_put_document.html
        result = amazonq_client.batch_put_document(
            applicationId=AMAZON_Q_APP_ID,
            indexId = index_id,
            documents = docs
            )
        print(f"Batch put document result: {result}")

        #Stop data source sync
        result = amazonq_client.stop_data_source_sync_job(
            applicationId=AMAZON_Q_APP_ID,
            dataSourceId = data_source_id,
            indexId = index_id
            )
        print(f"Stop data source sync result: {result}")

    #Stop data source sync job
    finally:
        #Stop data source sync
        result = amazonq_client.stop_data_source_sync_job(
            applicationId=AMAZON_Q_APP_ID,
            dataSourceId = data_source_id,
            indexId = index_id
            )
    return result

In [51]:
# create Q custom data source

if Q_CUSTOM_DATA_SOURCE_ID_1 == "XXX":
    displayName = 'custom-glue-jobs-data-source'
    response = create_q_custom_data_source(displayName)
    Q_CUSTOM_DATA_SOURCE_ID_1 = response['dataSourceId']
    print(f"Q_CUSTOM_DATA_SOURCE_ID_1: {Q_CUSTOM_DATA_SOURCE_ID_1}")
    os.environ['Q_CUSTOM_DATA_SOURCE_ID_1'] = Q_CUSTOM_DATA_SOURCE_ID_1
    dotenv.set_key(local_env_filename, "Q_CUSTOM_DATA_SOURCE_ID_1", os.environ["Q_CUSTOM_DATA_SOURCE_ID_1"])

Q_CUSTOM_DATA_SOURCE_ID_1: e3a4cebe-c885-466b-b201-11ded3f7913f


In [52]:
# Sync custom glue data source
result = run_q_data_source_sync_job(Q_CUSTOM_DATA_SOURCE_ID_1, job_summaries_df)
print(result)



Start data source sync operation: 
{'ResponseMetadata': {'RequestId': '1184b3d7-2205-4f23-ac7f-373fcbb7d31c', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '1184b3d7-2205-4f23-ac7f-373fcbb7d31c', 'strict-transport-security': 'max-age=47304000; includeSubDomains', 'cache-control': 'no-store, no-cache, no-cache', 'date': 'Fri, 11 Oct 2024 20:37:00 GMT', 'content-type': 'application/json', 'content-length': '54', 'connection': 'keep-alive'}, 'RetryAttempts': 0}, 'executionId': '45d0a001-79e6-43cc-8789-46a4b19f65d7'}
Job execution ID: 45d0a001-79e6-43cc-8789-46a4b19f65d7
docID: BQ-ETL
docID: GlueUpdateTableColumns
docID: PostGresExport
Batch put document result: {'ResponseMetadata': {'RequestId': 'd88e01bb-4a57-4898-86c4-bf7458a98450', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd88e01bb-4a57-4898-86c4-bf7458a98450', 'strict-transport-security': 'max-age=47304000; includeSubDomains', 'cache-control': 'no-store, no-cache, no-cache', 'date': 'Fri, 11 Oct 2024 20:

In [53]:
if Q_CUSTOM_DATA_SOURCE_ID_2 == "XXX":
    displayName = 'custom-postgres-data-source'
    response = create_q_custom_data_source(displayName)
    Q_CUSTOM_DATA_SOURCE_ID_2 = response['dataSourceId']
    print(f"Q_CUSTOM_DATA_SOURCE_ID_2: {Q_CUSTOM_DATA_SOURCE_ID_2}")
    os.environ['Q_CUSTOM_DATA_SOURCE_ID_2'] = Q_CUSTOM_DATA_SOURCE_ID_2
    dotenv.set_key(local_env_filename, "Q_CUSTOM_DATA_SOURCE_ID_2", os.environ["Q_CUSTOM_DATA_SOURCE_ID_2"])

Q_CUSTOM_DATA_SOURCE_ID_2: 7e6bd98b-081a-4de9-a9af-ad296d9374cc


In [54]:
# Sync custom postgres data source
result = run_q_data_source_sync_job(Q_CUSTOM_DATA_SOURCE_ID_2, table_summaries_df[:10])
print(result)

Start data source sync operation: 
{'ResponseMetadata': {'RequestId': 'f746910d-e9f4-47e4-8855-0b0b6a6fa376', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'f746910d-e9f4-47e4-8855-0b0b6a6fa376', 'strict-transport-security': 'max-age=47304000; includeSubDomains', 'cache-control': 'no-store, no-cache, no-cache', 'date': 'Fri, 11 Oct 2024 20:37:54 GMT', 'content-type': 'application/json', 'content-length': '54', 'connection': 'keep-alive'}, 'RetryAttempts': 0}, 'executionId': '0f0e1afa-675d-467d-86b9-cf1e8fe2cb7a'}
Job execution ID: 0f0e1afa-675d-467d-86b9-cf1e8fe2cb7a
docID: public.categories
docID: public.customer_customer_demo
docID: public.customer_demographics
docID: public.customers
docID: public.employee_territories
docID: public.employees
docID: public.order_details
docID: public.orders
docID: public.productreviews
docID: public.products
Batch put document result: {'ResponseMetadata': {'RequestId': '464cb51b-448e-4df9-b794-954dc787be33', 'HTTPStatusCode': 200, 'HTTPH

## Appendix

In [30]:
# # create some sample data for standard RDS Postgres database Connector

# def create_sql(statement):
#     try:
#             # SQLALCHEMY_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{SQL_DATABASE_NAME}"
#         get_secret_value_response = get_secret("SQLALCHEMY_URL")
#         SQLALCHEMY_URL = get_secret_value_response['SecretString']
        
#         engine = create_engine(SQLALCHEMY_URL)
#         with engine.connect() as connection:
#             connection.execute(text(statement))
#             connection.commit()
#     except Exception as e:
#         error = f"Error executing statement: {e}"
#         raise

# create_sql("""
# CREATE TABLE public.ProductReviews (
#     ProductID INT,
#     ProductName VARCHAR(100),
#     Review TEXT,
#     Rating INT,
#     Reviewer VARCHAR(100),
#     CreatedBy VARCHAR(100)
# );
# """)

# create_sql("""
# INSERT INTO public.ProductReviews (ProductID, ProductName, Review, Rating, Reviewer, CreatedBy)
# VALUES 
# (1, 'Wireless Earbuds', 'Great sound quality and comfortable fit!', 5, 'John Smith', 'huthmac@amazon.com'),
# (2, 'Smart Watch', 'Decent features but battery life could be better.', 4, 'Emma Johnson', 'huthmac@amazon.com'),
# (3, 'Laptop', 'Excellent performance for the price.', 5, 'Michael Brown', 'huthmac@amazon.com'),
# (4, 'Coffee Maker', 'Makes great coffee but a bit noisy.', 4, 'Sarah Davis', 'huthmac@amazon.com'),
# (5, 'Fitness Tracker', 'Accurate step counting, but the app needs improvement.', 3, 'David Wilson', 'huthmac@amazon.com'),
# (6, 'Bluetooth Speaker', 'Impressive sound for its size!', 5, 'Lisa Anderson', 'huthmac@amazon.com'),
# (7, 'Electric Toothbrush', 'My teeth feel cleaner, but it''s a bit pricey.', 4, 'Robert Taylor', 'huthmac@amazon.com'),
# (8, 'Air Fryer', 'Cooks food quickly and evenly. Easy to clean too!', 5, 'Jennifer Martinez', 'huthmac@amazon.com'),
# (9, 'Gaming Mouse', 'Responsive and comfortable for long gaming sessions.', 5, 'Chris Lee', 'huthmac@amazon.com'),
# (10, 'Portable Charger', 'Charges quickly but doesn''t hold as much power as advertised.', 3, 'Emily White', 'huthmac@amazon.com');
# """)

