In [None]:
# Create Bucket: make the S3 bucket that will hold the source PDFs.
# The caller's AWS account ID is embedded in the name to keep it globally unique.

aws s3 mb --region eu-north-1 "s3://bedrock-kb-pdfs-$(aws sts get-caller-identity --output text --query Account)-eu-north-1"

In [None]:
# Upload PDF Files into the bucket created by the "Create Bucket" cell.
# The bucket name is derived from the caller's account ID exactly as that cell
# does, instead of a placeholder account that would point at a different bucket.

aws s3 cp ./pdfs/ "s3://bedrock-kb-pdfs-$(aws sts get-caller-identity --query Account --output text)-eu-north-1/pdfs/" --recursive --region eu-north-1

In [None]:
import boto3
import os
import logging
import sys
from botocore.exceptions import ClientError
from dotenv import load_dotenv

# Pull settings such as AWS_REGION from a local .env file into os.environ.
load_dotenv()

# Emit timestamped log records on stdout so they show up in the cell output.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    stream=sys.stdout,
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# One shared boto3 session; both clients inherit its region. If AWS_REGION is
# unset, region_name is None and boto3 falls back to its own region resolution.
session = boto3.Session(region_name=os.getenv("AWS_REGION"))
# "bedrock-agent" drives knowledge bases, data sources and ingestion jobs below;
# the plain "bedrock" client is not referenced by the functions in this cell.
bedrock, bedrock_agent = (
    session.client(service_name) for service_name in ("bedrock", "bedrock-agent")
)

def create_knowledge_base(bucket_name, role_arn, collection_arn=None):
    """Create a vector knowledge base over PDFs in S3 and start ingesting them.

    Creates the knowledge base (Titan Text Embeddings V2, OpenSearch Serverless
    storage), waits until it is ACTIVE, attaches an S3 data source limited to
    the ``pdfs/`` prefix of ``bucket_name``, then starts an ingestion job.

    Args:
        bucket_name: Name of the S3 bucket holding the PDFs under ``pdfs/``.
        role_arn: ARN of the IAM service role Bedrock uses for the knowledge base.
        collection_arn: ARN of an EXISTING OpenSearch Serverless collection.
            The CreateKnowledgeBase API does not create one; the collection and
            its ``pdf-vector-index`` index (fields ``embedding``, ``text``,
            ``metadata``) must already exist. Defaults to the
            ``OPENSEARCH_COLLECTION_ARN`` environment variable.

    Returns:
        Tuple ``(kb_id, ds_id, job_id)``.

    Raises:
        ValueError: If no collection ARN was given or configured.
        ClientError: If a Bedrock Agent API call fails (logged, then re-raised).
        RuntimeError: If the knowledge base lands in an unrecoverable status.
        TimeoutError: If the knowledge base does not become ACTIVE in time.
    """
    # Validate before touching AWS: an empty collectionArn can never succeed.
    collection_arn = collection_arn or os.getenv("OPENSEARCH_COLLECTION_ARN")
    if not collection_arn:
        raise ValueError(
            "An OpenSearch Serverless collection ARN is required: pass "
            "collection_arn or set OPENSEARCH_COLLECTION_ARN."
        )

    # Use the client's resolved region so the embedding model is always in the
    # same region as the API calls (AWS_REGION may be unset or stale).
    region = bedrock_agent.meta.region_name
    embedding_model_arn = (
        f"arn:aws:bedrock:{region}::foundation-model/amazon.titan-embed-text-v2:0"
    )

    kb_id = None
    try:
        # Create Knowledge Base
        kb_response = bedrock_agent.create_knowledge_base(
            name="pdf-knowledge-base",
            description="Knowledge base for PDF files",
            roleArn=role_arn,
            knowledgeBaseConfiguration={
                "type": "VECTOR",
                "vectorKnowledgeBaseConfiguration": {
                    "embeddingModelArn": embedding_model_arn
                }
            },
            storageConfiguration={
                "type": "OPENSEARCH_SERVERLESS",
                "opensearchServerlessConfiguration": {
                    "collectionArn": collection_arn,
                    "vectorIndexName": "pdf-vector-index",
                    "fieldMapping": {
                        "vectorField": "embedding",
                        "textField": "text",
                        "metadataField": "metadata"
                    }
                }
            }
        )
        kb_id = kb_response["knowledgeBase"]["knowledgeBaseId"]
        logger.info("Created Knowledge Base with ID: %s", kb_id)

        # Creation is asynchronous; wait for ACTIVE before attaching a source.
        _wait_for_knowledge_base_active(kb_id)

        # Create Data Source
        ds_response = bedrock_agent.create_data_source(
            knowledgeBaseId=kb_id,
            name="pdf-data-source",
            description="S3 bucket containing PDF files",
            dataSourceConfiguration={
                "type": "S3",
                "s3Configuration": {
                    "bucketArn": f"arn:aws:s3:::{bucket_name}",
                    "inclusionPrefixes": ["pdfs/"]
                }
            }
        )
        ds_id = ds_response["dataSource"]["dataSourceId"]
        logger.info("Created Data Source with ID: %s", ds_id)

        # Start Ingestion Job
        ingestion_response = bedrock_agent.start_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=ds_id
        )
        job_id = ingestion_response["ingestionJob"]["ingestionJobId"]
        logger.info("Started Ingestion Job with ID: %s", job_id)

        return kb_id, ds_id, job_id

    except ClientError as e:
        logger.error("Error creating Knowledge Base: %s", e)
        if kb_id is not None:
            # The knowledge base already exists; make the partial state visible.
            logger.error(
                "Knowledge Base %s was created before the failure and may need "
                "to be deleted before retrying.",
                kb_id,
            )
        raise


def _wait_for_knowledge_base_active(kb_id, timeout=300, poll_interval=5):
    """Block until knowledge base ``kb_id`` reports status ACTIVE.

    Raises:
        RuntimeError: If it reaches FAILED, DELETING or DELETE_UNSUCCESSFUL.
        TimeoutError: If it is still not ACTIVE after ``timeout`` seconds.
    """
    import time

    deadline = time.monotonic() + timeout
    while True:
        kb = bedrock_agent.get_knowledge_base(knowledgeBaseId=kb_id)["knowledgeBase"]
        status = kb["status"]
        if status == "ACTIVE":
            return
        if status in ("FAILED", "DELETING", "DELETE_UNSUCCESSFUL"):
            raise RuntimeError(
                f"Knowledge Base {kb_id} entered status {status}: "
                f"{kb.get('failureReasons', [])}"
            )
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Knowledge Base {kb_id} still {status} after {timeout}s"
            )
        time.sleep(poll_interval)

def check_ingestion_status(kb_id, ds_id, job_id):
    """Fetch, log and return the current status of an ingestion job.

    Besides the status, logs the job's ``statistics`` (document counts) and
    any ``failureReasons`` so a FAILED job is diagnosable from the output.

    Args:
        kb_id: Knowledge base ID.
        ds_id: Data source ID.
        job_id: Ingestion job ID.

    Returns:
        The ``status`` string from GetIngestionJob (e.g. "IN_PROGRESS",
        "COMPLETE", "FAILED").

    Raises:
        ClientError: If the GetIngestionJob call fails (logged, then re-raised).
    """
    try:
        job = bedrock_agent.get_ingestion_job(
            knowledgeBaseId=kb_id,
            dataSourceId=ds_id,
            ingestionJobId=job_id
        )["ingestionJob"]
    except ClientError as e:
        logger.error("Error checking ingestion status: %s", e)
        raise

    status = job["status"]
    logger.info("Ingestion Job Status: %s", status)
    # Without these, the caller only ever learns the bare status string.
    if job.get("statistics"):
        logger.info("Ingestion Job Statistics: %s", job["statistics"])
    if job.get("failureReasons"):
        logger.error("Ingestion Job failure reasons: %s", job["failureReasons"])
    return status

if __name__ == "__main__":
    import time

    # Pin the region and rebuild the clients. The module-level session/clients
    # were created at import time from whatever AWS_REGION held then, so only
    # setting os.environ here would leave them pointing at another region.
    region = "eu-north-1"
    os.environ["AWS_REGION"] = region
    session = boto3.Session(region_name=region)
    bedrock = session.client("bedrock")
    bedrock_agent = session.client("bedrock-agent")

    # Same naming scheme as the "Create Bucket" cell:
    # bedrock-kb-pdfs-<account-id>-<region>. Override with BEDROCK_KB_BUCKET.
    bucket_name = os.getenv("BEDROCK_KB_BUCKET")
    if not bucket_name:
        account_id = session.client("sts").get_caller_identity()["Account"]
        bucket_name = f"bedrock-kb-pdfs-{account_id}-{region}"

    # Service role for the knowledge base; override with BEDROCK_KB_ROLE_ARN.
    # NOTE(review): the default is an account-specific role. It presumably needs
    # a trust policy for bedrock.amazonaws.com plus S3/AOSS access; confirm.
    role_arn = os.getenv(
        "BEDROCK_KB_ROLE_ARN",
        "arn:aws:iam::311410995876:role/SageMaker-MLengineer",
    )

    # Create Knowledge Base
    kb_id, ds_id, job_id = create_knowledge_base(bucket_name, role_arn)

    # Poll until the job reaches a terminal status, but never spin forever.
    poll_interval_s = 30
    max_wait_s = 60 * 60
    deadline = time.monotonic() + max_wait_s
    status = check_ingestion_status(kb_id, ds_id, job_id)
    while status not in ("COMPLETE", "FAILED", "STOPPED"):
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Ingestion job {job_id} still {status} after {max_wait_s}s"
            )
        time.sleep(poll_interval_s)
        status = check_ingestion_status(kb_id, ds_id, job_id)

    if status != "COMPLETE":
        logger.error("Ingestion job %s finished with status %s", job_id, status)