# Dr. Claude- Patient Care with AWS
## Personalized Medical Recommendations using Bedrock, DynamoDb, and SNS

In [None]:
%pip install --upgrade pip --quiet
%pip install boto3 --force-reinstall --quiet
%pip install botocore --force-reinstall --quiet
%pip install langchain --force-reinstall --quiet
%pip install -U opensearch-py==2.3.1

In [None]:
import boto3
import botocore
from botocore.exceptions import ClientError
import json
import csv
from io import StringIO
from langchain_community.chat_models import BedrockChat
from langchain_core.prompts import ChatPromptTemplate
from langchain.retrievers.bedrock import AmazonKnowledgeBasesRetriever
import random
import pprint
import time
from policy_creator import create_bedrock_execution_role, create_oss_policy_attach_bedrock_execution_role, create_policies_in_oss
suffix = random.randrange(200, 900)

boto3_session = boto3.session.Session()
region_name = boto3_session.region_name
sts_client = boto3.client('sts')
bedrock_agent_client = boto3_session.client('bedrock-agent', region_name=region_name)
service = 'aoss'
account_id = sts_client.get_caller_identity()["Account"]
s3_suffix = f"{region_name}-{account_id}"

In [None]:
# Create an S3 client
s3 = boto3.client('s3')

# Create an S3 bucket
bucket_name = f'your-s3-bucket{s3_suffix}'
try:
    s3.create_bucket(Bucket=bucket_name)
except botocore.exceptions.ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'BucketAlreadyOwnedByYou':
        print(f"Bucket {bucket_name} already exists.")
    else:
        print(f"Error creating bucket: {e}")
        raise

In [None]:
vector_store_name = f'bedrock-sample-rag-{suffix}'
index_name = f"bedrock-sample-rag-index-{suffix}"
aoss_client = boto3_session.client('opensearchserverless')
bedrock_kb_execution_role = create_bedrock_execution_role(bucket_name=bucket_name)
bedrock_kb_execution_role_arn = bedrock_kb_execution_role['Role']['Arn']

In [None]:
# create security, network and data access policies within OSS
encryption_policy, network_policy, access_policy = create_policies_in_oss(vector_store_name=vector_store_name,
                       aoss_client=aoss_client,
                       bedrock_kb_execution_role_arn=bedrock_kb_execution_role_arn)
collection = aoss_client.create_collection(name=vector_store_name,type='VECTORSEARCH')

In [None]:
collection_id = collection['createCollectionDetail']['id']
host = collection_id + '.' + region_name + '.aoss.amazonaws.com'

In [None]:
response = aoss_client.batch_get_collection(names=[vector_store_name])
# Check if the collection exists
if 'collectionDetails' in response:
    collection_details = response['collectionDetails']
    if collection_details:
        collection_status = collection_details[0].get('status', None)
        if collection_status:
            # Check if the collection status is 'ACTIVE'
            if collection_status == 'ACTIVE':
                print('Collection already exists and is ACTIVE.')
            elif collection_status == 'CREATING':
                # Collection is still being created, wait for it to become ACTIVE
                print('Collection is still being created...')
                while collection_status == 'CREATING':
                    time.sleep(30)  # Wait for 30 seconds
                    response = aoss_client.batch_get_collection(names=[vector_store_name])
                    collection_details = response.get('collectionDetails', [])
                    if collection_details:
                        collection_status = collection_details[0].get('status', None)
                print('Collection creation completed.')
            else:
                print(f'Collection status: {collection_status}')
        else:
            print('Collection status not found in response.')
    else:
        print('Collection details not found in response.')
else:
    print('No collection details found in response.')

In [None]:
# create oss policy and attach it to Bedrock execution role
create_oss_policy_attach_bedrock_execution_role(collection_id=collection_id,
                                                bedrock_kb_execution_role=bedrock_kb_execution_role)

In [None]:
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
credentials = boto3.Session().get_credentials()
awsauth = auth = AWSV4SignerAuth(credentials, region_name, service)

index_name = f"bedrock-sample-index-{suffix}"
body_json = {
   "settings": {
      "index.knn": "true",
       "number_of_shards": 1,
       "knn.algo_param.ef_search": 512,
       "number_of_replicas": 0,
   },
   "mappings": {
      "properties": {
         "vector": {
            "type": "knn_vector",
            "dimension": 1536,
             "method": {
                 "name": "hnsw",
                 "engine": "nmslib",
                 "space_type": "cosinesimil",
                 "parameters": {
                     "ef_construction": 512,
                     "m": 16
                 },
             },
         },
         "text": {
            "type": "text"
         },
         "text-metadata": {
            "type": "text"         }
      }
   }
}
# Build the OpenSearch client
oss_client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300
)
# # It can take up to a minute for data access rules to be enforced
time.sleep(60)

In [None]:
# Create index
response = oss_client.indices.create(index=index_name, body=json.dumps(body_json))
print('\nCreating index:')
print(response)
time.sleep(60) # index creation can take up to a minute

In [None]:
folder_path = 'knowledge_data/'  # Specify the folder path (with trailing '/')
s3_key = folder_path + 'hypertension.pdf'  # Combine folder path with file name

# Upload a CSV file to the S3 bucket
local_file_path = 'hypertension.pdf'

try:
    s3.upload_file(Filename=local_file_path, Bucket=bucket_name, Key=s3_key)
    print(f"File '{local_file_path}' uploaded to '{bucket_name}/{s3_key}'")
except Exception as e:
    print(f"Error uploading file to S3: {e}")
    raise

In [None]:
opensearchServerlessConfiguration = {
            "collectionArn": collection["createCollectionDetail"]['arn'],
            "vectorIndexName": index_name,
            "fieldMapping": {
                "vectorField": "vector",
                "textField": "text",
                "metadataField": "text-metadata"
            }
        }

chunkingStrategyConfiguration = {
    "chunkingStrategy": "FIXED_SIZE",
    "fixedSizeChunkingConfiguration": {
        "maxTokens": 512,
        "overlapPercentage": 20
    }
}

s3Configuration = {
    "bucketArn": f"arn:aws:s3:::{bucket_name}",
    # "inclusionPrefixes":["*.*"] # you can use this if you want to create a KB using data within s3 prefixes.
}

embeddingModelArn = f"arn:aws:bedrock:{region_name}::foundation-model/amazon.titan-embed-text-v1"

name = f"bedrock-sample-knowledge-base-{suffix}"
description = "Guideline for the pharmacological treatment of hypertension in adults"
roleArn = bedrock_kb_execution_role_arn

In [None]:
# Create a KnowledgeBase
from retrying import retry

@retry(wait_random_min=1000, wait_random_max=2000,stop_max_attempt_number=7)
def create_knowledge_base_func():
    create_kb_response = bedrock_agent_client.create_knowledge_base(
        name = name,
        description = description,
        roleArn = roleArn,
        knowledgeBaseConfiguration = {
            "type": "VECTOR",
            "vectorKnowledgeBaseConfiguration": {
                "embeddingModelArn": embeddingModelArn
            }
        },
        storageConfiguration = {
            "type": "OPENSEARCH_SERVERLESS",
            "opensearchServerlessConfiguration":opensearchServerlessConfiguration
        }
    )
    return create_kb_response["knowledgeBase"]

In [None]:
try:
    kb = create_knowledge_base_func()
except Exception as err:
    print(f"{err=}, {type(err)=}")

In [None]:
get_kb_response = bedrock_agent_client.get_knowledge_base(knowledgeBaseId = kb['knowledgeBaseId'])

In [None]:
# Create a DataSource in KnowledgeBase 
create_ds_response = bedrock_agent_client.create_data_source(
    name = name,
    description = description,
    knowledgeBaseId = kb['knowledgeBaseId'],
    dataSourceConfiguration = {
        "type": "S3",
        "s3Configuration":s3Configuration
    },
    vectorIngestionConfiguration = {
        "chunkingConfiguration": chunkingStrategyConfiguration
    }
)
ds = create_ds_response["dataSource"]

In [None]:
bedrock_agent_client.get_data_source(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])

In [None]:
start_job_response = bedrock_agent_client.start_ingestion_job(knowledgeBaseId = kb['knowledgeBaseId'], dataSourceId = ds["dataSourceId"])

In [None]:
job = start_job_response["ingestionJob"]

In [None]:
# Get job 
while(job['status']!='COMPLETE' ):
  get_job_response = bedrock_agent_client.get_ingestion_job(
      knowledgeBaseId = kb['knowledgeBaseId'],
        dataSourceId = ds["dataSourceId"],
        ingestionJobId = job["ingestionJobId"]
  )
  job = get_job_response["ingestionJob"]
time.sleep(40)

In [None]:
kb_id =  'LWYAWODIVN' #kb["knowledgeBaseId"]

In [None]:
%store kb_id

In [None]:
# Create an S3 client
s3 = boto3.client('s3')

# Create an S3 bucket
patient_bucket_name = f'patient-data-{s3_suffix}'

try:
    s3.create_bucket(Bucket=patient_bucket_name)
except botocore.exceptions.ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'BucketAlreadyOwnedByYou':
        print(f"Bucket {bucket_name} already exists.")
    else:
        print(f"Error creating bucket: {e}")
        raise

In [None]:
# Upload a CSV file with ptient data to the S3 bucket
local_file_path = 'patient_data.csv'
s3_key_dataset = local_file_path

try:
    s3.upload_file(Filename=local_file_path, Bucket=bucket_name, Key=s3_key)
except ClientError as e:
    print(f"Error uploading file to S3: {e}")
    raise

In [None]:
# Create a DynamoDB client
dynamodb = boto3.client('dynamodb')

table_name = "my_dynamo_table" # change to your table name  

# Create the DynamoDB table using the import_table client
try:
    response = dynamodb.import_table(
        S3BucketSource={
            'S3Bucket': bucket_name,
            'S3KeyPrefix': s3_key_dataset
        },
        InputFormat='CSV',
        TableCreationParameters={
            'TableName': table_name, 
            'AttributeDefinitions': [
                {
                    'AttributeName': 'Patient ID',
                    'AttributeType': 'S'
                },
            ],
            'KeySchema': [
                {
                    'AttributeName': 'Patient ID',
                    'KeyType': 'HASH'
                },
            ],
            'BillingMode': 'PAY_PER_REQUEST'
        }
    )

    # Wait for the table to be created
    table_status = None
    while table_status != 'ACTIVE':
        describe_response = dynamodb.describe_table(TableName='my-table')
        table_status = describe_response['Table']['TableStatus']
        if table_status != 'ACTIVE':
            time.sleep(5)  # Wait for 5 seconds before checking again

    print("Table created successfully.")

except botocore.exceptions.ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'ResourceInUseException':
        print("Table already exists.")
    else:
        print("Error creating table:", e)


In [None]:
# Aqui tenho de passar patient_id
patient_id = '1'

In [None]:
try:
    # Retrieve item from DynamoDB
    response = dynamodb.get_item(
        TableName=table_name,
        Key={
            'Patient ID': {'S': patient_id}
        }
    )
    
    # Extract patient information if item exists
    item = response.get('Item')
    if item:
        patient_info = {key: value.get('S') for key, value in item.items()}
        print("Patient Information:", patient_info)
    else:
        print("Patient not found")
        patient_info = None

except Exception as e:
    print("Error retrieving patient information:", e)
    patient_info = None


In [None]:
# Simulate getting recommendation using BedrockChat
# Create the LangChain components
from langchain.prompts import PromptTemplate
import pprint as pp

bedrock_client = boto3.client("bedrock-runtime")

model_id = "anthropic.claude-3-haiku-20240307-v1:0"
llm = BedrockChat(model_id=model_id, client=bedrock_client)

question = f"What recommendations would you give to the patient based on the {patient_info}?"
retriever = AmazonKnowledgeBasesRetriever(
    knowledge_base_id=kb_id,
    retrieval_config={
        "vectorSearchConfiguration": {
            "numberOfResults": 4,
            "overrideSearchType": "SEMANTIC",  # optional
        }
    },
)

docs = retriever.get_relevant_documents(
        query=question
    )
pp.pprint(docs)

In [None]:
question = f"What recommendations would you give to the patient based on the {patient_info}?"


PROMPT_TEMPLATE = """
Human: You are a medical assistant that writes emails to patients based on the medical info you're provided with. 
You should keep a formal and professional language. 
You should present yourself as Dr. Claude, the patient's AI medical assistant, and provide next steps for the patient.
You should use medical technical terms but should not refer specific studies in your answer.
Use the following pieces of information to provide a concise answer to the question enclosed in <question> tags. 
Correlate the patient's information with the medical info from the context to provide a medically accurate response. 

<context>
{context}
</context>

<question>
{question}
</question>

Never say "based on the context or information provided". It should come across that you already know this knowledge beforehand.

Assistant:"""

claude_prompt = PromptTemplate(template=PROMPT_TEMPLATE, 
                               input_variables=["context", "question"])


In [None]:
from langchain.chains import RetrievalQA

qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=retriever,
    return_source_documents=True,
    chain_type_kwargs={"prompt": claude_prompt}
)

In [None]:
answer = qa.invoke(question)

recommendation = answer['result']

pp.pprint(recommendation)

In [None]:
# Create an SNS client
sns = boto3.client('sns')

# Create an SNS topic
topic_name = "drclaude2"
response = sns.create_topic(Name=topic_name)
topic_arn = response['TopicArn']
print(f"Created SNS topic: {topic_name}")

In [None]:
# Subscribe an email to the SNS topic
email_address = "duartepcmoura@gmail.com" # put your own email here 
subscription_response = sns.subscribe(
    TopicArn=topic_arn,
    Protocol='email',
    Endpoint=email_address
)

print(f"Subscribed email {email_address} to topic {topic_name}\n")
print("Check your inbox to confirm the subscription!")

In [None]:
# Check if the patient's email is subscribed to the SNS topic
response = sns.list_subscriptions_by_topic(
    TopicArn=topic_arn
)

# Extract the list of subscribed emails
subscribed_emails = [
    subscription["Endpoint"] for subscription in response["Subscriptions"]
]

patient_email = patient_info["email"]

# If the patient's email is subscribed, send the email
if patient_email in subscribed_emails:
    # Set the email subject and message
    subject = "Dr. Claude's Medical Recommendations"
    message = f"{recommendation}"

    # Send the email using SNS
    response = sns.publish(
        TopicArn= topic_arn,
        Subject=subject,
        Message=message,
        MessageAttributes={
            "email": {"DataType": "String", "StringValue": patient_email}
        },
    )
    print("Email sent successfully.")
    print("Response:", response)
else:
    print(f"The patient's email ({patient_email}) is not subscribed to the SNS topic.")
