# 01. Loading Data into Vector Database

In this notebook we will prepare and load some data into the vector database.

**!!!IMPORTANT!!!**
One cell in this notebook (commented out by default) drops a table in the database. Use it with care, and avoid running this whole notebook with "Run all".

### 1. Set up

**!!!MANUAL STEP!!!**

You need to request certain model access in Amazon Bedrock. Follow the steps in https://docs.aws.amazon.com/bedrock/latest/userguide/model-access.html#add-model-access.

Make sure the correct AWS region is selected before requesting model access. By default, you need these models:
1. Titan Embeddings G1 - Text
2. Claude

In [None]:
!pip install psycopg2-binary -q

In [None]:
import boto3
import json, time, os, uuid, shutil
import psycopg2
from helper.bastion import find_instances

# Bedrock runtime client; the data-loading cell uses it to create Titan text embeddings.
bedrock = boto3.client(service_name="bedrock-runtime")

### 2. Create the data

The following dataset is a mock dataset used to illustrate how to use this solution, for demo purposes only.

Ideally you should use your own dataset and copy/upload it to the `data` folder.

In [None]:
!pygmentize ./data/data.txt

### 3. Configure Vector DB

Load the output variables from the deployment

In [None]:
# Load the stack outputs written by the infrastructure deployment.
# The context manager closes the file handle instead of leaking it.
with open("./deployment-output.json", "r") as deployment_file:
    deployment_output = json.load(deployment_file)
rds_host = deployment_output["RecommenderStack"]["dbwriterendpoint"]
bastion_asg = deployment_output["RecommenderStack"]["bastionhostasgname"]
# An empty ASG name is treated as "no bastion host" (bastion_id stays None).
bastion_id = find_instances(bastion_asg) if bastion_asg != "" else None
connect_to_db_via_bastion = False # Set to True if you are running this Notebook without VPC connection to the DB.

Configure the Database class to interact with the database

In [None]:
class Database():
    """Helper for running SQL against the Aurora PostgreSQL (pgvector) database.

    Statements run directly over a psycopg2 connection. There is one exception:
    when a bastion instance id is set AND the notebook flag
    ``connect_to_db_via_bastion`` is True, statements run through ``psql`` on
    the bastion host using AWS Systems Manager Run Command.
    """

    # SSM command-invocation statuses meaning the command has not finished yet.
    _SSM_PENDING_STATUSES = ("Pending", "InProgress", "Delayed", "Cancelling")

    def __init__(self, writer, bastion_id=None, embedding_dimension=1536, port=5432, database_name="vectordb",
                 secret_id="AuroraClusterCredentials"):
        """
        Args:
            writer: Host name of the cluster writer endpoint.
            bastion_id: EC2 instance id of the bastion host, or None if there is none.
            embedding_dimension: Size of the ``vector`` column created by ``create_vector_table``.
            port: PostgreSQL port, used for both direct and bastion connections.
            database_name: Database to connect to.
            secret_id: Secrets Manager secret whose JSON holds ``username`` and ``password``.
        """
        self.writer_endpoint = writer
        self.username = None
        self.password = None
        self.port = port
        self.database_name = database_name
        self.embedding_dimension = embedding_dimension
        self.bastion_id = bastion_id # Also indicates that DB commands may be run via a bastion host with AWS SSM.
        self.secret_id = secret_id
        self.conn = None

    def fetch_credentials(self):
        """Load the database username and password from AWS Secrets Manager."""
        secrets_manager = boto3.client("secretsmanager")
        secret = secrets_manager.get_secret_value(SecretId=self.secret_id)
        credentials = json.loads(secret["SecretString"])
        self.username = credentials["username"]
        self.password = credentials["password"]

    def connect_for_writing(self):
        """Open an autocommit psycopg2 connection, store it on ``self.conn`` and return it."""
        if self.username is None or self.password is None:
            self.fetch_credentials()

        conn = psycopg2.connect(host=self.writer_endpoint, port=self.port, user=self.username,
                                password=self.password, database=self.database_name)
        conn.autocommit = True
        self.conn = conn

        return conn

    def close_connection(self):
        """Close the direct connection, if one is open."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def create_pgvector_extension(self):
        """Install the pgvector extension (no-op if it is already installed)."""
        return self.query_database("CREATE EXTENSION IF NOT EXISTS vector;")

    def create_vector_table(self):
        """Create the ``items`` table (no-op if it already exists)."""
        return self.query_database(
            "CREATE TABLE IF NOT EXISTS items (id bigserial PRIMARY KEY, description text, "
            f"embedding vector({self.embedding_dimension}));"
        )

    def insert_vector(self, query_template, text, embedding, additional_query_parameters=None):
        """Fill ``query_template`` and execute it.

        The template is filled with ``.format(quoted_text, str(embedding), *additional_query_parameters)``.
        The text is escaped and quoted by psycopg2, so ``{0}`` must not be quoted in the template.
        """
        if additional_query_parameters is None:
            additional_query_parameters = []

        quoted_text = psycopg2.extensions.adapt(text)
        if self.conn is not None and hasattr(quoted_text, "prepare"):
            # Bind the adapter to the connection so quoting follows that connection's
            # settings (e.g. its client encoding) rather than the adapter's defaults.
            quoted_text.prepare(self.conn)

        all_query_parameters = [quoted_text, str(embedding)] + list(additional_query_parameters)
        return self.query_database(query_template.format(*all_query_parameters))

    def add_hnsw_index(self):
        """Create an HNSW index on ``items.embedding`` using cosine distance."""
        return self.query_database("CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops);")

    def query_database(self, query, tuples_only_and_unaligned=False):
        """Execute ``query`` and return its result.

        Direct mode returns the fetched rows for row-returning statements, otherwise
        the cursor status message (e.g. ``"INSERT 0 1"``). Bastion mode returns psql's
        output text; stderr output takes precedence over stdout and is also printed.
        ``tuples_only_and_unaligned`` adds ``-At`` to psql and only applies in bastion mode.
        """
        if self.username is None or self.password is None:
            self.fetch_credentials()

        if self.bastion_id is None or not connect_to_db_via_bastion:
            return self._query_direct(query)
        return self._query_via_bastion(query, tuples_only_and_unaligned)

    def _query_direct(self, query):
        """Run ``query`` over the psycopg2 connection, opening the connection on first use."""
        if self.conn is None:
            self.connect_for_writing()

        # The context manager closes the cursor even if execute() raises.
        with self.conn.cursor() as cur:
            cur.execute(query)
            # description is None for statements that return no rows (INSERT, CREATE, ...).
            if cur.description is None:
                return cur.statusmessage
            return cur.fetchall()

    def _build_psql_command(self, query, tuples_only_and_unaligned=False):
        """Build the bastion shell command: write ``query`` to a temp file, run psql on it, then delete it."""
        import shlex

        query_file = f"./q{str(uuid.uuid4())[:8]}.txt"
        psql_flags = " -At" if tuples_only_and_unaligned else ""
        # shlex.quote keeps quotes, $ and backticks in the SQL/credentials away from the shell.
        # NOTE(review): the password is part of the SSM command parameters and therefore
        # visible in the Run Command history; consider resolving it on the bastion instead.
        return (
            f"export PGPASSWORD={shlex.quote(self.password)}"
            f" && printf '%s\\n' {shlex.quote(query)} > {query_file}"
            f" && psql -h {shlex.quote(self.writer_endpoint)} -p {int(self.port)}"
            f" -U {shlex.quote(self.username)} -d {shlex.quote(self.database_name)}"
            f" -f {query_file}{psql_flags}"
            f" && rm {query_file}"
        )

    def _query_via_bastion(self, query, tuples_only_and_unaligned=False):
        """Run ``query`` with psql on the bastion host via SSM Run Command and return its output."""
        ssm = boto3.client("ssm")
        response = ssm.send_command(
            InstanceIds=[self.bastion_id],
            DocumentName="AWS-RunShellScript",
            Parameters={'commands': [self._build_psql_command(query, tuples_only_and_unaligned)]})

        invocation = self._wait_for_command(ssm, response['Command']['CommandId'])
        return self._format_invocation_output(invocation)

    def _wait_for_command(self, ssm, command_id, poll_interval=1):
        """Poll SSM until the command reaches a terminal status, then return the invocation dict."""
        while True:
            try:
                invocation = ssm.get_command_invocation(CommandId=command_id, InstanceId=self.bastion_id)
            except ssm.exceptions.InvocationDoesNotExist:
                # The invocation is registered asynchronously right after send_command.
                invocation = None

            if invocation is not None and invocation["Status"] not in self._SSM_PENDING_STATUSES:
                return invocation
            time.sleep(poll_interval)

    @staticmethod
    def _format_invocation_output(invocation):
        """Select the text to return from a finished invocation, printing it if stderr was written."""
        output_string = ""
        # Later fields win, so any stderr output takes precedence over stdout.
        for key in ("StandardOutputContent", "StandardOutputUrl", "StandardErrorContent", "StandardErrorUrl"):
            if invocation.get(key, "") != "":
                output_string = invocation[key]

        if invocation.get("StandardErrorContent", "") != "" or invocation.get("StandardErrorUrl", "") != "":
            print(output_string)

        return output_string

# Shared database helper used by the remaining cells.
db = Database(rds_host, bastion_id=bastion_id)

Test the connection. If this fails with error "relation "items" does not exist", then the connection is doing fine but the table is not yet created. If this is the case, uncomment the next cell and run it.

If the cell below keeps running for a long time without producing output, there is likely a network issue.

In [None]:
# Smoke-test the connection by counting the rows in the items table.
row_count = db.query_database("SELECT Count(*) FROM items;")
print(row_count)

Set up the vector database

In [None]:
# The infrastructure deployment should already have installed the pgvector extension and created the table for you.
# Only uncomment and run the code below if the pgvector extension was not installed, or the table was not created, as expected.

#db.create_pgvector_extension();
#db.create_vector_table();

Define query statement template to insert data. This is the part where you can customize.

Since this solution is customizable, it allows you to customize the vector insert query. The template is then uploaded to S3 (in notebook 03) and used by the Lambda function during actual inference.

Note that the AWS Lambda function that backs the API runs `.format(*parameters)` on this template, where `parameters` is the concatenation of `[text, embedding]` and any additional parameters you supply at inference time. For example, if you want to add more columns to be used in the WHERE clause during search/query time, you can add placeholders for those column values as {2}, {3}, and so on in this template. Remember to supply these parameters via `additional_query_parameters` when invoking the data loading API. By default, `additional_query_parameters` is an empty list `[]`.

As a restriction, {0} must be the text description of the item and {1} must be the embedding to be inserted.

In [None]:
# {0} = quoted item description, {1} = embedding, {2}, {3}, ... = additional_query_parameters.
query_template = "INSERT INTO items (description,embedding) VALUES ( {0} ,'{1}');"

# Store it on disk so it can later be uploaded and reused by the Lambda function.
path = "vector_insert_query.txt" # Do not change the naming of the file
# The context manager guarantees the file is flushed and closed even on error.
with open(path, "w", encoding="utf-8") as f:
    f.write(query_template)

In [None]:
# Values for the {2}, {3}, ... placeholders of query_template, used here for testing.
# Leave empty when the template only uses {0} (text) and {1} (embedding).
additional_query_parameters = list()

Load data

In [None]:
# Where the dataset lives on local disk, and the marker that separates its data points.
# Adjust both to match your own dataset.
data_file_path = "data/data.txt"
data_delimiter = "###"

In [None]:
# Split the dataset into items, embed each one with Titan and insert it into the vector table.
with open(data_file_path, "r", encoding="utf-8") as data_file:
    data_string = data_file.read()
# str.split already returns [data_string] when the delimiter is absent.
data = data_string.split(data_delimiter)
for text in data:
    text = text.strip()
    if not text:
        # Skip empty chunks (e.g. from a trailing delimiter); an empty inputText
        # presumably gets rejected by the embedding model and adds nothing anyway.
        continue

    body = json.dumps({"inputText": text})
    response = bedrock.invoke_model(body=body, modelId="amazon.titan-embed-text-v1")
    embedding = json.loads(response.get("body").read())["embedding"]
    db.insert_vector(query_template, text, embedding, additional_query_parameters=additional_query_parameters)

#db.add_hnsw_index(); # This may only work on Aurora PostgreSQL Engine with version > 15.4. Currently the infrastructure is deployed with version 15.3

Make sure data insertion worked

In [None]:
# Confirm the inserts landed by counting the rows now in the items table.
loaded_count = db.query_database("SELECT Count(*) FROM items;")
print(loaded_count)