# Data ingestion

***This notebook works best with the `conda_python3` kernel on an `ml.t3.xlarge` instance***.

---

In this notebook we download the images and text files extracted from the `pdf file/slide deck` that we uploaded into Amazon S3 in the [1_data_prep.ipynb](./1_data_prep) notebook, generate text descriptions of the `images`, convert the image descriptions and the `text files` into embeddings, and then ingest these embeddings into a vector database, [Amazon OpenSearch Service Serverless](https://aws.amazon.com/opensearch-service/features/serverless/).

1. We use [Anthropic’s Claude 3 Sonnet foundation model](https://aws.amazon.com/about-aws/whats-new/2024/03/anthropics-claude-3-sonnet-model-amazon-bedrock/) on Amazon Bedrock to describe each image in text.

1. We use the text extracted from each pdf page as is, convert it into embeddings using [Amazon Titan Text Embeddings](https://docs.aws.amazon.com/bedrock/latest/userguide/titan-embedding-models.html), and store them in a `text` index. Each image file is first described using `Claude Sonnet`, and the embedding of that text description is stored in an `image` index.

1. We use an `entities` field in the `metadata` of the index body to store entities from both images and texts in their respective image and text indexes. Entities are extracted from images using `Claude Sonnet` and from text files using `nltk`. These entities are later used as a `prefilter` so that only the documents related to a user question are retrieved.

1. We use `Ray` to run Bedrock inference for multiple images concurrently.

1. The embeddings are then ingested into an OpenSearch Serverless index through an [Amazon OpenSearch Ingestion](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ingestion.html) (OSI) pipeline, by sending HTTP requests to the pipeline's ingestion endpoint.

1. The OpenSearch Serverless collection is created by the AWS CloudFormation stack for this blog post.


## Step 1. Setup

Install the required Python packages and import the required libraries.

In [None]:
# install the requirements before running this notebook
import sys
!{sys.executable} -m pip install -r requirements.txt

In [None]:
# import the libraries that are needed to run this notebook
import os
import re
import ray
import time
import glob
import json
import yaml
import nltk
import boto3
import base64
import logging
import requests
import botocore
import sagemaker
import numpy as np
import opensearchpy
import globals as g
from pathlib import Path
from nltk.tree import Tree
from nltk.chunk import ne_chunk
from nltk import pos_tag, word_tokenize
from typing import List, Dict, Optional
from requests_auth_aws_sigv4 import AWSSigV4
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
from utils import get_cfn_outputs, get_bucket_name, download_image_files_from_s3, get_text_embedding, load_and_merge_configs

In [None]:
# set a logger
logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
if ray.is_initialized():
    ray.shutdown()
ray.init()

In [None]:
CONFIG_FILE_PATH = "config.yaml"

In [None]:
# load the merged config file - user config file, and parent config file
config = load_and_merge_configs(g.CONFIG_SUBSET_FILE, g.FULL_CONFIG_FILE)
logger.info(f"config file -> {json.dumps(config, indent=2)}")

In [None]:
region: str = boto3.Session().region_name
claude_model_id: str = config['model_info']['inference_model_info'].get('model_id')
endpoint_url: str = g.BEDROCK_EP_URL.format(region=region)
bedrock = boto3.client(service_name="bedrock-runtime", region_name=region, endpoint_url=endpoint_url)

In [None]:
bucket_name: str = get_bucket_name(config['aws']['cfn_stack_name'])
logger.info(f"Bucket name being used to store extracted images and texts from data: {bucket_name}")
s3 = boto3.client('s3')

In [None]:
sagemaker_session = sagemaker.Session()
sm_client = sagemaker_session.sagemaker_client
sm_runtime_client = sagemaker_session.sagemaker_runtime_client

In [None]:
outputs = get_cfn_outputs(config['aws']['cfn_stack_name'])
host = outputs['MultimodalCollectionEndpoint'].split('//')[1]
text_index_name = outputs['OpenSearchTextIndexName']
img_index_name = outputs['OpenSearchImgIndexName']
logger.info(f"opensearchhost={host}, text index={text_index_name}, image index={img_index_name}")
osi_text_endpoint = f"https://{outputs['OpenSearchPipelineTextEndpoint']}/data/ingest"
osi_img_endpoint = f"https://{outputs['OpenSearchPipelineImgEndpoint']}/data/ingest"

#### Create the indexes with the OpenSearch client
---
To keep the two indexes separate and easy to follow, we initialize two OpenSearch clients, one for the image index and one for the text index. Both clients point to the same collection with the same credentials, so you can also use a single client (or a single index) instead.

In [None]:
session = boto3.Session()
credentials = session.get_credentials()
auth = AWSV4SignerAuth(credentials, region, g.OS_SERVICE)

# OpenSearch client for the image index
img_os_client = OpenSearch(
    hosts = [{'host': host, 'port': 443}],
    http_auth = auth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection,
    pool_maxsize = 20
)

# OpenSearch client for the text index
text_os_client = OpenSearch(
    hosts = [{'host': host, 'port': 443}],
    http_auth = auth,
    use_ssl = True,
    verify_certs = True,
    connection_class = RequestsHttpConnection,
    pool_maxsize = 20
)

#### Index Body
---
The index body below is used for both the image index and the text index in the OpenSearch Service. It contains the following fields:

1. **File path**: The S3 path of the text or image file

1. **File text**: The text extracted from the pdf files (text index) or the image descriptions (image index)

1. **Page number**: The page number the content comes from

1. **Metadata**: The name of the file and its entities. Entities are names of organizations, people, and other important terms in the pdf text or image. They are extracted and stored as metadata so that they can later be used as a prefilter, retrieving only the relevant documents during search.
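
Each document posted to the pipeline must carry these fields, and its `vector_embedding` must have exactly as many values as the `dimension` in the mapping (1536 here), so the embeddings model configured in `config.yaml` has to return vectors of that length. The sketch below assembles one such document and checks the length up front; `build_index_document` and the sample values are hypothetical, not part of this notebook's `utils`.

```python
from typing import Dict, List

EMBEDDING_DIMENSION = 1536  # must match "dimension" in the knn_vector mapping


def build_index_document(file_path: str,
                         file_text: str,
                         page_number: str,
                         filename: str,
                         entities: str,
                         embedding: List[float]) -> Dict:
    """Assemble one document with the fields declared in the index mapping."""
    if len(embedding) != EMBEDDING_DIMENSION:
        raise ValueError(f"expected {EMBEDDING_DIMENSION} dimensions, got {len(embedding)}")
    return {
        "file_path": file_path,
        "file_text": file_text,
        "page_number": page_number,
        "metadata": {"filename": filename, "entities": entities},
        "vector_embedding": embedding,
    }


doc = build_index_document(
    file_path="s3://example-bucket/img/page_1.jpg",  # hypothetical S3 path
    file_text="A bar chart of quarterly revenue.",
    page_number="1",
    filename="page_1.jpg",
    entities="revenue",
    embedding=[0.0] * EMBEDDING_DIMENSION,
)
print(sorted(doc.keys()))
```

The notebook wraps each such document in a list before posting it (`json.dumps([{...}])`), because the pipeline endpoint accepts a JSON array of documents.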

In [None]:
index_body = """
{
  "settings": {
    "index.knn": true
  },
  "mappings": {
    "properties": {
      "vector_embedding": {
        "type": "knn_vector",
        "dimension": 1536,
        "method": {
          "name": "hnsw",
          "engine": "nmslib",
          "parameters": {}
        }
      },
      "file_path": {
        "type": "text"
      },
      "file_text": {
        "type": "text"
      },
      "page_number": {
        "type": "text"
      },
      "metadata": {
        "properties": {
          "filename": {
            "type": "text"
          },
          "entities": {
            "type": "keyword"
          }
        }
      }
    }
  }
}

"""

# Create each index only if it does not already exist; otherwise just log that it exists
index_body = json.loads(index_body)
try:
    # Check if the image index exists
    if not img_os_client.indices.exists(img_index_name):
        img_response = img_os_client.indices.create(img_index_name, body=index_body)
        logger.info(f"Response received for the create index for images -> {img_response}")
    else:
        logger.info(f"The image index '{img_index_name}' already exists.")

    # Check if the text index exists
    if not text_os_client.indices.exists(text_index_name):
        txt_response = text_os_client.indices.create(text_index_name, body=index_body)
        logger.info(f"Response received for the create index for texts -> {txt_response}")
    else:
        logger.info(f"The text index '{text_index_name}' already exists.")
except Exception as e:
    logger.error(f"Error in creating index, exception: {e}")


### Verify that each index has a `knn_vector` field before the embedding process

In [None]:
try:
    # Fetch the existing mappings for the text and image indexes
    text_mapping = text_os_client.indices.get_mapping(index=text_index_name)
    img_mapping = img_os_client.indices.get_mapping(index=img_index_name)
    text_vector_embedding_mapping = text_mapping[text_index_name]['mappings']['properties'].get('vector_embedding', {})
    img_vector_embedding_mapping = img_mapping[img_index_name]['mappings']['properties'].get('vector_embedding', {})

    if text_vector_embedding_mapping.get('type') == 'knn_vector':
        logger.info(f"The vector_embedding type is found: {text_vector_embedding_mapping.get('type')} -> {text_mapping}")
    else:
        raise ValueError(f"The vector_embedding type is not 'knn_vector', found: {text_vector_embedding_mapping.get('type')}")

    if img_vector_embedding_mapping.get('type') == 'knn_vector':
        logger.info(f"The vector_embedding type is found: {img_vector_embedding_mapping.get('type')} -> {img_mapping}")
    else:
        raise ValueError(f"The vector_embedding type is not 'knn_vector', found: {img_vector_embedding_mapping.get('type')}")
except Exception as e:
    logger.error(f"Error in fetching the index vector field mapping, exception: {e}")
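
The check above only looks at the field type. A vector whose length differs from the mapped `dimension` would be rejected at ingestion time, so a hypothetical helper could verify both. It runs on a plain dict shaped like the `get_mapping` response (the same `[index]['mappings']['properties']` path the cell above reads), so it can be tried without a cluster; the index name in the sample is made up.

```python
from typing import Any, Dict


def check_knn_field(mapping_response: Dict[str, Any], index_name: str,
                    field: str = "vector_embedding", dimension: int = 1536) -> None:
    """Raise ValueError unless `field` is a knn_vector with the expected dimension."""
    properties = mapping_response[index_name]["mappings"]["properties"]
    field_mapping = properties.get(field, {})
    if field_mapping.get("type") != "knn_vector":
        raise ValueError(f"{index_name}.{field} is {field_mapping.get('type')!r}, not 'knn_vector'")
    if field_mapping.get("dimension") != dimension:
        raise ValueError(f"{index_name}.{field} has dimension "
                         f"{field_mapping.get('dimension')}, expected {dimension}")


# a response shaped like indices.get_mapping(index=...) output (hypothetical index name)
sample = {"text-index": {"mappings": {"properties": {
    "vector_embedding": {"type": "knn_vector", "dimension": 1536}}}}}
check_knn_field(sample, "text-index")  # passes silently
```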

## Step 2. Download the image and text files from S3 and convert the images to Base64

Now we download the image and text files from the S3 bucket into local directories. The image files are then converted into [Base64](https://en.wikipedia.org/wiki/Base64) encoding so that they can be sent to Claude 3 Sonnet, which describes each image; those descriptions are what we convert into embeddings.

In [None]:
# download the images from s3 into a local directory to convert into base64 images
os.makedirs(g.LOCAL_IMAGE_DIR, exist_ok=True)
os.makedirs(g.LOCAL_TEXT_DIR, exist_ok=True)

try:
    image_files: List = download_image_files_from_s3(bucket_name, g.BUCKET_IMG_PREFIX, g.LOCAL_IMAGE_DIR, g.IMAGE_FILE_EXTN)
    text_files: List = download_image_files_from_s3(bucket_name, g.BUCKET_TEXT_PREFIX, g.LOCAL_TEXT_DIR, g.TEXT_FILE_EXTN)
    logger.info(f"downloaded {len(image_files) + len(text_files)} files from s3")
except Exception as e:
    logger.error(f"Cannot download the image and text files from S3 into the local directories: {e}")

#### Convert jpg files fetched from `S3` into `Base64`

In [None]:
def encode_image_to_base64(image_file_path: str) -> str:
    with open(image_file_path, "rb") as image_file:
        b64_image = base64.b64encode(image_file.read()).decode('utf8')
        b64_image_path = os.path.join(g.B64_ENCODED_IMAGES_DIR, f"{Path(image_file_path).stem}.b64")
        with open(b64_image_path, "wb") as b64_image_file:
            b64_image_file.write(bytes(b64_image, 'utf-8'))
    return b64_image_path
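
`encode_image_to_base64` writes the base64 text to a `.b64` file, which `get_img_desc` later reads and passes as the image `data`. A quick round trip with the standard `base64` module confirms that nothing is lost. This standalone variant takes the output directory as a parameter instead of reading `g.B64_ENCODED_IMAGES_DIR`, and uses stand-in bytes rather than a real JPEG.

```python
import base64
import os
import tempfile
from pathlib import Path


def encode_file_to_base64(src_path: str, out_dir: str) -> str:
    """Write the base64 text of src_path to out_dir/<stem>.b64 and return that path."""
    b64_text = base64.b64encode(Path(src_path).read_bytes()).decode("utf-8")
    out_path = os.path.join(out_dir, f"{Path(src_path).stem}.b64")
    Path(out_path).write_text(b64_text, encoding="utf-8")
    return out_path


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "page_1.jpg")
    original = bytes(range(256))  # stand-in bytes, not a real JPEG
    Path(src).write_bytes(original)
    b64_path = encode_file_to_base64(src, tmp)
    # decoding the .b64 file must give back the original bytes
    decoded = base64.b64decode(Path(b64_path).read_text(encoding="utf-8"))
    print(decoded == original, Path(b64_path).name)
```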

## Step 3. Get embeddings for the base64 encoded images

Now we are ready to use Amazon Bedrock: Anthropic’s Claude 3 Sonnet describes each base64-encoded image, and Amazon Titan Text Embeddings converts each description into an embedding. We ingest the embeddings into the pipeline using the [requests](https://pypi.org/project/requests/) HTTP library.

You must sign all HTTP requests to the pipeline using [Signature Version 4](https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html).
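In this notebook `AWSSigV4('osis')` from `requests_auth_aws_sigv4` does the signing. For reference, the core of Signature Version 4 is a signing key derived by chaining HMAC-SHA256 over the date, region, service name, and the literal `aws4_request`; the signature is the hex HMAC-SHA256 of the "string to sign" under that key. The sketch covers only those two steps (building the canonical request and string to sign is omitted) and uses placeholder credentials, so keep using a maintained library for real requests.

```python
import hashlib
import hmac


def _hmac_sha256(key: bytes, msg: str) -> bytes:
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def derive_signing_key(secret_key: str, date_stamp: str, region: str, service: str) -> bytes:
    """SigV4 signing key: chained HMAC-SHA256 over date, region, service, and 'aws4_request'."""
    k_date = _hmac_sha256(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    k_region = _hmac_sha256(k_date, region)
    k_service = _hmac_sha256(k_region, service)
    return _hmac_sha256(k_service, "aws4_request")


def sign(string_to_sign: str, signing_key: bytes) -> str:
    """Final signature: hex HMAC-SHA256 of the string to sign."""
    return hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()


# placeholder secret and date; "osis" is the service name passed to AWSSigV4 in this notebook
key = derive_signing_key("EXAMPLE-SECRET", "20240101", "us-east-1", "osis")
signature = sign("example string to sign", key)
print(len(key), len(signature))
```

Because the service name is part of the key derivation, a request signed for `osis` is not valid for another service, which is why the pipeline calls pass `'osis'` rather than the OpenSearch service name.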

In [None]:
def get_img_desc(image_file_path: str, prompt: str) -> str:
    """
    This function reads a base64-encoded image from a .b64 file and sends it, together with
    the prompt, to Claude 3 Sonnet on Amazon Bedrock; it returns the model's text response.
    """
    bedrock = boto3.client(service_name="bedrock-runtime", region_name=region, endpoint_url=endpoint_url)
    # read the file, MAX image size supported is 2048 * 2048 pixels
    with open(image_file_path, "rb") as image_file:
        input_image_b64 = image_file.read().decode('utf-8')

    body = json.dumps(
        {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1000,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": "image/jpeg",
                                "data": input_image_b64
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
        }
    )

    response = bedrock.invoke_model(
        modelId=claude_model_id,
        body=body
    )

    resp_body = json.loads(response['body'].read().decode("utf-8"))
    resp_text = resp_body['content'][0]['text'].replace('"', "'")
    return resp_text
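
`get_img_desc` always sends `media_type` `image/jpeg`, which is correct only while `g.IMAGE_FILE_EXTN` is a JPEG extension. If the image format could change, a hypothetical helper could derive the media type from the original image file name (not the `.b64` file) with the standard `mimetypes` module; the allowed set below reflects the JPEG, PNG, GIF, and WebP formats Anthropic documents for Claude 3 image input.

```python
import mimetypes


def image_media_type(image_file_name: str) -> str:
    """Guess the Messages API media_type from the original image file extension."""
    media_type, _ = mimetypes.guess_type(image_file_name)
    if media_type not in {"image/jpeg", "image/png", "image/gif", "image/webp"}:
        raise ValueError(f"unsupported image type for {image_file_name}: {media_type}")
    return media_type


print(image_media_type("page_3_1.jpg"))
print(image_media_type("page_4.png"))
```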

### Convert the image files downloaded from S3 into `Base64`

In [None]:
os.makedirs(g.B64_ENCODED_IMAGES_DIR, exist_ok=True)
# start with an empty list so the conversion below still runs if listing fails
file_list: List = []
try:
    file_list = glob.glob(os.path.join(g.LOCAL_IMAGE_DIR, f"*{g.IMAGE_FILE_EXTN}"))
    logger.info(f"there are {len(file_list)} pdf image files in the {g.LOCAL_IMAGE_DIR} directory for conversion to base64")
except Exception as e:
    logger.error(f"Could not list any {g.IMAGE_FILE_EXTN} files from {g.LOCAL_IMAGE_DIR}: {e}")

# convert each file to base64 and store the base64 in a new file
b64_image_file_list = list(map(encode_image_to_base64, file_list))
logger.info(f"base64 conversion done, there are {len(b64_image_file_list)} base64 encoded files")

### Get Image Descriptions
---

This part of the notebook loads the image description prompt (`image_desc_prompt`) that Claude uses to describe each image.

In [None]:
# this is the prompt to get the description of each image stored from the pdf file
image_description_prompt_fpath: str = os.path.join(config['dir_info']['prompt_dir'], config['dir_info']['image_description_prompt'])
image_desc_prompt: str = Path(image_description_prompt_fpath).read_text()
print(image_desc_prompt)

### Hybrid Search: Extract `Entities` from the images for further `prefiltering` tasks
---

Hybrid search helps the RAG workflow retrieve the right image description for a specific question. An image (whole or split into parts) might not contain the information a question asks about, yet vector search can still return it when its embedding is close to that of the relevant content, for example because the two images have a similar structure. To reduce these mismatches, we extract the entities from each image description (including the file name, to be precise) and, at question time, extract the entities from the question. Matching these `entities` narrows the search to the most relevant documents in the vector index, in which the answer is then searched for in a later step.
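
At query time the filter runs inside OpenSearch; the pure-Python sketch below only illustrates the matching rule: a document survives if it shares at least one entity with the question. The lower-casing, whitespace trimming, and "any overlap" rule are assumptions for illustration, not necessarily the exact matching used when the questions are answered.

```python
from typing import Dict, Iterable, List, Set


def normalize(entities: Iterable[str]) -> Set[str]:
    """Lower-case and strip entity strings so 'Amazon ' and 'amazon' match."""
    return {e.strip().lower() for e in entities if e.strip()}


def prefilter_by_entities(question_entities: Iterable[str], docs: List[Dict]) -> List[Dict]:
    """Keep only documents that share at least one entity with the question."""
    wanted = normalize(question_entities)
    return [d for d in docs if wanted & normalize(d["entities"])]


docs = [
    {"file_name": "page_1.jpg", "entities": ["Amazon Bedrock", "Claude"]},
    {"file_name": "page_2.jpg", "entities": ["OpenSearch", "k-NN"]},
]
print([d["file_name"] for d in prefilter_by_entities(["opensearch"], docs)])
```

Only the surviving documents would then be ranked by vector similarity, which is what keeps a structurally similar but unrelated image out of the results.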

In [None]:
# prompt is used to extract entities from an image
entity_extraction_prompt_fpath: str = os.path.join(config['dir_info']['prompt_dir'], config['dir_info']['extract_image_entities_template'])
entity_extraction_prompt: str = Path(entity_extraction_prompt_fpath).read_text()
print(entity_extraction_prompt)

### Part 1: Loop through the base64 images to (1) get an image description from Claude 3 Sonnet and (2) get an embedding from Titan Text Embeddings, then call the OSI pipeline API to ingest the embedding.

In [None]:
def get_img_txt_embeddings(bedrock, prompt_data: str) -> Optional[List[float]]:
    """Return the Titan text embedding (a list of floats) for prompt_data, or None on failure."""
    body = json.dumps({
        "inputText": prompt_data,
    })
    try:
        response = bedrock.invoke_model(
            body=body, modelId=config['model_info']['embeddings_model_info'].get('model_id'), 
            accept="application/json", contentType="application/json"
        )
        response_body = json.loads(response['body'].read())
        embedding = response_body.get('embedding')
    except Exception as e:
        logger.error(f"exception={e}")
        embedding = None
    return embedding
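
`get_img_txt_embeddings` returns the JSON `embedding` field as a plain Python list of floats (or `None` on failure), not a NumPy array. Splitting request building and response parsing into two hypothetical helpers makes the parsing testable offline; here `io.BytesIO` stands in for the streaming `response['body']` that the notebook reads with `.read()`.

```python
import io
import json
from typing import List, Optional


def build_titan_body(text: str) -> str:
    """Request body for a Titan Text Embeddings invoke_model call."""
    return json.dumps({"inputText": text})


def parse_titan_embedding(response: dict) -> Optional[List[float]]:
    """Read the body of an invoke_model response and return its 'embedding' field."""
    payload = json.loads(response["body"].read())
    return payload.get("embedding")


# fake response: BytesIO offers .read() like the streaming body returned by boto3
fake_response = {"body": io.BytesIO(json.dumps({"embedding": [0.1, 0.2, 0.3]}).encode("utf-8"))}
embedding = parse_titan_embedding(fake_response)
print(embedding)
```

The body can be read only once, which is why the result is stored in a variable rather than parsed again.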

In [None]:
# function to get the image description and store the embeddings of that text in the image index
def process_image_data(i: int, 
                       file_path: str, 
                       osi_endpoint: str, 
                       total: int, 
                       bucket_info: Dict) -> Optional[Dict]:
    bedrock = boto3.client(service_name="bedrock-runtime", region_name=region, endpoint_url=endpoint_url)
    json_data: Optional[Dict] = None
    # name of the images that are saved (either split in 4 ways or saved as a single page)
    image_name: Optional[str] = None
    try:
        logger.info(f"going to convert {file_path} into embeddings")
        # first, get the entities from the image to prefilter the image description with the entities
        entities_extracted = get_img_desc(file_path, entity_extraction_prompt)
        # get the image description and prepend the image description with the entities extracted from the image
        content_description = entities_extracted + get_img_desc(file_path, image_desc_prompt)
        print(f"file_path: {file_path}, image description (prefiltered with entities extracted): {content_description}")
        embedding = get_img_txt_embeddings(bedrock, content_description)
        input_image_s3: str = f"s3://{bucket_name}/{bucket_info['img_prefix']}/{Path(file_path).stem}{bucket_info['image_file_extn']}"
        obj_name: str = f"{Path(file_path).stem}{bucket_info['image_file_extn']}"
        # data format for POSTING it to the osi_endpoint
        data = json.dumps([{
            "file_path": input_image_s3,
            "file_text": content_description,
            "page_number": re.search(r"page_(\d+)_?", obj_name).group(1),
            "metadata": {
                "filename": obj_name,
                "entities": entities_extracted
            },
            "vector_embedding": embedding
        }])
        # json data format for local files that are saved
        json_data = {
            "file_type": bucket_info['image_file_extn'],
            "file_name": obj_name,
            "text": content_description,
            "entities": entities_extracted,
            "page_number": re.search(r"page_(\d+)_?", obj_name).group(1)
            }
        # save the information (image description, entities, file type, name, and page number)
        # locally in a json file
        image_dir: str = config['dir_info']['json_img_dir']
        os.makedirs(image_dir, exist_ok=True)
        fpath = os.path.join(image_dir, f"{Path(file_path).stem}.json")
        Path(fpath).write_text(json.dumps(json_data, default=str, indent=2))
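        # POST the document to the OSI pipeline, signing the request with SigV4 for the 'osis' service
        r = requests.request(
            method='POST', 
            url=osi_endpoint, 
            data=data,
            auth=AWSSigV4('osis'))
        logger.info(f"Ingested {obj_name} into the pipeline, response: {r.status_code} {r.text}")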
    except Exception as e:
        logger.error(f"Error processing image {file_path}: {e}")
        json_data: Optional[Dict] = None
    return json_data

In [None]:
@ray.remote
def async_process_image_data(i: int, file_path: str, osi_endpoint, total: int, bucket_info: Dict):
    logging.basicConfig(format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)
    logger = logging.getLogger(__name__)
    return process_image_data(i, file_path, osi_endpoint, total, bucket_info)

In [None]:
# count the number of images that throw an error while being saved into the index
erroneous_page_count: int = 0
n: int = config['inference_info']['parallel_inference_count']
image_chunks = [b64_image_file_list[i:i + n] for i in range(0, len(b64_image_file_list), n)]
bucket_info: Dict = {
    'img_prefix': g.BUCKET_IMG_PREFIX,
    'image_file_extn': g.IMAGE_FILE_EXTN
}
for chunk_index, image_chunk in enumerate(image_chunks):
    try:
        st = time.perf_counter()
        logger.info(f"------ getting text descriptions for chunk {chunk_index+1}/{len(image_chunks)}, size of chunk={len(image_chunk)} ------")
        # submit one Ray task per image in the chunk and wait for all of them to finish
        results = ray.get([async_process_image_data.remote(index, file_path, osi_img_endpoint, len(image_chunk), bucket_info) for index, file_path in enumerate(image_chunk)])
        # process_image_data returns None when an individual image fails
        erroneous_page_count += sum(1 for result in results if result is None)
        elapsed_time = time.perf_counter() - st
        logger.info(f"------ chunk {chunk_index+1}/{len(image_chunks)} completed in {elapsed_time:.2f} seconds ------")
    except Exception as e:
        logger.error(f"Error processing chunk {chunk_index+1}: {e}")
        erroneous_page_count += len(image_chunk)

logger.info(f"Number of images that were not processed: {erroneous_page_count}")
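
The cell above submits one Ray task per image, waits for each chunk with `ray.get`, and relies on `process_image_data` returning `None` when an image fails. The same chunk-and-count control flow can be sketched with the standard library's `ThreadPoolExecutor` in place of Ray; `fake_worker` and the file names are hypothetical, and this is an illustration of the pattern, not a replacement for the Ray setup.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional, Sequence


def process_in_chunks(items: Sequence[str],
                      worker: Callable[[str], Optional[dict]],
                      chunk_size: int) -> int:
    """Run worker concurrently on each chunk; return how many items yielded None."""
    failures = 0
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    with ThreadPoolExecutor(max_workers=chunk_size) as pool:
        for chunk in chunks:
            # list(pool.map(...)) waits for the whole chunk, like ray.get on a list of refs
            results = list(pool.map(worker, chunk))
            failures += sum(1 for r in results if r is None)
    return failures


def fake_worker(path: str) -> Optional[dict]:
    # hypothetical worker: "fails" (returns None) for files whose name contains "bad"
    return None if "bad" in path else {"file": path}


files = ["page_1.b64", "page_2_bad.b64", "page_3.b64", "page_4_bad.b64", "page_5.b64"]
print(process_in_chunks(files, fake_worker, chunk_size=2))
```

Waiting chunk by chunk caps the number of in-flight Bedrock calls at the chunk size, which is the role `parallel_inference_count` plays in the config.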

### Part 2: Loop through the text files to (1) get an embedding from Titan Text Embeddings and (2) extract the text entities using `nltk`, then call the OSI pipeline API to ingest the embedding.

In [None]:
# Get a list of all text files 
pdf_txt_file_list = os.listdir(g.LOCAL_TEXT_DIR)

# Get absolute file paths by joining the directory path with each file name
pdf_txt_file_list = [os.path.abspath(os.path.join(g.LOCAL_TEXT_DIR, file)) for file in pdf_txt_file_list]
logger.info(f"Number of text files from the PDF local directory to process: {len(pdf_txt_file_list)}")
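
`os.listdir` returns names in arbitrary order, so the text files are processed in no particular order and a running counter such as `txt_page_index` need not match the real page. A hypothetical helper could sort by the number already embedded in the file name, using the same `text_<n>` pattern that `process_text_data` parses with a regex. A plain string sort would put `text_10` before `text_2`, hence the numeric key.

```python
import re
from typing import List, Optional


def page_number_from_name(file_name: str, prefix: str = "text") -> Optional[int]:
    """Return the number after '<prefix>_' in a file name, or None if absent."""
    match = re.search(rf"{re.escape(prefix)}_(\d+)", file_name)
    return int(match.group(1)) if match else None


def sort_by_page(file_names: List[str], prefix: str = "text") -> List[str]:
    """Sort numerically by page number; names without a page number go last."""
    def key(name: str):
        page = page_number_from_name(name, prefix)
        return (page is None, page if page is not None else 0, name)
    return sorted(file_names, key=key)


names = ["text_10.txt", "text_2.txt", "notes.txt", "text_1.txt"]
print(sort_by_page(names))
```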

#### Entity extraction from PDF texts using [NLTK](https://www.nltk.org/)
---

NLTK is a leading platform for building Python programs to work with human language data. We use `NLTK` to extract named entities from the text files extracted from each `PDF page`, and store them in the `metadata.entities` field of the document that is sent to the `OSI endpoint`.
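
The index maps `metadata.entities` as `keyword`, and keyword values are not analyzed: a comma-joined string such as `"Amazon Bedrock, OpenSearch"` is stored as one term, so an exact-term filter on `"OpenSearch"` alone would not match it. OpenSearch accepts an array of values for a field and indexes each value as its own term, so one option (an assumption, not what this notebook currently does) is to send the entities as a list. The hypothetical helper below turns the comma-separated string into such a list.

```python
from typing import List


def entities_to_keywords(entities_str: str) -> List[str]:
    """Split a comma-separated entity string into unique, trimmed keyword values."""
    values: List[str] = []
    for part in entities_str.split(","):
        value = part.strip()
        if value and value not in values:
            values.append(value)
    return values


print(entities_to_keywords("Amazon Bedrock, OpenSearch, , Amazon Bedrock"))
```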

In [None]:
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('maxent_ne_chunker') 
nltk.download('words')
nltk.download('punkt_tab')
nltk.download('averaged_perceptron_tagger_eng')
nltk.download('maxent_ne_chunker_tab')

In [None]:
def get_continuous_chunks(text: str) -> List[str]:
    """
    This function uses nltk to get the named entities from texts that are extracted from pdf files.
    Each named-entity subtree becomes one entity string; duplicates are dropped.
    """
    chunked = ne_chunk(pos_tag(word_tokenize(text)))
    continuous_chunk: List[str] = []
    for node in chunked:
        # named entities are nltk Tree nodes; plain tokens are (word, tag) tuples
        if isinstance(node, Tree):
            named_entity = " ".join(token for token, pos in node.leaves())
            if named_entity not in continuous_chunk:
                continuous_chunk.append(named_entity)
    return continuous_chunk

In [None]:
def process_text_data(txt_file: str, txt_page_index: int):
    with open(txt_file, 'r') as file:
        extracted_pdf_text = file.read()
    # Extract entities from text using nltk
    entities = get_continuous_chunks(extracted_pdf_text)
    # Convert the entities list to string 
    entities_str = ", ".join(entities)
    logger.info(f"entities extracted from {txt_file}: {entities_str}")
    embedding = get_text_embedding(bedrock, extracted_pdf_text)
    input_text_s3 = f"s3://{bucket_name}/{g.BUCKET_TEXT_PREFIX}/{Path(txt_file).stem}{g.TEXT_FILE_EXTN}"
    obj_name = f"{Path(txt_file).stem}{g.TEXT_FILE_EXTN}"
    # take the page number from the file name so the indexed document and the local json agree
    page_number = re.search(r"text_(\d+)_?", obj_name).group(1)
    # data format that is used to POST to the osi endpoint
    data = json.dumps([{
        "file_path": input_text_s3,
        "file_text": extracted_pdf_text,
        "page_number": page_number,
        "metadata": {
            "filename": obj_name,
            "entities": entities_str
        },
        "vector_embedding": embedding
    }])
    # json data format that is saved in a local directory
    json_data = {
        "file_type": g.TEXT_FILE_EXTN,
        "file_name": Path(txt_file).stem,
        "text": extracted_pdf_text,
        "page_number": page_number,
        "entities": entities_str
    }
    os.makedirs(config['dir_info']['json_txt_dir'], exist_ok=True)
    fpath = os.path.join(config['dir_info']['json_txt_dir'], f"{Path(txt_file).stem}.json")
    print(f"json_file_path: {fpath}")
    Path(fpath).write_text(json.dumps(json_data, default=str, indent=2))
    r = requests.request(
        method='POST',
        url=osi_text_endpoint,
        data=data,
        auth=AWSSigV4('osis'))

    logger.info("Ingesting data into pipeline")
    logger.info(f"Response: {txt_page_index} - {r.text}")

In [None]:
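txt_page_index: int = 1
os.makedirs(config['dir_info']['json_txt_dir'], exist_ok=True)
for txt_file in pdf_txt_file_list:
    logger.info(f"going to convert {txt_file} into embeddings")
    try:
        process_text_data(txt_file, txt_page_index)
    except Exception as e:
        # log and continue so that one bad file does not stop the whole run
        logger.error(f"Error processing text file {txt_file}: {e}")
    txt_page_index += 1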