In [1]:
# %pip install -q xlrd langchain_community langchain_core langchain-aws requests-aws4auth opensearch-py

In [2]:
import os 
import boto3
import re
import pandas as pd
import datetime
from io import BytesIO
from langchain_core.documents import Document
from langchain_aws import BedrockEmbeddings
from opensearchpy import OpenSearch, RequestsHttpConnection
import json 
from requests_aws4auth import AWS4Auth

In [3]:
# S3 URIs of the CSV files to ingest; each entry is handed to read_csv().
FILE_CONFIGS = [
    "s3://canada-gen-ai/health_index_augdata.csv",
]

# Lookup from a logical label to the OpenSearch index name used for ingestion.
INDEX_CONFIGS = {
    "Hackathon_index": "innovator_hack_index",
}

In [4]:
# read csv file from S3 
def read_csv(file_path):
    """Fetch a CSV object from S3 and parse it into a DataFrame.

    Args:
        file_path: S3 URI of the form ``s3://<bucket>/<key>``.

    Returns:
        A ``(DataFrame, file_path)`` tuple; the path is handed back unchanged.
    """
    s3_client = boto3.client('s3')

    # Strip the scheme, then split only once so every later "/" stays in the key.
    location = file_path.replace("s3://", "")
    bucket_name, object_key = location.split("/", 1)

    s3_object = s3_client.get_object(Bucket=bucket_name, Key=object_key)
    raw_bytes = s3_object['Body'].read()

    return pd.read_csv(BytesIO(raw_bytes)), file_path


# create OpenSearch vector search index
def create_vector_search_index(opensearch_client, index_name, dimension):
    """Create a k-NN enabled index for row documents, tolerating re-runs.

    Args:
        opensearch_client: Client exposing ``indices.create(index=..., body=...)``.
        index_name: Name of the index to create.
        dimension: Length of the vectors stored in the ``embedding`` field.

    Returns:
        None. A message is printed saying whether the index was created or
        already existed.

    Raises:
        Exception: Any creation error whose text does not contain
            ``resource_already_exists_exception`` is re-raised unchanged.
    """
    index_body = {
        "settings": {
            "index": {
                "number_of_shards": 1,
                "number_of_replicas": 1,
                "knn": True,  # Enable k-NN search
            }
        },
        "mappings": {
            "properties": {
                # Four-letter year so the pattern mirrors the '%Y-%m-%d %H:%M:%S'
                # timestamps written with each document (was the typo "yyy").
                "snapshot_date": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
                "source": {"type": "keyword"},  # Source name
                "content": {"type": "text"},  # Store row content as text
                "embedding": {"type": "knn_vector", "dimension": dimension},  # Vector embedding for k-NN search
            }
        },
    }
    try:
        opensearch_client.indices.create(index=index_name, body=index_body)
    except Exception as e:
        # The error is matched by its text, so only the "already exists" case
        # is swallowed; everything else propagates to the caller.
        if 'resource_already_exists_exception' not in str(e):
            raise
        print(f"Index {index_name} already exists. Skipping creation.")
    else:
        print(f"Index {index_name} created successfully.")


# add documents to OpenSearch index
def add_documents_to_opensearch(opensearch_client, index_name, df, bedrock_embeddings,
                                source_name="health_index_augdata.csv"):
    """Embed every row of ``df`` and index it as one document.

    The embedding input is the space-joined string form of the row's non-null
    values (all columns). The stored ``content`` is the row as JSON with the
    ``file_path`` column left out and missing values written as ``null``.

    Indexing is best-effort: a row that fails at any step is reported on
    stdout and skipped, and the loop moves on to the next row.

    Args:
        opensearch_client: Client exposing ``index(index=..., body=...)``.
        index_name: Target index name.
        df: DataFrame whose rows become documents.
        bedrock_embeddings: Object exposing ``embed_documents(list_of_texts)``
            that returns one vector per input text.
        source_name: Value stored in each document's ``source`` field.
            Defaults to the file name that was previously hard-coded.

    Returns:
        int: Number of rows indexed successfully.
    """
    indexed_count = 0
    for i, row in df.iterrows():
        try:
            row_content_serialized = {
                key: (value if pd.notna(value) else None)
                for key, value in row.items() if key not in ['file_path']
            }

            text = " ".join(str(value) for value in row.values if pd.notna(value))

            embedding = bedrock_embeddings.embed_documents([text])[0]

            # default= is only consulted for values the encoder rejects, so rows
            # that already serialized keep exactly the same JSON text.
            row_content_text = json.dumps(row_content_serialized, default=_to_json_safe)

            document = {
                # Timezone-aware replacement for the deprecated utcnow(); the
                # formatted string is unchanged.
                'snapshot_date': datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S'),
                'source': source_name,
                'content': row_content_text,
                'embedding': embedding
            }

            opensearch_client.index(index=index_name, body=document)
            print(f"Document {i} indexed successfully.")
            indexed_count += 1

        except Exception as e:
            # Deliberately broad: one bad row must not abort the whole ingest.
            print(f"Failed to index document {i} due to inconsistent value: {e}")
            continue

    return indexed_count


def _to_json_safe(value):
    """Fallback for ``json.dumps``: convert a value the encoder cannot handle."""
    # NumPy scalars (e.g. int64 values from an all-integer row) expose .item(),
    # which yields the equivalent built-in Python value.
    if hasattr(value, "item"):
        return value.item()
    return str(value)


# Main processing function
def process_csv_files(file_configs, opensearch_client, index_name, bedrock_embeddings, dimension):
    """Ingest every configured CSV file into the vector index.

    Args:
        file_configs: Iterable of S3 URIs (``s3://<bucket>/<key>``).
        opensearch_client: Client passed through to index creation and indexing.
        index_name: Index that receives the documents.
        bedrock_embeddings: Embedding object passed through to indexing.
        dimension: Vector length used when the index is created.

    Returns:
        int: Total number of rows indexed across all files.
    """
    total_indexed = 0
    for file_path in file_configs:
        print(f"Processing file: {file_path}")
        df, _ = read_csv(file_path)
        # Repeating this per file is harmless: an existing index is detected
        # and creation is skipped.
        create_vector_search_index(opensearch_client, index_name, dimension)
        # Tag documents with this file's own name instead of one fixed name.
        source_name = file_path.rsplit("/", 1)[-1]
        indexed = add_documents_to_opensearch(
            opensearch_client, index_name, df, bedrock_embeddings, source_name=source_name
        )
        print(f"Indexed {indexed} of {len(df)} rows from {file_path}")
        total_indexed += indexed
    return total_indexed


In [5]:
# Initialize OpenSearch client
def init_opensearch_client(host, port, region, service):
    """Build an OpenSearch client authenticated through AWS4Auth.

    Credentials are read from the default boto3 session.

    Args:
        host: Endpoint host name.
        port: Endpoint port.
        region: AWS region passed to the request signer.
        service: AWS service name passed to the request signer.

    Returns:
        An ``OpenSearch`` client using TLS with certificate verification.
    """
    creds = boto3.Session().get_credentials()
    request_signer = AWS4Auth(
        creds.access_key,
        creds.secret_key,
        region,
        service,
        session_token=creds.token,
    )
    endpoint = {'host': host, 'port': port}
    # NOTE(review): timeout=3000 is presumably in seconds (about 50 minutes) -
    # confirm this is intended.
    return OpenSearch(
        hosts=[endpoint],
        http_auth=request_signer,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        timeout=3000,
    )


# Connection settings for the OpenSearch endpoint.
opensearch_host = 'iellhhrn6kean028im78.us-east-1.aoss.amazonaws.com'
opensearch_port = 443
opensearch_region = 'us-east-1'
opensearch_service = 'aoss'

# Target index and the vector length used for its knn_vector field.
index_name = INDEX_CONFIGS['Hackathon_index']
# NOTE(review): must equal the output length of the embedding model below - confirm.
dimension = 1024

opensearch_client = init_opensearch_client(
    host=opensearch_host,
    port=opensearch_port,
    region=opensearch_region,
    service=opensearch_service,
)
# The Bedrock runtime client is created in the same region as the OpenSearch endpoint.
embedding_model = BedrockEmbeddings(
    client=boto3.client("bedrock-runtime", region_name=opensearch_region),
    model_id="amazon.titan-embed-text-v2:0",
)

In [6]:
# Run the full ingest: read each configured CSV, ensure the index exists, index every row.
process_csv_files(
    file_configs=FILE_CONFIGS,
    opensearch_client=opensearch_client,
    index_name=index_name,
    bedrock_embeddings=embedding_model,
    dimension=dimension,
)

Processing file: s3://canada-gen-ai/health_index_augdata.csv
Index innovator_hack_index created successfully.
Document 0 indexed successfully.
Document 1 indexed successfully.
Document 2 indexed successfully.
Document 3 indexed successfully.
Document 4 indexed successfully.
Document 5 indexed successfully.
Document 6 indexed successfully.
Document 7 indexed successfully.
Document 8 indexed successfully.
Document 9 indexed successfully.
Document 10 indexed successfully.
Document 11 indexed successfully.
Document 12 indexed successfully.
Document 13 indexed successfully.
Document 14 indexed successfully.
Document 15 indexed successfully.
Document 16 indexed successfully.
Document 17 indexed successfully.
Document 18 indexed successfully.
Document 19 indexed successfully.
Document 20 indexed successfully.
Document 21 indexed successfully.
Document 22 indexed successfully.
Document 23 indexed successfully.
Document 24 indexed successfully.
Document 25 indexed successfully.
Document 26 inde