# Semantic chunking

In this module, you'll learn:
1. [What is semantic chunking](#31-what-is-semantic-chunking)
2. [How to use semantic chunking in LangChain to pre-process documents](#32-implementation-using-langchain)
3. [How to upload the pre-processed files to Amazon S3 and ingest them into Knowledge Bases for Amazon Bedrock](#33-ingest-files-to-the-knowledge-base)
4. [How to test the knowledge base](#34-test-the-knowledge-base)

In [None]:
%pip install -r requirements.txt

## 3.1 What is semantic chunking?
Semantic chunking is an experimental method that splits text based on semantic similarity, so that similar sentences stay in the same chunk. In brief, the method performs the following steps:
1. Create embeddings for sentences (or groups of sentences)
2. Compare the similarity between adjacent groups
3. Set a similarity threshold
4. Join the adjacent groups which have similarity above the threshold
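
The four steps above can be sketched in plain Python. This is only an illustration, not the LangChain implementation: `toy_embed` is a hypothetical stand-in that counts words instead of calling an embedding model, each group is a single sentence, and the `threshold` value of 0.2 is an arbitrary assumption.

```python
import math
import re
from collections import Counter


def toy_embed(sentence):
    """Hypothetical stand-in embedding: a bag-of-words count vector, not a real model."""
    return Counter(re.findall(r"[a-z']+", sentence.lower()))


def cosine_similarity(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[w] * b[w] for w in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def semantic_chunks(text, threshold=0.2):
    # Split into sentences on '.', '!' or '?' followed by whitespace
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]
    if not sentences:
        return []
    # Step 1: create an embedding for each sentence
    vectors = [toy_embed(s) for s in sentences]
    chunks = [[sentences[0]]]
    for prev, curr, sentence in zip(vectors, vectors[1:], sentences[1:]):
        # Steps 2-4: compare adjacent sentences and join them when the
        # similarity reaches the threshold; otherwise start a new chunk
        if cosine_similarity(prev, curr) >= threshold:
            chunks[-1].append(sentence)
        else:
            chunks.append([sentence])
    return [" ".join(chunk) for chunk in chunks]


sample = (
    "Cats sleep most of the day. Cats also sleep at night. "
    "Rockets need fuel to launch. Rockets burn fuel quickly."
)
for chunk in semantic_chunks(sample):
    print(chunk)
```

With this sample text, the two sentences about cats end up in one chunk and the two about rockets in another, because the only adjacent pair with no shared words is the one in the middle.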

### 3.1.1 Data ingestion flow
![semantic chunking with Knowledge Bases for Amazon Bedrock](images/semantic-chunking.png)

Reminder: Use the same embedding model across the whole data flow, including:
* Document chunking (using an open source library)
* Document embedding (managed by Knowledge Bases for Amazon Bedrock)
* Document retrieval (managed by Knowledge Bases for Amazon Bedrock)

In this workshop, you'll use **Amazon Titan Embeddings G1 - Text**.

## 3.2 Implementation using LangChain
Reference: https://python.langchain.com/docs/modules/data_connection/document_transformers/semantic-chunker/

### 3.2.1 Read sample data
In this section you'll use the Amazon 2023 letter to shareholders as sample data. Feel free to use your own PDF file by changing the `file_url`.

In [None]:
# Read the PDF file content
import requests
from io import BytesIO
from pypdf import PdfReader

file_url = 'https://s2.q4cdn.com/299287126/files/doc_financials/2024/ar/Amazon-com-Inc-2023-Shareholder-Letter.pdf'

file = requests.get(file_url)
file.raise_for_status()  # Stop here if the download failed
file_io = BytesIO(file.content)
reader = PdfReader(file_io)

In [None]:
# Extract text from the whole doc
# Perform semantic split later on
num_pages = len(reader.pages)
text = ""
for i in range(num_pages):
    text += reader.pages[i].extract_text() + " "

In [None]:
# Explore the text
# len(text)
# print(text)

### 3.2.2 Perform semantic chunking

In [None]:
# Load AWS credentials
%load_ext dotenv
%dotenv

In [None]:
# Define embedding model
from langchain_aws import BedrockEmbeddings

model_id = 'amazon.titan-embed-text-v1'
embeddings = BedrockEmbeddings(model_id=model_id)

In [None]:
# Split text semantically
from langchain_experimental.text_splitter import SemanticChunker 

breakpoint_percentile = 95  # type: percentile
# breakpoint_sd = 3  # type: standard_deviation
# breakpoint_iqr = 1.5  # type: interquartile

semantic_chunker = SemanticChunker(
    embeddings,
    breakpoint_threshold_type = 'percentile',
    breakpoint_threshold_amount = breakpoint_percentile
)
split_texts = semantic_chunker.split_text(text)

You now have 17 chunks from the 11-page document.

In [None]:
# Explore the splits
for split_text in split_texts:
    print(len(split_text))

The split at index 5 is very short (11 characters). Print it together with its neighbors to see what it contains.

In [None]:
print(split_texts[4])
print('---')
print(split_texts[5])
print('---')
print(split_texts[6])

Feel free to explore different threshold types and values. See if you can create better splits.
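
To build some intuition for how the threshold value affects the result, here is a minimal sketch of a percentile-type threshold. It assumes the splitter measures a distance between each pair of adjacent sentence groups and breaks wherever that distance exceeds the chosen percentile of all distances; the library's actual implementation may differ in detail. The `distances` list is made up for illustration, and `percentile` and `breakpoints` are hypothetical helper names.

```python
def percentile(values, pct):
    """Percentile with linear interpolation between the closest ranks."""
    ordered = sorted(values)
    if not ordered:
        raise ValueError("values must not be empty")
    rank = (len(ordered) - 1) * pct / 100
    lower = int(rank)
    upper = min(lower + 1, len(ordered) - 1)
    return ordered[lower] + (ordered[upper] - ordered[lower]) * (rank - lower)


def breakpoints(distances, pct):
    """Indices i where a split falls between group i and group i + 1."""
    threshold = percentile(distances, pct)
    return [i for i, d in enumerate(distances) if d > threshold]


# Hypothetical distances (1 - cosine similarity) between adjacent sentence groups
distances = [0.10, 0.15, 0.80, 0.12, 0.20, 0.65, 0.11, 0.18, 0.90, 0.14]

for pct in (95, 80, 50):
    cuts = breakpoints(distances, pct)
    print(f"percentile={pct}: {len(cuts) + 1} chunks, split after groups {cuts}")
```

With these made-up distances, lowering the percentile from 95 to 50 raises the number of chunks from 2 to 6. A lower value produces more, smaller chunks; a higher value produces fewer, larger ones.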

## 3.3 Ingest files to the knowledge base

This section assumes that you have the following resources ready:
* An S3 bucket to store the split text files
* An Amazon Bedrock knowledge base

### 3.3.1 Put split files to Amazon S3

In [None]:
import boto3

# Target S3 bucket and prefix
s3_bucket = '<your bucket name>'  # replace this with your S3 bucket name
s3_prefix = 'semantic_chunk_workshop'

s3_client = boto3.client('s3')

# Write each split text to a separate file in S3
for i, split_text in enumerate(split_texts):
    s3_key = f'{s3_prefix}/{i}.txt'
    print(f'Writing to s3://{s3_bucket}/{s3_key}')
    s3_client.put_object(Body=split_text, Bucket=s3_bucket, Key=s3_key)

### 3.3.2 Synchronize documents to Knowledge Bases for Amazon Bedrock

#### 3.3.2.1 Using AWS Console
1. Go to the [Knowledge Bases for Amazon Bedrock console](https://console.aws.amazon.com/bedrock/home#/knowledge-bases)
2. Select your knowledge base, and click **Edit**
3. In the **Data source** section, click **Add**
4. Change the following and click **Add**
    1. S3 URI: s3://\<your bucket name\>/semantic_chunk_workshop/
    2. Expand **Advanced settings**, and select **No chunking**
    ![Disable chunking in data source](img/semantic-chunking-data-source-config.png)
5. Once completed, click **Sync** to ingest data from S3 into the knowledge base
6. Check the **Sync history**. You should see that 17 source files were added (the number will differ if you use different data or chunking settings)

#### 3.3.2.2 Using boto3
If you prefer to create a data source programmatically, run the following cells. Otherwise, skip to the next section.

In [None]:
import boto3

# Target knowledge base
kb_id = '<knowledge base id>'  # Replace with your knowledge base ID
bedrock_agent_client = boto3.client('bedrock-agent')

In [None]:
# Create data source
data_source_config = {
    "s3Configuration": {
        "bucketArn": f"arn:aws:s3:::{s3_bucket}",
        "inclusionPrefixes": [f"{s3_prefix}/"]},
    "type": "S3"
}
description = 'Data source for semantic chunking workshop'
name = 'semantic-chunking-workshop'
vectorIngestionConfiguration = {
    'chunkingConfiguration': {'chunkingStrategy': 'NONE'}
}

response = bedrock_agent_client.create_data_source(
    dataSourceConfiguration=data_source_config,
    description=description,
    knowledgeBaseId=kb_id,
    name=name,
    vectorIngestionConfiguration=vectorIngestionConfiguration
)

ds_id = response.get("dataSource").get("dataSourceId")  # Data source ID

In [None]:
# Synchronize documents to knowledge base
response = bedrock_agent_client.start_ingestion_job(
    dataSourceId=ds_id,
    knowledgeBaseId=kb_id,
)

job_id = response.get("ingestionJob").get("ingestionJobId")

In [None]:
# Poll until the job leaves the STARTING/IN_PROGRESS states (up to about 5 minutes)
import time

for i in range(10):
    time.sleep(30)  # Wait for 30 seconds before checking the status of the job
    response = bedrock_agent_client.get_ingestion_job(
        dataSourceId=ds_id,
        ingestionJobId=job_id,
        knowledgeBaseId=kb_id,
    )
    status = response.get("ingestionJob").get("status")
    if status not in ('STARTING', 'IN_PROGRESS'):
        print(f'Ingestion Job {job_id} is {status}')
        break
else:
    # The loop ran out of attempts without the job finishing
    print(f'Ingestion Job {job_id} is still {status}; check again later')

## 3.4 Test the knowledge base

### 3.4.1 Using AWS Console
1. Go to the [Knowledge Bases for Amazon Bedrock console](https://console.aws.amazon.com/bedrock/home#/knowledge-bases)
2. Select your knowledge base, and click **Test knowledge base**
3. Make sure **Generate responses** is enabled
4. Click **Select model**
5. Choose **Claude 3 Haiku** or any other model and click **Apply**
6. Enter your question and click **Run**
7. Check the response and expand **source details** to see the references

For example:
![Sample response with semantic chunking](img/semantic-chunking-test-retrieve-generate.png)

### 3.4.2 Using boto3

In [None]:
import boto3

# Target knowledge base
kb_id = '<knowledge base id>'  # Replace with your knowledge base ID
bedrock_agent_runtime_client = boto3.client('bedrock-agent-runtime')

In [None]:
# Retrieve documents and generate response
region = boto3.Session().region_name
model_arn = f'arn:aws:bedrock:{region}::foundation-model/anthropic.claude-3-haiku-20240307-v1:0'

# Helper function
def ask(question, session_id=None):
    # Construct the config
    config = {
        "input": {
            "text": question
        },
        "retrieveAndGenerateConfiguration": {
            "knowledgeBaseConfiguration": {
                "knowledgeBaseId": kb_id,
                "modelArn": model_arn,
            },
            "type": "KNOWLEDGE_BASE"
        }
    }
    # Add session_id if exists
    if session_id:
        config["sessionId"] = session_id
    # Invoke the knowledge base API
    response = bedrock_agent_runtime_client.retrieve_and_generate(
        **config
    )
    session_id = response.get("sessionId")
    output = response.get("output")
    citations = response.get("citations")
    return session_id, output, citations

In [None]:
# Ask the first question in a new session
question = 'What is project Kuiper?'
session_id, output, citations = ask(question)
print(output.get('text'))
# print(citations)
# print(session_id)

In [None]:
# Ask a follow-up question
question = 'Tell me more about it.'
session_id, output, citations = ask(question, session_id)
print(output.get('text'))
# print(citations)
# print(session_id)
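
To see which split files a response was grounded on, you can pull the S3 URIs out of `citations`. The sketch below is a hypothetical helper, `list_source_uris`, that assumes each citation carries a `retrievedReferences` list whose entries contain `location.s3Location.uri`. The sample data is made up so that the cell runs without calling the knowledge base.

```python
def list_source_uris(citations):
    """Collect the unique S3 URIs referenced by a list of citations."""
    uris = []
    for citation in citations or []:
        for reference in citation.get("retrievedReferences", []):
            uri = reference.get("location", {}).get("s3Location", {}).get("uri")
            if uri and uri not in uris:
                uris.append(uri)
    return uris


# Hypothetical citations, shaped like the `citations` value returned by ask()
sample_citations = [
    {
        "generatedResponsePart": {"textResponsePart": {"text": "..."}},
        "retrievedReferences": [
            {
                "content": {"text": "..."},
                "location": {
                    "type": "S3",
                    "s3Location": {"uri": "s3://my-bucket/semantic_chunk_workshop/7.txt"},
                },
            }
        ],
    }
]
print(list_source_uris(sample_citations))
```

Because each split was uploaded as its own file and the data source uses no chunking, the file name in each URI maps directly back to an index in `split_texts`. In the notebook, call `list_source_uris(citations)` with the value returned by `ask`.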