In [89]:
import boto3
from botocore.config import Config

session = boto3.Session(profile_name="Caleb")

sts = session.client("sts")
identity = sts.get_caller_identity()

print(identity)

{'UserId': 'AROA5BZFIZOADUUEBUTW6:caleb.chan@nxp.com', 'Account': '897189464960', 'Arn': 'arn:aws:sts::897189464960:assumed-role/AWSReservedSSO_ccoe-powerusers_6dbe169efbadd99d/caleb.chan@nxp.com', 'ResponseMetadata': {'RequestId': '462ed97b-5931-4d24-8629-ac0d8c21ee2f', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '462ed97b-5931-4d24-8629-ac0d8c21ee2f', 'x-amz-sts-extended-request-id': 'MTp1cy1lYXN0LTE6MTc1NDQ3MDgxNzYxNTpHOnd6bVNQbU5z', 'content-type': 'text/xml', 'content-length': '490', 'date': 'Wed, 06 Aug 2025 09:00:17 GMT'}, 'RetryAttempts': 0}}


In [90]:
import os

# aws sso login --profile Caleb
# aws sts get-caller-identity --profile Caleb

# Get current profile from environment
profile = os.environ.get("AWS_PROFILE", "Caleb")
print("Current AWS Profile:", profile)

session = boto3.Session(profile_name=profile)

# Verify identity
sts = session.client("sts")
identity = sts.get_caller_identity()
print("AWS Identity:", identity)

Current AWS Profile: Caleb
AWS Identity: {'UserId': 'AROA5BZFIZOADUUEBUTW6:caleb.chan@nxp.com', 'Account': '897189464960', 'Arn': 'arn:aws:sts::897189464960:assumed-role/AWSReservedSSO_ccoe-powerusers_6dbe169efbadd99d/caleb.chan@nxp.com', 'ResponseMetadata': {'RequestId': 'b6199c1c-e6c0-4a58-9edb-b95bcdcf2b28', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'b6199c1c-e6c0-4a58-9edb-b95bcdcf2b28', 'x-amz-sts-extended-request-id': 'MTp1cy1lYXN0LTE6MTc1NDQ3MDgyMjYwNTpHOkF1WldlSVB4', 'content-type': 'text/xml', 'content-length': '490', 'date': 'Wed, 06 Aug 2025 09:00:22 GMT'}, 'RetryAttempts': 0}}


In [75]:
import boto3
import json

# Your SSO profile name
SSO_PROFILE = 'Caleb'
ROLE_ARN = 'arn:aws:iam::897189464960:role/rag'

def test_sso_bedrock():
    """Test the complete SSO -> Role -> Bedrock flow"""
    
    print("🚀 Testing SSO -> Role -> Bedrock Flow")
    print("=" * 45)
    
    try:
        # Step 1: Test SSO profile
        print(f"1. Testing SSO profile: {SSO_PROFILE}")
        session = boto3.Session(profile_name=SSO_PROFILE)
        sts = session.client('sts')
        
        identity = sts.get_caller_identity()
        print(f"   ✅ SSO profile works!")
        print(f"   Identity: {identity['Arn']}")
        print(f"   Account: {identity['Account']}")
        
    except Exception as e:
        print(f"   ❌ SSO profile failed: {e}")
        print(f"   💡 Try: aws sso login --profile {SSO_PROFILE}")
        return
    
    try:
        # Step 2: Assume the role
        print(f"\n2. Assuming role: {ROLE_ARN}")
        response = sts.assume_role(
            RoleArn=ROLE_ARN,
            RoleSessionName='bedrock-session',
            DurationSeconds=3600
        )
        
        credentials = response['Credentials']
        print("   ✅ Role assumption successful!")
        
    except Exception as e:
        print(f"   ❌ Role assumption failed: {e}")
        print("   💡 Check role trust policy - it needs to allow your SSO role")
        return
    
    try:
        # Step 3: Test Bedrock
        print("\n3. Testing Bedrock access...")
        client = boto3.client(
            'bedrock-runtime',
            region_name='us-east-1',  # Try us-east-1 first
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken']
        )
        
        request_body = {
            "anthropic_version": "bedrock-2023-05-25",
            "max_tokens": 100,
            "messages": [
                {
                    "role": "user",
                    "content": "Hello! Please confirm you're working through AWS Bedrock."
                }
            ]
        }
        
        response = client.invoke_model(
            modelId='anthropic.claude-3-5-sonnet-20240620-v1:0',
            contentType='application/json',
            accept='application/json',
            body=json.dumps(request_body)
        )
        
        response_body = json.loads(response['body'].read().decode('utf-8'))
        print("   ✅ Bedrock works!")
        print(f"   Claude says: {response_body['content'][0]['text']}")
        
        print("\n🎉 SUCCESS! Everything is working!")
        return client
        
    except Exception as e:
        print(f"   ❌ Bedrock failed: {e}")
        
        # Try to give specific guidance
        if 'ValidationException' in str(e):
            print("   💡 Request access to Claude models in Bedrock console")
        elif 'AccessDenied' in str(e):
            print("   💡 Add Bedrock permissions to your role")
        else:
            print("   💡 Check region availability (try us-east-1)")
        
        return None

def create_working_client():
    """Create a working Bedrock client for future use"""
    
    print("\n" + "="*45)
    print("📋 Creating reusable Bedrock client...")
    
    try:
        # Assume role
        session = boto3.Session(profile_name=SSO_PROFILE)
        sts = session.client('sts')
        
        response = sts.assume_role(
            RoleArn=ROLE_ARN,
            RoleSessionName='bedrock-work-session',
            DurationSeconds=3600
        )
        
        credentials = response['Credentials']
        
        # Create client
        client = boto3.client(
            'bedrock-runtime',
            region_name='us-east-1',
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken']
        )
        
        print("✅ Client created! Use this for your Bedrock calls:")
        print("""
# Your working client code:
import boto3
import json

def get_bedrock_client():
    session = boto3.Session(profile_name='ccoe-powerusers-382209588990')
    sts = session.client('sts')
    
    response = sts.assume_role(
        RoleArn='arn:aws:iam::897189464960:role/rag',
        RoleSessionName='bedrock-session',
        DurationSeconds=3600
    )
    
    credentials = response['Credentials']
    
    return boto3.client(
        'bedrock-runtime',
        region_name='us-east-1',
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken']
    )

# Usage:
client = get_bedrock_client()
# ... make your Bedrock calls with client
        """)
        
        return client
        
    except Exception as e:
        print(f"❌ Failed to create client: {e}")
        return None

if __name__ == "__main__":
    # Test the flow
    client = test_sso_bedrock()
    
    if client:
        # Provide reusable code
        create_working_client()

🚀 Testing SSO -> Role -> Bedrock Flow
1. Testing SSO profile: Caleb
   ✅ SSO profile works!
   Identity: arn:aws:sts::897189464960:assumed-role/AWSReservedSSO_ccoe-powerusers_6dbe169efbadd99d/caleb.chan@nxp.com
   Account: 897189464960

2. Assuming role: arn:aws:iam::897189464960:role/rag
   ✅ Role assumption successful!

3. Testing Bedrock access...
   ❌ Bedrock failed: An error occurred (AccessDeniedException) when calling the InvokeModel operation: You don't have access to the model with the specified model ID.
   💡 Add Bedrock permissions to your role


## Model Invocation (GPT)

In [102]:
import boto3
import json

session = boto3.Session(profile_name="Caleb")
bedrock = session.client("bedrock-runtime", region_name="us-west-2")

response = bedrock.invoke_model(
    modelId="openai.gpt-oss-120b-1:0",
    contentType="application/json",
    accept="application/json",
    body=json.dumps({
        "messages": [
            {
                "role": "user",
                "content": "Hello, tell me about youself!"
            }
        ],
        "temperature": 0.3,
        "top_p": 0.9,
        "max_completion_tokens": 1024
    })
)

response_body = json.loads(response['body'].read())
print(response_body)

{'id': 'chatcmpl-f6ac42e5', 'choices': [{'finish_reason': 'stop', 'index': 0, 'message': {'content': '<reasoning>We need to respond as ChatGPT with a short self-introduction, following the style guidelines: short, concise, friendly, no emojis, no excessive detail. Should be about 2-3 sentences.</reasoning>I’m ChatGPT, an AI language model created by OpenAI. I can help answer questions, brainstorm ideas, and assist with a wide range of topics—just let me know what you need!', 'role': 'assistant'}}], 'created': 1754471209, 'model': 'openai.gpt-oss-120b-1:0', 'service_tier': 'standard', 'system_fingerprint': 'fp_bc33001493', 'object': 'chat.completion', 'usage': {'completion_tokens': 96, 'prompt_tokens': 18, 'total_tokens': 114}}


In [100]:
assistant_message = response_body['choices'][0]['message']['content']
print("Assistant's Response:")
print(assistant_message)

print("\n" + "="*50 + "\n")

print(f"Model used: {response_body['model']}")
print(f"Finish reason: {response_body['choices'][0]['finish_reason']}")
print(f"Total tokens used: {response_body['usage']['total_tokens']}")
print(f"Input tokens: {response_body['usage']['prompt_tokens']}")
print(f"Output tokens: {response_body['usage']['completion_tokens']}")

Assistant's Response:
<reasoning>The user says: "Hello, tell me about yourself!" They want a self-introduction. According to policy, we can comply. It's a simple request. So we can give a brief self-introduction. Should be short. The user wants a short self-introduction. So we respond with a short self-introduction.</reasoning>Sure! I’m ChatGPT, an AI language model created by OpenAI. I’m designed to understand and generate human‑like text, answer questions, help with writing, brainstorm ideas, and chat about a wide range of topics. I don’t have personal experiences or feelings, but I’m here to assist you with information, creativity, and problem‑solving whenever you need it.


Model used: openai.gpt-oss-120b-1:0
Finish reason: stop
Total tokens used: 187
Input tokens: 18
Output tokens: 169


## Model Invocation (Claude)

In [244]:
import boto3

session = boto3.Session(profile_name="Caleb") 
client = session.client("bedrock-runtime", region_name="us-west-2")

response = client.invoke_model_with_response_stream(
    #modelId='arn:aws:bedrock:eu-west-1:897189464960:inference-profile/eu.anthropic.claude-3-5-sonnet-20240620-v1:0',
    modelId='arn:aws:bedrock:us-west-2:897189464960:inference-profile/us.anthropic.claude-sonnet-4-20250514-v1:0',
    contentType='application/json',
    accept='application/json',
    body=json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1000,
        "messages": [
            {
                "role": "user",
                "content": """"
                    Hello
                """
            }
        ]
    })
)

for event in response['body']:
    try:
        chunk = json.loads(event['chunk']['bytes'].decode())
        if 'delta' in chunk:
            try:
                print(chunk['delta']['text'], end='', flush=True)
            except KeyError:
                pass  # skip chunks without 'text'
    except Exception as e:
        print(f"Error parsing chunk: {e}")

    
#print(response['body'].read().decode())

Hello! How can I help you today?

In [69]:
# sentence transformer embeddings
from sentence_transformers import SentenceTransformer, util
import numpy as np
import re
import seaborn as sns
import matplotlib.pyplot as plt

similarity_model = SentenceTransformer('all-MiniLM-L6-v2')

texts = ['hello','i am snowy']
embeddings = similarity_model.encode(texts, convert_to_tensor=True)

# Custom RAG

In [135]:
import json
import os
import sys
import boto3

from langchain_community.embeddings import BedrockEmbeddings
from langchain.llms.bedrock import Bedrock

import numpy as np
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFDirectoryLoader

from langchain.vectorstores import FAISS

from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA

session = boto3.Session(profile_name="Caleb")
bedrock = session.client("bedrock-runtime", region_name="us-west-2")
bedrock_embeddings = BedrockEmbeddings(model_id="amazon.titan-embed-text-v2:0",client=bedrock)

## Document Chunking

### pdf; by character

In [126]:
def data_ingestion():
    loader=PyPDFDirectoryLoader("data")
    documents=loader.load()

    # - in our testing Character split works better with this PDF data set
    text_splitter=RecursiveCharacterTextSplitter(chunk_size=10000,
                                                 chunk_overlap=1000)
    
    docs=text_splitter.split_documents(documents)
    return docs

In [127]:
# Usage example
if __name__ == "__main__":
    # Make sure you have pandas installed: pip install pandas openpyxl
    docs = data_ingestion()
    print(f"Loaded and split {len(docs)} document chunks")
    
    # Print first chunk as example
    if docs:
        print("\nFirst chunk preview:")
        print(docs[0].page_content[:500] + "...")
        print(f"Metadata: {docs[0].metadata}")

Loaded and split 4 document chunks

First chunk preview:
#include "Testplan.h"
//-----------------------------------------------------------------------------
//
// Task: Func
//
// Description:
// 
// Revision history:
//
//-----------------------------------------------------------------------------
TASK_FUNCTION Func ()
{
  DWORD TestNumber;
   double VDD;
  
  BEGIN_TASK (FUNC_ID)
    // Update the die list
    GetDieList (gDieList);
    //Connect grounds to Abus ground for all tests
      GndConnectAbus    (ROW4);
      PinConnectAbus    (pinVSS,...
Metadata: {'producer': '', 'creator': '', 'creationdate': '2025-08-06T03:03:30-07:00', 'title': '', 'author': 'Perapach Nimkuntod', 'subject': '', 'keywords': '', 'moddate': '2025-08-06T03:03:30-07:00', 'source': 'data/PCA9546ABS_TP Comparison.pdf', 'total_pages': 4, 'page': 0, 'page_label': '1'}


### csv; by character

In [117]:
import pandas as pd
import os
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document

def data_ingestion():
    """Load and split Excel files from the data directory"""
    documents = []
    
    # Get all Excel files from the data directory
    data_dir = "data"
    excel_extensions = ['.xlsx', '.xls', '.csv']
    
    for filename in os.listdir(data_dir):
        if any(filename.lower().endswith(ext) for ext in excel_extensions):
            file_path = os.path.join(data_dir, filename)
            
            try:
                # Read Excel file (handles both .xlsx and .xls)
                if filename.lower().endswith('.csv'):
                    df = pd.read_csv(file_path)
                else:
                    df = pd.read_excel(file_path, sheet_name=None)  # Read all sheets
                
                # Process multiple sheets if it's an Excel file
                if isinstance(df, dict):  # Multiple sheets
                    for sheet_name, sheet_df in df.items():
                        content = convert_df_to_text(sheet_df, sheet_name, filename)
                        if content.strip():  # Only add if content is not empty
                            doc = Document(
                                page_content=content,
                                metadata={
                                    "source": filename,
                                    "sheet": sheet_name,
                                    "type": "excel"
                                }
                            )
                            documents.append(doc)
                else:  # Single sheet or CSV
                    content = convert_df_to_text(df, "Sheet1", filename)
                    if content.strip():
                        doc = Document(
                            page_content=content,
                            metadata={
                                "source": filename,
                                "sheet": "Sheet1" if not filename.endswith('.csv') else "CSV",
                                "type": "excel" if not filename.endswith('.csv') else "csv"
                            }
                        )
                        documents.append(doc)
                        
            except Exception as e:
                print(f"Error reading {filename}: {str(e)}")
                continue
    
    # Split documents using RecursiveCharacterTextSplitter
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=10000,
        chunk_overlap=1000
    )
    docs = text_splitter.split_documents(documents)
    
    return docs

def convert_df_to_text(df, sheet_name, filename):
    """Convert DataFrame to text format suitable for processing"""
    if df.empty:
        return ""
    
    # Create a structured text representation
    text_parts = [
        f"File: {filename}",
        f"Sheet: {sheet_name}",
        f"Rows: {len(df)}, Columns: {len(df.columns)}",
        "-" * 50
    ]
    
    # Add column headers
    text_parts.append("Columns: " + ", ".join(df.columns.astype(str)))
    text_parts.append("-" * 50)
    
    # Convert each row to text
    for idx, row in df.iterrows():
        row_text = []
        for col, value in row.items():
            if pd.notna(value):  # Only include non-null values
                row_text.append(f"{col}: {value}")
        
        if row_text:  # Only add row if it has content
            text_parts.append(f"Row {idx + 1}: {' | '.join(row_text)}")
    
    return "\n".join(text_parts)

Loaded and split 4 document chunks

First chunk preview:
File: PCA9546ABS_TP Comparison(FUNC).csv
Sheet: Sheet1
Rows: 282, Columns: 13
--------------------------------------------------
Columns: #include "Testplan.h", Unnamed: 1, Unnamed: 2, Unnamed: 3, Unnamed: 4, Unnamed: 5, Unnamed: 6, Unnamed: 7, Unnamed: 8, Unnamed: 9, Unnamed: 10, Unnamed: 11, Unnamed: 12
--------------------------------------------------
Row 2: #include "Testplan.h": //-----------------------------------------------------------------------------
Row 3: #include "Testplan.h": //...
Metadata: {'source': 'PCA9546ABS_TP Comparison(FUNC).csv', 'sheet': 'CSV', 'type': 'csv'}


### csv; by delimiter string

In [128]:
from langchain.schema import Document

def data_ingestion():
    """Split Excel data by 'Test Number' delimiter"""
    documents = []
    data_dir = "data"
    
    for filename in os.listdir(data_dir):
        if filename.lower().endswith(('.xlsx', '.xls', '.csv')):
            file_path = os.path.join(data_dir, filename)
            
            try:
                # Read Excel/CSV file
                if filename.lower().endswith('.csv'):
                    sheets = {"CSV": pd.read_csv(file_path)}
                else:
                    sheets = pd.read_excel(file_path, sheet_name=None)
                
                for sheet_name, df in sheets.items():
                    if df.empty:
                        continue
                    
                    # Split by 'Test Number' delimiter
                    test_chunks = split_by_test_number(df, sheet_name, filename)
                    documents.extend(test_chunks)
                    
            except Exception as e:
                print(f"Error processing {filename}: {e}")
                continue
    
    return documents

def split_by_test_number(df, sheet_name, filename):
    """Split DataFrame by rows containing 'Test Number', including initial content"""
    chunks = []
    
    # Find all rows that contain 'Test Number' in any column
    test_number_rows = []
    
    for idx, row in df.iterrows():
        # Check if any cell in the row contains 'Test Number'
        row_contains_test = any(
            str(cell).strip().startswith('TestNumber') 
            for cell in row.values 
            if pd.notna(cell)
        )
        
        if row_contains_test:
            test_number_rows.append(idx)
    
    # If no 'Test Number' found, treat entire sheet as one chunk
    if not test_number_rows:
        content = convert_entire_sheet_to_text(df, sheet_name, filename)
        chunk = Document(
            page_content=content,
            metadata={
                "source": filename,
                "sheet": sheet_name,
                "test_number": "No Test Number Found",
                "chunk_type": "full_sheet",
                "start_row": 1,
                "end_row": len(df)
            }
        )
        return [chunk]
    
    # Handle content BEFORE first Test Number (if any)
    if test_number_rows[0] > 0:
        # There's content before the first Test Number
        initial_chunk_df = df.iloc[0:test_number_rows[0]]
        
        content = convert_test_chunk_to_text(
            initial_chunk_df, sheet_name, filename, 
            "Header/Preamble", 1, test_number_rows[0]
        )
        
        chunk = Document(
            page_content=content,
            metadata={
                "source": filename,
                "sheet": sheet_name,
                "test_number": "Header/Preamble",
                "chunk_type": "initial_content",
                "start_row": 1,
                "end_row": test_number_rows[0],
                "total_rows": len(initial_chunk_df)
            }
        )
        chunks.append(chunk)
    
    # Create chunks between Test Number markers
    for i, start_row in enumerate(test_number_rows):
        # Determine end row (next Test Number or end of sheet)
        if i < len(test_number_rows) - 1:
            end_row = test_number_rows[i + 1]
        else:
            end_row = len(df)
        
        # Extract chunk data
        chunk_df = df.iloc[start_row:end_row]
        
        # Get test number from the first row
        test_number = extract_test_number(chunk_df.iloc[0])
        
        # Convert chunk to text
        content = convert_test_chunk_to_text(chunk_df, sheet_name, filename, test_number, start_row + 1, end_row)
        
        chunk = Document(
            page_content=content,
            metadata={
                "source": filename,
                "sheet": sheet_name,
                "test_number": test_number,
                "chunk_type": "test_based",
                "start_row": start_row + 1,
                "end_row": end_row,
                "total_rows": len(chunk_df)
            }
        )
        chunks.append(chunk)
    
    return chunks

def extract_test_number(first_row):
    """Extract the test number from the first row"""
    for cell in first_row.values:
        if pd.notna(cell) and 'Test Number' in str(cell):
            # Try to extract the number part
            cell_str = str(cell).strip()
            # Examples: "Test Number 1", "Test Number: 2", "Test Number 3.1"
            import re
            match = re.search(r'Test Number[:\s]*([0-9.]+)', cell_str, re.IGNORECASE)
            if match:
                return f"Test Number {match.group(1)}"
            else:
                return cell_str
    return "Unknown Test"

def convert_test_chunk_to_text(chunk_df, sheet_name, filename, test_number, start_row, end_row):
    """Convert a test chunk to structured text"""
    content_parts = [
        f"File: {filename}",
        f"Sheet: {sheet_name}",
        f"Test: {test_number}",
        f"Rows: {start_row}-{end_row}",
        f"Columns: {', '.join(chunk_df.columns)}",
        "=" * 60
    ]
    
    for idx, (original_idx, row) in enumerate(chunk_df.iterrows(), start=start_row):
        row_parts = []
        for col, value in row.items():
            if pd.notna(value) and str(value).strip():
                row_parts.append(f"{col}: {value}")
        
        if row_parts:
            content_parts.append(f"Row {original_idx + 1}: {' | '.join(row_parts)}")
    
    return "\n".join(content_parts)

def convert_entire_sheet_to_text(df, sheet_name, filename):
    """Convert entire sheet when no Test Number found"""
    content_parts = [
        f"File: {filename}",
        f"Sheet: {sheet_name}",
        f"Total Rows: {len(df)}",
        f"Columns: {', '.join(df.columns)}",
        "=" * 60
    ]
    
    for idx, row in df.iterrows():
        row_parts = []
        for col, value in row.items():
            if pd.notna(value) and str(value).strip():
                row_parts.append(f"{col}: {value}")
        
        if row_parts:
            content_parts.append(f"Row {idx + 1}: {' | '.join(row_parts)}")
    
    return "\n".join(content_parts)

In [334]:
if __name__ == "__main__":
    docs = data_ingestion()
    print(f"Loaded and split {len(docs)} document chunks")
    
    if docs:
        print("\nFirst chunk preview:")
        print(docs[0].page_content[:500] + "...")
        print(f"Metadata: {docs[2].metadata}") # example chunk

Loaded and split 10 document chunks

First chunk preview:
File: PCA9546ABS_TP Comparison(FUNC).csv
Sheet: CSV
Test: Header/Preamble
Rows: 1-30
Columns: #include "Testplan.h", Unnamed: 1, Unnamed: 2, Unnamed: 3, Unnamed: 4, Unnamed: 5, Unnamed: 6, Unnamed: 7, Unnamed: 8, Unnamed: 9, Unnamed: 10, Unnamed: 11, Unnamed: 12
Row 2: #include "Testplan.h": //-----------------------------------------------------------------------------
Row 3: #include "Testplan.h": //
Row 4: #include "Testplan.h": // ...
Metadata: {'source': 'PCA9546ABS_TP Comparison(FUNC).csv', 'sheet': 'CSV', 'test_number': 'Unknown Test', 'chunk_type': 'test_based', 'start_row': 62, 'end_row': 88, 'total_rows': 27}


In [337]:
docs[3]



## Create Vector Store

In [None]:
def get_vector_store(docs):
    # convert to embeddings
    vectorstore_faiss=FAISS.from_documents(
        docs,
        bedrock_embeddings # titan model
    )
    vectorstore_faiss.save_local("faiss_index")

In [325]:
docs = data_ingestion()
get_vector_store(docs)

In [326]:
# loading vdb from local directory
faiss_index = FAISS.load_local("faiss_index", bedrock_embeddings, allow_dangerous_deserialization=True)

## Load LLM

In [223]:
def get_openai_llm():
    llm = Bedrock(model_id="openai.gpt-oss-120b-1:0", client=bedrock)
    return llm

def get_anthropic_llm():
    llm=Bedrock(model_id="anthropic.claude-sonnet-4-20250514-v1:0", client=bedrock)
    return llm

In [224]:
gpt_oss = get_openai_llm()
claude_sonnet = get_anthropic_llm()

## Augment Prompt

In [301]:
prompt_template = """

Human: You are an expert in semiconductor test automation and script translation. Your task is to convert legacy MCT 2000 test scripts into the modern SPEAL format. You must maintain the logic, structure, and intent of the original script while rewriting it using the SPEAL syntax and conventions.
You will receive an MCT 2000 code snippet as input, possibly accompanied by retrieved examples showing similar MCT-to-SPEAL conversions. Use these examples only as reference — do not copy them directly unless clearly appropriate.

<context>
{context}
</context>

Question: {question}

Assistant:"""

PROMPT = PromptTemplate(
    template=prompt_template, input_variables=["context", "question"]
)

In [318]:
from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate

system_template = (
    "You are an expert in semiconductor test automation and script translation. "
    "Your task is to convert legacy MCT 2000 test scripts into the modern SPEAL format. "
    "You must maintain the logic, structure, and intent of the original script while rewriting it using SPEAL syntax."
    "\nUse the following context as reference if helpful. Code on the LEFT represents SPEAL formatting, while code on the RIGHT represents MCT 2000 formatting.\n"
)

human_template = """
<context>
{context}
</context>

Question: {question}
"""

PROMPT = ChatPromptTemplate.from_messages([
    SystemMessagePromptTemplate.from_template(system_template),
    HumanMessagePromptTemplate.from_template(human_template),
])

In [319]:
# testing prompt
print(PROMPT.format(context="some context", question="Translate this code..."))

System: You are an expert in semiconductor test automation and script translation. Your task is to convert legacy MCT 2000 test scripts into the modern SPEAL format. You must maintain the logic, structure, and intent of the original script while rewriting it using SPEAL syntax.
Use the following context as reference if helpful. Code on the LEFT represents SPEAL formatting, while code on the RIGHT represents MCT 2000 formatting.

Human: 
<context>
some context
</context>

Question: Translate this code...



## RAG (retrieval and LLM input)

In [233]:
def get_response_llm(llm, vectorstore_faiss,query):
    qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore_faiss.as_retriever(
        search_type="similarity", search_kwargs={"k": 3}
    ),
    return_source_documents=True,
    chain_type_kwargs={"prompt": PROMPT}
)
    answer=qa({"query":query})
    return answer['result']


In [261]:
user_input = 'What is this document about?'
get_response_llm(gpt_oss, faiss_index, user_input)

TypeError: get_response_llm() takes 2 positional arguments but 3 were given

In [264]:
from langchain_core.messages import HumanMessage, SystemMessage

system_prompt = "You are a helpful assistant that explains quantum concepts to 12-year-olds."
user_prompt = "Explain what a qubit is."

llm = get_openai_llm()

llm.invoke([
    SystemMessage(content=system_prompt),
    HumanMessage(content=user_prompt)
])


ValueError: Error raised by bedrock service: An error occurred (ValidationException) when calling the InvokeModel operation: Failed to deserialize the JSON body into the target type: inputText: unknown field `inputText`, expected one of `messages`, `model`, `store`, `reasoning_effort`, `metadata`, `frequency_penalty`, `logit_bias`, `logprobs`, `top_logprobs`, `max_tokens`, `max_completion_tokens`, `n`, `modalities`, `prediction`, `audio`, `presence_penalty`, `response_format`, `seed`, `service_tier`, `stop`, `stream`, `stream_options`, `temperature`, `top_p`, `tools`, `tool_choice`, `parallel_tool_calls`, `user`, `function_call`, `functions` at line 1 column 12

## RAG Workaround

In [314]:
from langchain_core.language_models import BaseLLM
from langchain_core.outputs import LLMResult, Generation
from typing import Any, List, Optional, Dict
import boto3
import json
from pydantic import Field

# custom wrapper
class GPTOSSLLM(BaseLLM):
    bedrock: Any = Field(exclude=True) 
    model_id: str = "openai.gpt-oss-120b-1:0"
    #model_id: str = "anthropic.claude-sonnet-4-20250514-v1:0"
    #model_id: str = "anthropic.claude-3-7-sonnet-20250219-v1:0"
    
    def _generate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> LLMResult:
        generations = []
        for prompt in prompts:
            try:
                response = self.bedrock.invoke_model(
                    modelId=self.model_id,
                    contentType="application/json",
                    accept="application/json",
                    body=json.dumps({
                        "messages": [{"role": "user", "content": prompt}],
                        "temperature": kwargs.get("temperature", 0.3),
                        "top_p": kwargs.get("top_p", 0.9),
                        "max_completion_tokens": kwargs.get("max_completion_tokens", 1024)
                    })
                )
                response_body = json.loads(response['body'].read())
                text = response_body['choices'][0]['message']['content']
                generations.append([Generation(text=text)])
            except Exception as e:
                generations.append([Generation(text=f"Error: {str(e)}")])
        
        return LLMResult(generations=generations)
    
    @property
    def _llm_type(self) -> str:
        return "gpt-oss"

In [368]:
class ClaudeLLM(BaseLLM): # sonnet 4 
    bedrock: Any = Field(exclude=True) 
    #model_id: str = "anthropic.claude-sonnet-4-20250514-v1:0"
    model_id: str = "arn:aws:bedrock:us-west-2:897189464960:inference-profile/us.anthropic.claude-sonnet-4-20250514-v1:0"
    
    def _generate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> LLMResult:
        generations = []
        for prompt in prompts:
            try:
                response = self.bedrock.invoke_model(
                    modelId=self.model_id,
                    contentType="application/json",
                    accept="application/json",
                    body=json.dumps({
                        "anthropic_version": "bedrock-2023-05-31",
                        "messages": [{"role": "user", "content": prompt}],
                        "temperature": kwargs.get("temperature", 0.3),
                        "top_p": kwargs.get("top_p", 0.9),
                        "max_tokens": kwargs.get("max_tokens", 1024)
                    })
                )

                response_body = json.loads(response['body'].read())
                text = response_body['content'][0]['text']
                generations.append([Generation(text=text)])
            except Exception as e:
                generations.append([Generation(text=f"Error: {str(e)}")])
        
        return LLMResult(generations=generations)
    
    @property
    def _llm_type(self) -> str:
        return "claude"

In [360]:
def get_response_llm(vectorstore_faiss, query):
    # Initialize Bedrock client
    session = boto3.Session(profile_name="Caleb")
    bedrock = session.client("bedrock-runtime", region_name="us-west-2")
    
    # Initialize custom LLM
    llm = GPTOSSLLM(bedrock=bedrock) 
    llm = ClaudeLLM(bedrock=bedrock) 
    
    # Set up RetrievalQA
    qa = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vectorstore_faiss.as_retriever(search_kwargs={"k": 3}),
        return_source_documents=True,
        chain_type_kwargs={"prompt": PROMPT}
    )

    return qa.invoke({"query": query})["result"]

# def get_response_llm(vectorstore_faiss, query, model_id): # with selection
#     session = boto3.Session(profile_name="Caleb")
#     bedrock = session.client("bedrock-runtime", region_name="us-west-2")
    
#     if model_id.startswith("openai."):
#         llm = GPTOSSLLM(bedrock=bedrock, model_id=model_id)
#     elif model_id.startswith("anthropic."):
#         llm = ClaudeLLM(bedrock=bedrock, model_id=model_id)
#     else:
#         raise ValueError(f"Unsupported model: {model_id}")
    
#     qa = RetrievalQA.from_chain_type(
#         llm=llm,
#         chain_type="stuff",
#         retriever=vectorstore_faiss.as_retriever(search_kwargs={"k": 3}),
#         return_source_documents=True,
#         chain_type_kwargs={"prompt": PROMPT}
#     )

#     return qa.invoke({"query": query})["result"]


In [350]:
session = boto3.Session()
bedrock = session.client("bedrock-runtime", region_name="us-west-2")
identity = sts.get_caller_identity()
print(identity)

{'UserId': 'AROA5BZFIZOADUUEBUTW6:caleb.chan@nxp.com', 'Account': '897189464960', 'Arn': 'arn:aws:sts::897189464960:assumed-role/AWSReservedSSO_ccoe-powerusers_6dbe169efbadd99d/caleb.chan@nxp.com', 'ResponseMetadata': {'RequestId': '6625027f-9844-48ca-9e03-ce3d535329ab', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6625027f-9844-48ca-9e03-ce3d535329ab', 'x-amz-sts-extended-request-id': 'MTp1cy1lYXN0LTE6MTc1NDU2NjM5OTI2OTpHOjlFcTlDMWJW', 'content-type': 'text/xml', 'content-length': '490', 'date': 'Thu, 07 Aug 2025 11:33:19 GMT'}, 'RetryAttempts': 0}}


In [371]:
user_input = """""
       hello. what exact model vesion are you?
"""
#get_response_llm(faiss_index, user_input)
response_text = get_response_llm(faiss_index, user_input)
print(response_text)

I am Claude, an AI assistant created by Anthropic. I don't have access to my exact model version information, but I'm designed to help with various tasks including the semiconductor test automation and script translation work you've set up for me.

I can see from your system prompt that you'd like me to convert legacy MCT 2000 test scripts into modern SPEAL format while maintaining the original logic, structure, and intent. The context you've provided shows examples of semiconductor test scripts with digital pattern testing, pin configurations, and various test parameters.

Is there a specific MCT 2000 script you'd like me to help translate to SPEAL format? I'm ready to assist with that conversion work.


In [267]:
user_input = 'What is this document about?'
get_response_llm(faiss_index, user_input)

'Error: An error occurred (ValidationException) when calling the InvokeModel operation: Invocation of model ID anthropic.claude-sonnet-4-20250514-v1:0 with on-demand throughput isn’t supported. Retry your request with the ID or ARN of an inference profile that contains this model.'

# AWS Knowledge Base

In [None]:
import boto3
import json
from langchain_community.retrievers import AmazonKnowledgeBasesRetriever
from langchain_community.llms import Bedrock
from langchain.chains import RetrievalQA

# === CONFIGURATION ===
bedrock = boto3.client("bedrock-agent", region_name="us-west-1")  # Adjust region
kb_name = "mct-to-speal-kb"
s3_uri = "s3://your-bucket/mct-docs/"  # already uploaded chunked .txt files
bedrock_role_arn = "arn:aws:iam::<your-account>:role/BedrockKnowledgeBaseRole"

# === STEP 1: Create or describe knowledge base ===
response = bedrock.create_knowledge_base(
    name=kb_name,
    description="KB for MCT to SPEAL code conversion",
    roleArn=bedrock_role_arn,
    knowledgeBaseConfiguration={
        "type": "VECTOR",
        "vectorKnowledgeBaseConfiguration": {
            "embeddingModelArn": "arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1"
        }
    },
    storageConfiguration={
        "type": "S3",
        "s3": {
            "bucketArn": "arn:aws:s3:::your-bucket",
            "inclusionPrefixes": ["mct-docs/"]
        }
    }
)

knowledge_base_id = response["knowledgeBase"]["knowledgeBaseId"]
print("Created KB:", knowledge_base_id)

# === STEP 2: Use LangChain to connect to KB ===
retriever = AmazonKnowledgeBasesRetriever(
    knowledge_base_id=knowledge_base_id,
    region_name="us-east-1"
)

# === STEP 3: Set up Claude via Bedrock ===
llm = Bedrock(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    region_name="us-east-1",
    model_kwargs={"temperature": 0.1}
)

# === STEP 4: Retrieval QA chain ===
rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever,
    return_source_documents=True
)

# === STEP 5: Ask a question ===
query = "How do I convert an MCT loop to a SPEAL FOR structure?"
response = rag_chain({"query": query})

print("Answer:", response["result"])
print("Sources:", [doc.metadata["source"] for doc in response["source_documents"]])
