In [31]:
import boto3
import json
import time
import pandas as pd
from opensearchpy import OpenSearch
from langchain_aws import BedrockEmbeddings, ChatBedrock

In [32]:
def create_aws_session(region_name="us-east-1", aws_access_key_id=None,
                       aws_secret_access_key=None, aws_session_token=None):
    """Create the boto3 Session used by this notebook.

    Credentials are never hardcoded here. When the key arguments are left as
    None, boto3 resolves credentials from its default credential chain
    (environment variables, shared config/credentials files, SSO, instance
    role, ...).

    Args:
        region_name: AWS region for clients created from this session.
        aws_access_key_id: Optional explicit access key id.
        aws_secret_access_key: Optional explicit secret key.
        aws_session_token: Optional session token for temporary credentials.

    Returns:
        A ``boto3.Session``, or None if construction raised (the error is
        printed).
    """
    try:
        return boto3.Session(
            region_name=region_name,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
        )
    except Exception as e:
        print(f"Error creating AWS session: {str(e)}")
        return None

def create_s3_client(session):
    """Build an S3 client from ``session``.

    Returns None when ``session`` is falsy (e.g. session creation failed) or
    when building the client raises; in the latter case the error is printed.
    """
    try:
        client = session.client('s3') if session else None
    except Exception as err:
        print(f"Error creating S3 client: {str(err)}")
        client = None
    return client

def create_personalize_client(session):
    """Build an Amazon Personalize (control-plane) client from ``session``.

    Returns None when ``session`` is falsy (e.g. session creation failed) or
    when building the client raises; in the latter case the error is printed.
    """
    try:
        if session:
            return session.client('personalize')
        return None
    except Exception as e:
        # Report the client that actually failed (this previously said "S3").
        print(f"Error creating Personalize client: {str(e)}")
        return None

In [33]:
# Initialize Bedrock client.
# Credentials are resolved by boto3's default credential chain (environment
# variables, ~/.aws config, SSO, instance role) instead of hardcoded keys;
# explicit key strings passed here would bypass that chain.
bedrock_client = boto3.client(
    service_name="bedrock-runtime",
    region_name="us-east-1",
)

# Initialize models
embed_model = BedrockEmbeddings(
    client=bedrock_client,
    model_id="amazon.titan-embed-text-v1"
)

In [34]:
session = create_aws_session()
s3_client = create_s3_client(session)
per_client = create_personalize_client(session)
# The factory helpers print and return None on failure. Stop here with a clear
# message rather than failing later with "'NoneType' object has no attribute".
if s3_client is None or per_client is None:
    raise RuntimeError("Failed to create AWS clients; see the error printed above.")

In [35]:
# AWS setup
region = "us-east-1"
s3_bucket = "bucket-for-my-kb-pdf-test"  # replace if needed
personalize = per_client
# Runtime client used for inference (get_personalized_ranking). Credentials come
# from boto3's default credential chain rather than hardcoded keys.
personalize_runtime = boto3.client("personalize-runtime", region_name=region)
s3 = s3_client
# Upload helpers
def upload_to_s3(filename, bucket, key, client=None):
    """Upload a local file to S3 and print the destination URI.

    Args:
        filename: Path of the local file to upload.
        bucket: Destination S3 bucket name.
        key: Destination object key within ``bucket``.
        client: Optional boto3 S3 client. Defaults to the notebook-level ``s3``
            client, so existing three-argument calls keep working.
    """
    if client is None:
        client = s3
    client.upload_file(Filename=filename, Bucket=bucket, Key=key)
    print(f"Uploaded {filename} to s3://{bucket}/{key}")

In [36]:
# Upload both CSVs to the bucket under the object keys the import job expects.
upload_to_s3(filename="interactions_1000.csv", bucket=s3_bucket, key="interactions.csv")
upload_to_s3(filename="items_100.csv", bucket=s3_bucket, key="items.csv")

Uploaded interactions_1000.csv to s3://bucket-for-my-kb-pdf-test/interactions.csv
Uploaded items_100.csv to s3://bucket-for-my-kb-pdf-test/items.csv


In [37]:
# Create dataset group.
# NOTE(review): dataset group names are expected to be unique, so re-running
# this cell unchanged will likely fail; consider reusing an existing group.
response = personalize.create_dataset_group(
    name="hybrid-search-dataset-group_new1"
)
dataset_group_arn = response["datasetGroupArn"]

# Wait for dataset group to be ACTIVE
def wait_for_active(resource_arn, describe_fn, name, arn_param="datasetGroupArn",
                    response_key="datasetGroup", max_time=3600, poll_interval=10):
    """Poll a Personalize ``describe_*`` call until the resource is ACTIVE.

    The defaults match ``describe_dataset_group``, which is how this notebook
    calls it. Pass ``arn_param``/``response_key`` to reuse it for other
    resources, e.g. ``arn_param="datasetArn", response_key="dataset"``.

    Args:
        resource_arn: ARN of the resource to poll.
        describe_fn: Bound describe method, e.g. ``personalize.describe_dataset_group``.
        name: Human-readable label used in progress and error messages.
        arn_param: Keyword name under which ``describe_fn`` takes the ARN.
        response_key: Top-level key of the describe response holding ``status``.
        max_time: Seconds to wait before giving up.
        poll_interval: Seconds to sleep between status checks.

    Raises:
        RuntimeError: if the resource reports ``CREATE FAILED`` (previously this
            polled forever).
        TimeoutError: if the resource is not ACTIVE within ``max_time`` seconds.
    """
    start_time = time.time()
    while True:
        resource = describe_fn(**{arn_param: resource_arn})[response_key]
        status = resource["status"]
        print(f"{name} status: {status}")
        if status == "ACTIVE":
            return
        if status == "CREATE FAILED":
            reason = resource.get("failureReason", "Unknown reason")
            raise RuntimeError(f"{name} creation failed: {reason}")
        if time.time() - start_time > max_time:
            raise TimeoutError(f"{name} did not become ACTIVE within {max_time} seconds")
        time.sleep(poll_interval)

# Block until the dataset group created above reports ACTIVE.
wait_for_active(
    resource_arn=dataset_group_arn,
    describe_fn=personalize.describe_dataset_group,
    name="Dataset Group",
)

Dataset Group status: CREATE PENDING
Dataset Group status: CREATE PENDING
Dataset Group status: CREATE PENDING
Dataset Group status: ACTIVE


In [38]:
# Show the dataset group ARN (rendered as this cell's output).
dataset_group_arn

'arn:aws:personalize:us-east-1:233736836855:dataset-group/hybrid-search-dataset-group_new1'

In [40]:
# Avro schema for the INTERACTIONS dataset.
# NOTE(review): field names are expected to match the column headers of
# interactions.csv uploaded earlier — confirm against the file.
interaction_schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "item_id", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "long"}
    ],
    "version": "1.0"
}
schema_arn = personalize.create_schema(
    name="interactions-schema1",
    schema=json.dumps(interaction_schema)
)["schemaArn"]
interactions_dataset_arn = personalize.create_dataset(
    name="interactions-dataset1",
    datasetType="INTERACTIONS",
    datasetGroupArn=dataset_group_arn,
    schemaArn=schema_arn
)["datasetArn"]

# create_dataset returns before the dataset is usable. Poll until it is ACTIVE
# so the import job created in the next cell does not race dataset creation
# when the notebook is run top to bottom.
_dataset_deadline = time.time() + 3600
while True:
    _dataset_status = personalize.describe_dataset(
        datasetArn=interactions_dataset_arn
    )["dataset"]["status"]
    print(f"Interactions Dataset status: {_dataset_status}")
    if _dataset_status == "ACTIVE":
        break
    if _dataset_status == "CREATE FAILED":
        raise RuntimeError("Interactions dataset creation failed")
    if time.time() > _dataset_deadline:
        raise TimeoutError("Interactions dataset did not become ACTIVE in time")
    time.sleep(10)

In [42]:
iam_role = "arn:aws:iam::233736836855:role/PersonalizeS3Access"  # your IAM role ARN

# Create dataset import job. The source URI is built from the same bucket
# variable and key used by the upload cell, so the two cannot drift apart.
import_job_arn = personalize.create_dataset_import_job(
    jobName="interactions-import-job_new1",
    datasetArn=interactions_dataset_arn,
    dataSource={"dataLocation": f"s3://{s3_bucket}/interactions.csv"},
    roleArn=iam_role
)["datasetImportJobArn"]

# Wait for import job to complete
def wait_for_import_job(import_job_arn, max_time=3600, poll_interval=30, client=None):  # 1 hour timeout
    """Poll a dataset import job until it is ACTIVE.

    Args:
        import_job_arn: ARN returned by ``create_dataset_import_job``.
        max_time: Seconds to wait before giving up (default one hour).
        poll_interval: Seconds to sleep between status checks.
        client: Personalize client to use; defaults to the notebook-level
            ``personalize`` client.

    Raises:
        RuntimeError: if the job reports ``CREATE FAILED``. The message includes
            the service-reported ``failureReason`` when present, matching the
            other wait helpers.
        TimeoutError: if the job is not ACTIVE within ``max_time`` seconds.
    """
    if client is None:
        client = personalize
    start_time = time.time()

    while True:
        job = client.describe_dataset_import_job(
            datasetImportJobArn=import_job_arn
        )["datasetImportJob"]
        status = job["status"]
        print(f"Import Job status: {status}")

        if status == "ACTIVE":
            print("Import job completed successfully")
            return
        if status == "CREATE FAILED":
            raise RuntimeError(
                f"Import job failed: {job.get('failureReason', 'Unknown reason')}"
            )
        if time.time() - start_time > max_time:
            raise TimeoutError("Import job timed out")

        time.sleep(poll_interval)

try:
    wait_for_import_job(import_job_arn)
except Exception as e:
    print(f"Error during import job: {str(e)}")
    # Re-raise so "Run All" stops here instead of training on a failed import.
    raise

Import Job status: CREATE PENDING
Import Job status: CREATE IN_PROGRESS
Import Job status: CREATE IN_PROGRESS
Import Job status: CREATE IN_PROGRESS
Import Job status: CREATE IN_PROGRESS
Import Job status: CREATE IN_PROGRESS
Import Job status: ACTIVE
Import job completed successfully


In [45]:
# Train the model.
# The solution binds the dataset group to the personalized-ranking recipe.
solution_arn = personalize.create_solution(
    name="ranking-solution-new1",
    recipeArn="arn:aws:personalize:::recipe/aws-personalized-ranking",
    datasetGroupArn=dataset_group_arn,
)["solutionArn"]

# Each solution version is one training run of that solution.
solution_version_arn = personalize.create_solution_version(solutionArn=solution_arn)["solutionVersionArn"]

# Wait for solution version to be active
def wait_for_solution_version(solution_version_arn, max_time=3600, poll_interval=60, client=None):  # 1 hour timeout
    """Poll a solution version (training run) until it is ACTIVE.

    Args:
        solution_version_arn: ARN returned by ``create_solution_version``.
        max_time: Seconds to wait before giving up (default one hour).
        poll_interval: Seconds to sleep between status checks.
        client: Personalize client to use; defaults to the notebook-level
            ``personalize`` client.

    Raises:
        RuntimeError: if the status becomes ``CREATE FAILED`` (includes the
            service-reported ``failureReason`` when present).
        TimeoutError: if not ACTIVE within ``max_time`` seconds.
    """
    if client is None:
        client = personalize
    start_time = time.time()

    while True:
        version = client.describe_solution_version(
            solutionVersionArn=solution_version_arn
        )["solutionVersion"]
        status = version["status"]
        print(f"Solution Version status: {status}")

        if status == "ACTIVE":
            print("Solution version creation completed successfully")
            return
        if status == "CREATE FAILED":
            raise RuntimeError(
                "Solution version creation failed: "
                f"{version.get('failureReason', 'Unknown reason')}"
            )
        if time.time() - start_time > max_time:
            raise TimeoutError("Solution version creation timed out")

        time.sleep(poll_interval)

try:
    print("Creating solution version - this may take a while...")
    wait_for_solution_version(solution_version_arn)

    # Get metrics after training is complete
    metrics_response = personalize.get_solution_metrics(
        solutionVersionArn=solution_version_arn
    )
    print("\nTraining Metrics:")
    # One metric per line; a single-line dict repr is hard to read.
    for metric_name, metric_value in metrics_response["metrics"].items():
        print(f"  {metric_name}: {metric_value}")

except Exception as e:
    print(f"Error during solution version creation: {str(e)}")
    # Re-raise so "Run All" does not go on to deploy a model that never trained.
    raise

Creating solution version - this may take a while...
Solution Version status: CREATE PENDING
Solution Version status: CREATE IN_PROGRESS
Solution Version status: CREATE IN_PROGRESS
Solution Version status: CREATE IN_PROGRESS
Solution Version status: CREATE IN_PROGRESS
Solution Version status: CREATE IN_PROGRESS
Solution Version status: CREATE IN_PROGRESS
Solution Version status: CREATE IN_PROGRESS
Solution Version status: CREATE IN_PROGRESS
Solution Version status: CREATE IN_PROGRESS
Solution Version status: CREATE IN_PROGRESS
Solution Version status: CREATE IN_PROGRESS
Solution Version status: CREATE IN_PROGRESS
Solution Version status: CREATE IN_PROGRESS
Solution Version status: CREATE IN_PROGRESS
Solution Version status: ACTIVE
Solution version creation completed successfully

Training Metrics:
{'coverage': 0.3069, 'mean_reciprocal_rank_at_25': 0.0241, 'normalized_discounted_cumulative_gain_at_10': 0.0, 'normalized_discounted_cumulative_gain_at_25': 0.1029, 'normalized_discounted_cu

In [47]:
# ============================================
# 7. CREATE CAMPAIGN
# ============================================
# The campaign deploys the trained solution version for real-time inference;
# minProvisionedTPS is its baseline provisioned throughput (requests/second).
campaign_arn = personalize.create_campaign(
    name="ranking-campaign-new1",
    minProvisionedTPS=1,
    solutionVersionArn=solution_version_arn,
)["campaignArn"]

# Wait for campaign to be active
def wait_for_campaign(campaign_arn, max_time=3600, poll_interval=30, client=None):  # 1 hour timeout
    """Poll a campaign until it is ACTIVE.

    Args:
        campaign_arn: ARN returned by ``create_campaign``.
        max_time: Seconds to wait before giving up (default one hour).
        poll_interval: Seconds to sleep between status checks.
        client: Personalize client to use; defaults to the notebook-level
            ``personalize`` client.

    Raises:
        RuntimeError: if the status becomes ``CREATE FAILED`` (includes the
            service-reported ``failureReason`` when present).
        TimeoutError: if not ACTIVE within ``max_time`` seconds.
    """
    if client is None:
        client = personalize
    start_time = time.time()

    while True:
        campaign = client.describe_campaign(
            campaignArn=campaign_arn
        )["campaign"]
        status = campaign["status"]
        print(f"Campaign status: {status}")

        if status == "ACTIVE":
            print("Campaign creation completed successfully")
            return
        if status == "CREATE FAILED":
            raise RuntimeError(
                "Campaign creation failed: "
                f"{campaign.get('failureReason', 'Unknown reason')}"
            )
        if time.time() - start_time > max_time:
            raise TimeoutError("Campaign creation timed out")

        time.sleep(poll_interval)

try:
    print("Creating campaign...")
    wait_for_campaign(campaign_arn)

    # Get campaign details after creation is complete
    campaign_response = personalize.describe_campaign(
        campaignArn=campaign_arn
    )
    print("\nCampaign Details:")
    print(f"Name: {campaign_response['campaign']['name']}")
    print(f"ARN: {campaign_response['campaign']['campaignArn']}")
    print(f"Status: {campaign_response['campaign']['status']}")

except Exception as e:
    print(f"Error during campaign creation: {str(e)}")
    # Re-raise so "Run All" does not call an inactive campaign in the next cell.
    raise

Creating campaign...
Campaign status: CREATE PENDING
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: CREATE IN_PROGRESS
Campaign status: ACTIVE
Campaign creation completed successfully

Campaign Details:
Name: ranking-campaign-new1
ARN: arn:aws:personalize:us-east-1:233736836855:campaign/ranking-campaign-new1
Status: ACTIVE


In [None]:
# Candidate items as an OpenSearch hybrid query might return them (simulated).
hybrid_items = ["item_2", "item_4", "item_1", "item_5"]
user_id = "user_1"

# Re-rank with personalize: the campaign reorders the candidate list for this user.
response = personalize_runtime.get_personalized_ranking(
    campaignArn=campaign_arn,
    inputList=hybrid_items,
    userId=user_id,
)
ranked_items = [entry["itemId"] for entry in response["personalizedRanking"]]
print(":repeat: Re-ranked personalized results:", ranked_items)