# Investment Analyst Assistant Indexing Notebook

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.

Copyright 2024 Amazon Web Services, Inc.

### Introduction
This notebook lets you upload and preprocess documents, split them into chunks, generate embeddings, and upload the results to OpenSearch Serverless.

### STEP 0:  Install Packages

NOTE: Warnings and, in some cases, version errors during package installation can be ignored; they are caused by version updates. Change the pinned versions only if necessary.

In [1]:
# Install missing packages or modules
!pip install boto3 --quiet
!pip install opensearch-py==2.4.2 --quiet
!pip install retry --quiet
!pip install amazon-textract-textractor==1.3.5 --quiet

#### Adding Project Directory to Path

In [1]:
import sys
import os
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))  # Adjust this path as needed
sys.path.append(project_root)

### STEP 1:  Import Modules

In [2]:
import json
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
from opensearchpy.helpers import bulk
import boto3
import time
import os
from io import StringIO
import csv
from retry import retry
import urllib3
import logging
import sagemaker
import re

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [3]:
session = boto3.Session()
sts_client = session.client('sts')
account_id = sts_client.get_caller_identity()['Account']
print(f"Your AWS account number is: {account_id}")

Your AWS account number is: 123123123123


#### Notebook Configuration

In [4]:
## Amazon S3 bucket names must be globally unique and must follow the Amazon S3 naming rules.
## Please refer to https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html for the naming rules.
## The next step checks whether the buckets exist; names that violate the naming rules might cause errors there.
exp_config = {
    'region': 'us-west-2',
    'source_bucket': f'source-files-{account_id}', #name of the S3 Bucket to upload the PDF documents
    'service_bucket': f'service-bucket-{account_id}', #Different bucket than source to store metadata file 
    'embb_model': 'amazon.titan-embed-text-v1'
            }
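
Because invalid bucket names only surface as errors in the next step, it can help to check them up front. The sketch below covers a subset of the published S3 naming rules (length, allowed characters, no adjacent periods, no IP-address form, reserved prefix and suffix). The function name `looks_like_valid_bucket_name` is hypothetical and not part of this project, and passing this check does not guarantee that S3 accepts the name.

```python
import re

# Partial check of the S3 general-purpose bucket naming rules.
# 3-63 characters; lowercase letters, digits, dots, hyphens;
# must start and end with a letter or digit.
_BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$")
_IP_RE = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")


def looks_like_valid_bucket_name(name: str) -> bool:
    """Return True if `name` passes a subset of the S3 naming rules."""
    if not _BUCKET_RE.match(name):
        return False
    if ".." in name:  # two adjacent periods are not allowed
        return False
    if _IP_RE.match(name):  # must not be formatted like an IP address
        return False
    if name.startswith("xn--") or name.endswith("-s3alias"):
        return False
    return True
```

For example, `looks_like_valid_bucket_name(exp_config['source_bucket'])` could be asserted before the buckets are created.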

#### Create Amazon S3 buckets and upload the sample docs

In [5]:
# Create S3 client
session = boto3.Session(region_name=exp_config['region'])
s3_client = session.client('s3')

# Bucket name
source_bucket_name = exp_config['source_bucket']
service_buck_name = exp_config['service_bucket']
# Check if source bucket exists
try:
    s3_client.head_bucket(Bucket=source_bucket_name)
except s3_client.exceptions.ClientError as e:
    # Get exception code
    error_code = e.response['Error']['Code']
    if error_code == '404':
        print(f"Bucket {source_bucket_name} does not exist, creating source bucket...")
        # Create bucket
        s3_client.create_bucket(Bucket=source_bucket_name, CreateBucketConfiguration={'LocationConstraint': exp_config['region']})
    else:
        print(f"Error: {e}")
else:
    print(f"Bucket {source_bucket_name} already exists")

# Check if service bucket exists
try:
    s3_client.head_bucket(Bucket=service_buck_name)
except s3_client.exceptions.ClientError as e:
    # Get exception code
    error_code = e.response['Error']['Code']
    if error_code == '404':
        print(f"Bucket {service_buck_name} does not exist, creating service bucket...")
        # Create bucket
        s3_client.create_bucket(Bucket=service_buck_name, CreateBucketConfiguration={'LocationConstraint': exp_config['region']})
    else:
        print(f"Error: {e}")
else:
    print(f"Bucket {service_buck_name} already exists")

# Upload files from the folder to the root of the source bucket
print(f"Uploading sample files to {source_bucket_name} bucket")
folder_path = '../data/sample_docs/'
for root, dirs, files in os.walk(folder_path):
    for file in files:
        file_path = os.path.join(root, file)
        object_key = os.path.relpath(file_path, folder_path)
        s3_client.upload_file(file_path, source_bucket_name, object_key)

print("Uploading Complete!")

Bucket source-files-123123123123 already exists
Bucket service-bucket-123123123123 already exists
Uploading sample files to source-files-123123123123 bucket
Uploading Complete!
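
The `create_bucket` calls in the cell above always pass a `LocationConstraint`. Amazon S3 rejects that parameter for `us-east-1`, where it must be omitted. If `exp_config['region']` might be changed to `us-east-1`, a small helper along these lines could build the arguments; the name `create_bucket_kwargs` is hypothetical and not part of this project.

```python
def create_bucket_kwargs(bucket_name: str, region: str) -> dict:
    """Build keyword arguments for s3_client.create_bucket for a given region."""
    kwargs = {"Bucket": bucket_name}
    # us-east-1 is the default location and must not be passed as a constraint.
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


# Possible usage inside the notebook (requires the s3_client defined earlier):
# s3_client.create_bucket(**create_bucket_kwargs(source_bucket_name, exp_config['region']))
```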


### STEP 2:  Create OpenSearch collection/host if it doesn't exist

##### This step creates an OpenSearch collection for indexing the documents. It creates the collection only if it doesn't already exist. If the collection exists, it simply outputs the host, which can be copied to configs.py

#### Get SageMaker Role ARN

In [6]:
sts_client = boto3.client('sts')
notebook_role_arn = sagemaker.get_execution_role()
print (notebook_role_arn)

arn:aws:iam::123123123123:role/service-role/AmazonSageMaker-ExecutionRole-20240621T171347


#### (OPTIONAL) These permissions are already provided. If you run into access issues, update the role as follows
Go to IAM > Roles
* Look for the IAM role (output from the previous cell execution) that the Studio is assuming.
* Create an inline policy with the policy definition below and attach it to the IAM role.
* Also attach "arn:aws:iam::aws:policy/AmazonTextractFullAccess" to the IAM role. Add the other policies mentioned below only if necessary; most of them should already be configured.

##### Make sure the role has the following permissions
```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "aoss:*"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

##### Additional managed policies required
##### arn:aws:iam::aws:policy/AmazonOpenSearchServiceFullAccess
##### arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
##### arn:aws:iam::aws:policy/AmazonTextractFullAccess

##### <strong>Please note that these policies are permissive, are meant only for testing, and shall NOT be used for production.</strong>
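
As one illustration of tightening the policy beyond testing, the S3 statement could be scoped to the two buckets this notebook uses instead of `arn:aws:s3:::*`. The sketch below only generates the S3 part; the helper name `scoped_s3_statements` is hypothetical, the bucket names reuse the example account ID shown in this notebook, and the OpenSearch Serverless and Bedrock statements would need their own review.

```python
import json


def scoped_s3_statements(bucket_names):
    """Build S3 statements restricted to the named buckets (hypothetical helper)."""
    bucket_arns = [f"arn:aws:s3:::{name}" for name in bucket_names]
    object_arns = [f"{arn}/*" for arn in bucket_arns]
    return [
        # ListBucket is a bucket-level action, so it targets the bucket ARN.
        {"Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": bucket_arns},
        # Object-level actions target the objects inside each bucket.
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
            "Resource": object_arns,
        },
    ]


policy = {
    "Version": "2012-10-17",
    "Statement": scoped_s3_statements(
        ["source-files-123123123123", "service-bucket-123123123123"]
    ),
}
print(json.dumps(policy, indent=4))
```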

#### Initialize OpenSearch Serverless Client

In [7]:
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, exp_config['region'], "aoss")
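# Pin the client to the configured region so it matches the signer above
opensearch_client = boto3.client('opensearchserverless', region_name=exp_config['region'])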


##### Input name variables

In [8]:
collection_name = 'my-expt-collection' # Name for OpenSearch Serverless collection, change as necessary
index_name = "expt_index" # Name for Index, change as necessary
data_access_policy_name = "data-access-expt-policy" # Name for data access policy, change as necessary
encryption_policy_name = "expt-encryption-policy" # Name for encryption policy, change as necessary
network_policy_name = "expt-network-policy" # Name for network policy, change as necessary

##### OpenSearch Access Policy Definitions

In [9]:
# Data Access Policy
data_access_policy = json.dumps(
[
  {
    "Rules": [
      {
        "Resource": [
          "collection/*"
        ],
        "Permission": [
          "aoss:CreateCollectionItems",
          "aoss:DeleteCollectionItems",
          "aoss:UpdateCollectionItems",
          "aoss:DescribeCollectionItems"
        ],
        "ResourceType": "collection"
      },
      {
        "Resource": [
          "index/*/*"
        ],
        "Permission": [
          "aoss:CreateIndex",
          "aoss:DeleteIndex",
          "aoss:UpdateIndex",
          "aoss:DescribeIndex",
          "aoss:ReadDocument",
          "aoss:WriteDocument"
        ],
        "ResourceType": "index"
      }
    ],
    "Principal": [
        notebook_role_arn
    ],
    "Description": "data-access-rule"
  }
]
)
# data_access_policy is already a JSON string (json.dumps above); encoding it again
# would produce a doubly escaped string, so it is passed to the API as-is.

#Encryption Policy

encryption_policy = {
    "Rules": [
        {
            "Resource": [
                f"collection/{collection_name}"
            ],
            "ResourceType": "collection"
        }
    ],
    "AWSOwnedKey": True
}

#Network Policy
network_policy = json.dumps(
[
  {
    "Rules": [
      {
        "Resource": [
          f"collection/{collection_name}"
        ],
        "ResourceType": "dashboard"
      },
      {
        "Resource": [
          f"collection/{collection_name}"
        ],
        "ResourceType": "collection"
      }
    ],
    "AllowFromPublic": True
  }
]
)


#### Create OpenSearch Serverless Collection

##### NOTE: This step might take a couple of minutes to complete while the OpenSearch Serverless collection is being provisioned.

In [10]:
collection_endpoint = None
try:
    # Check if the collection already exists
    response = opensearch_client.list_collections()
    existing_collections = [collection['name'] for collection in response['collectionSummaries']]
    
    if collection_name in existing_collections:
        print(f"Collection '{collection_name}' already exists.")
    else:
        response = opensearch_client.create_security_policy(
            name=network_policy_name,
            policy=network_policy,
            type="network")
        response = opensearch_client.create_access_policy(
            name= data_access_policy_name,
            type='data',
            policy=data_access_policy)
        response = opensearch_client.create_security_policy(
            name= encryption_policy_name,
            policy=json.dumps(encryption_policy),
            type='encryption')

        # Create the collection
        response = opensearch_client.create_collection(
            name=collection_name,
            type='VECTORSEARCH')
        print(f"Collection '{collection_name}' created successfully.")
except Exception as e:
    print(f"An error occurred: {e}")

# Poll until the collection reports an endpoint
while collection_endpoint is None:
    try:
        response = opensearch_client.batch_get_collection(
            names=[collection_name])
        collection_detail = response['collectionDetails'][0]
        collection_endpoint = collection_detail['collectionEndpoint'].replace("https://", "")
    except (IndexError, KeyError):
        # Collection not listed yet or endpoint not available yet; wait and retry
        time.sleep(30)
print(f"OpenSearch endpoint for '{collection_name}' collection: {collection_endpoint}")


Collection 'my-expt-collection' already exists.
OpenSearch endpoint for 'my-expt-collection' collection: collectionendpoint.us-west-2.aoss.amazonaws.com
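
The polling loop in the cell above retries indefinitely, so a collection that never becomes available would leave the cell running. A bounded variant could look like the sketch below. The name `wait_for_endpoint` and its parameters are hypothetical; `fetch_details` is assumed to be a zero-argument callable returning the `collectionDetails` list from `batch_get_collection`, and the `FAILED` status check assumes that field is present in the response.

```python
import time


def wait_for_endpoint(fetch_details, timeout_s=600, interval_s=30, sleep=time.sleep):
    """Poll until the collection reports an endpoint or the timeout elapses."""
    waited = 0
    while True:
        details = fetch_details()
        if details and details[0].get("status") == "FAILED":
            raise RuntimeError("Collection creation failed")
        if details and "collectionEndpoint" in details[0]:
            # Strip the scheme, as the notebook does, to get a bare host name.
            return details[0]["collectionEndpoint"].replace("https://", "")
        if waited >= timeout_s:
            raise TimeoutError(f"No endpoint after {waited} seconds")
        sleep(interval_s)
        waited += interval_s


# Possible usage inside the notebook:
# collection_endpoint = wait_for_endpoint(
#     lambda: opensearch_client.batch_get_collection(
#         names=[collection_name])['collectionDetails'])
```

Passing `sleep` as a parameter keeps the helper easy to exercise without real waiting.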


##### Updating Config File

In [11]:
with open('../libraries/iaa/configs.py', 'r') as config_file:
    lines = config_file.readlines()
with open('../libraries/iaa/configs.py', 'w') as config_file:
    for line in lines:
        if line.startswith("OPEN_SEARCH_HOST"):
            config_file.write(f"OPEN_SEARCH_HOST = '{collection_endpoint}'\n")
        else:
            config_file.write(line)
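
The cell above leaves the file unchanged, without any message, if no line starts with `OPEN_SEARCH_HOST`. A minimal sketch of the same rewrite as a pure function that also reports whether a line was replaced is shown below; `set_config_value` is a hypothetical name, not a function from this project.

```python
def set_config_value(lines, key, value):
    """Return (new_lines, replaced) with the first-column `key` assignment rewritten."""
    new_lines = []
    replaced = False
    for line in lines:
        if line.startswith(key):
            # repr() adds the quotes around a string value
            new_lines.append(f"{key} = {value!r}\n")
            replaced = True
        else:
            new_lines.append(line)
    return new_lines, replaced
```

A caller could raise or print a warning when `replaced` is `False`, so a missing key in `configs.py` does not go unnoticed.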

#### Importing Project-Relevant Modules

In [17]:
from libraries.iaa.textSplitter import TextSplitter
from libraries.iaa.textractPdfParser import TextractPdfParser
from libraries.iaa.text_summary import text_summary
from libraries.iaa.sectionsplitter import sectionSplitter
from libraries.iaa.embedUpload import bulkLoadChunks

### STEP 3:  Create index in collection

##### Note: This step needs to run only once per index name. Run it again only if you want to create a new index within the same collection.

In [14]:
# Define the index mapping
index_mapping = {
    "settings": {
        "index": {
            "knn": True
        }
    },
    "mappings": {
        "properties": {
            "vector_field": {
                "type": "knn_vector",
                "dimension": 1536
            },
            "text": {
                "type": "text"
            },
            "doc_name": {
                "type": "text"
            },
            "doc_link": {
                "type": "text"
            }
        }
    }
}

client = OpenSearch(
    hosts=[{'host': collection_endpoint, 'port': 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)

# Create the index with the specified mapping if it doesn't exist
if not client.indices.exists(index=index_name):
    client.indices.create(index=index_name, body=index_mapping)
    print(f"Index '{index_name}' created successfully.")
else:
    print(f"Index '{index_name}' already exists.")

Index 'expt_index' already exists.
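
To illustrate how the mapping above is used at query time, the sketch below builds an OpenSearch k-NN query body against `vector_field`. The helper name `build_knn_query` is hypothetical, and the project's own retrieval code (in the later notebooks) may construct its queries differently. The dimension check mirrors the `1536` declared in the mapping.

```python
def build_knn_query(query_vector, k=5, dimension=1536):
    """Build a k-NN search body for the `vector_field` defined in the mapping."""
    if len(query_vector) != dimension:
        raise ValueError(
            f"Expected a vector of length {dimension}, got {len(query_vector)}"
        )
    return {
        "size": k,
        # Return only the stored text and document metadata fields.
        "_source": ["text", "doc_name", "doc_link"],
        "query": {
            "knn": {"vector_field": {"vector": list(query_vector), "k": k}}
        },
    }


# Possible usage inside the notebook, given an embedding `vec` of the query text:
# results = client.search(index=index_name, body=build_knn_query(vec))
```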


### STEP 4:  Getting List of files from S3 Bucket

In [15]:
s3 = boto3.client('s3')

# Get a list of object keys (file names) in the bucket
objects = s3.list_objects_v2(Bucket=exp_config['source_bucket'])
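
`list_objects_v2` returns at most 1,000 keys per call, which is enough for the sample documents but would truncate larger buckets. A sketch using the boto3 paginator is shown below; the function name `list_all_keys` is hypothetical.

```python
def list_all_keys(s3_client, bucket):
    """Collect every object key in `bucket`, following pagination."""
    keys = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket):
        # Pages for an empty bucket or prefix have no 'Contents' entry.
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys


# Possible usage inside the notebook:
# all_keys = list_all_keys(s3, exp_config['source_bucket'])
```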

### STEP 5:  Chunking documents and uploading to OpenSearch Serverless

##### This step parses the documents, chunks them, and then indexes the chunks. It takes around 30 minutes for all the sample documents. However, you may proceed to the notebook "02_Query-rewrite_Retrieval_and_Generation_Notebook_OpenAI" after you see the first "Uploading documents in OpenSearch Completed" in the output, which means that the first document has been processed. The remaining documents are processed in the background as you work through the other notebooks.
##### <strong> DO NOT CLOSE THE NOTEBOOK UNTIL THE PROCESS IS COMPLETE FOR ALL THE FILES </strong>
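
For readers unfamiliar with chunking, the sketch below shows one generic approach: fixed-size character windows with overlap. It is not the implementation used by this project's `sectionSplitter` or `TextSplitter`, whose internals are not shown in this notebook; the function name and default sizes are assumptions chosen for illustration.

```python
def chunk_text(text, chunk_size=1000, overlap=200):
    """Split `text` into overlapping character windows."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - overlap
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        # Stop once a window reaches the end, to avoid a redundant tail chunk.
        if start + chunk_size >= len(text):
            break
        start += step
    return chunks
```

The overlap keeps sentences that straddle a boundary present in both neighboring chunks, at the cost of indexing some text twice.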

In [18]:
# Parse, chunk, embed, and index each file in the source bucket
for obj in objects.get('Contents', []):
    file = str(obj['Key'])
    sec = sectionSplitter(sourcebucket = exp_config['source_bucket'], doc_path = file, indexId = index_name, embeddingsModel = exp_config['embb_model'], serviceBucket = exp_config['service_bucket'])
    chunks = sec.doc2index()
    bulkLoadChunks(chunks, indexId= index_name, embeddingsModel = exp_config['embb_model'])
print("Indexing Completed!")

Job still in progress...
Job SUCCEEDED...
Retrieving content
Embedding vector for section chunks started
Embedding vector for section chunks completed
Adding chunks to index
Adding chunks to index complete
Uploading documents in OpenSearch Completed
Job still in progress...
Job SUCCEEDED...
Retrieving content
Embedding vector for section chunks started
Embedding vector for section chunks completed
Adding chunks to index
Adding chunks to index complete
Uploading documents in OpenSearch Completed
Job still in progress...
Job SUCCEEDED...
Retrieving content
Embedding vector for section chunks started
Embedding vector for section chunks completed
Adding chunks to index
Adding chunks to index complete
Uploading documents in OpenSearch Completed
Job still in progress...
Job SUCCEEDED...
Retrieving content
Embedding vector for section chunks started
Embedding vector for section chunks completed
Adding chunks to index
Adding chunks to index complete
Uploading documents in OpenSearch Completed
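
The log above reports an embedding step for each document. As a rough illustration of what such a step involves, the sketch below requests one embedding from the `amazon.titan-embed-text-v1` model configured in `exp_config['embb_model']`. The function name `embed_text` is hypothetical, `bedrock_runtime` is assumed to be a `boto3.client('bedrock-runtime')` instance, and the project's `bulkLoadChunks` may batch, retry, or structure this differently.

```python
import json


def embed_text(bedrock_runtime, text, model_id="amazon.titan-embed-text-v1"):
    """Return the embedding vector for `text` from a Titan text embedding model."""
    response = bedrock_runtime.invoke_model(
        modelId=model_id,
        body=json.dumps({"inputText": text}),
        accept="application/json",
        contentType="application/json",
    )
    # The response body is a stream containing a JSON document.
    payload = json.loads(response["body"].read())
    return payload["embedding"]


# Possible usage inside the notebook:
# bedrock_runtime = boto3.client('bedrock-runtime', region_name=exp_config['region'])
# vector = embed_text(bedrock_runtime, "sample text", exp_config['embb_model'])
```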
