# Form Data Extraction Lambda

In [1]:
import boto3
import json
import base64
import time
import re
from botocore.exceptions import ClientError
import logging
import redis

# Bedrock clients, both pinned to us-east-1.
bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name="us-east-1")
bedrock = boto3.client(service_name="bedrock", region_name="us-east-1")

# Redis connection settings. The host name carries an "-ro" suffix, presumably
# the read-only endpoint (confirm). decode_responses=True makes the client
# return str instead of bytes.
redis_host = "ch-agent-assist-redis-cluster-disabled-ro.tjyhst.ng.0001.use1.cache.amazonaws.com"
redis_port = 6379
redis_password = None
redis_client = redis.Redis(
    host=redis_host,
    port=redis_port,
    password=redis_password,
    decode_responses=True,
)

# S3 location of the prompt library read by get_prompt.
bucket = "ch-agent-assist-prompt-library-bucket"
file_key = "prompts_library.json"

def decoder(data):
    """Decode base64 input holding ASCII-encoded JSON and return the parsed value.

    Args:
        data: Base64-encoded payload (bytes or ASCII str).

    Returns:
        The Python object produced by ``json.loads`` on the decoded text.
    """
    ascii_text = base64.b64decode(data).decode("ascii")
    return json.loads(ascii_text)

## Build a "role : content" conversation from the stored transcript segments
def data_preprocessing(data):
    """Turn a JSON list of transcript segments into a plain-text conversation.

    Only the first entry of each segment's ``transcript`` list is used.

    Args:
        data: JSON string holding a list of segments, each with a
            ``transcript`` list of ``ParticipantRole`` / ``Content`` dicts.

    Returns:
        str: One ``"<role> : <content>\\n"`` line per segment, concatenated.
    """
    lines = []
    for entry in json.loads(data):
        first_utterance = entry['transcript'][0]
        speaker = first_utterance['ParticipantRole']
        text = first_utterance['Content']
        lines.append(speaker + " : " + text + "\n")
    return "".join(lines)

def data_postprocessing(data):
    """Extract the JSON object text from raw model output, repairing missing braces.

    The result starts at the first ``{`` and ends at the last ``}`` that
    follows it. If the text has no ``{`` one is prepended; if no ``}`` follows
    the opening brace one is appended.

    Args:
        data: Raw completion text returned by the model.

    Returns:
        str: The brace-delimited span, e.g. ``'{"a": 1}'``.
    """
    opening = data.find("{")
    # Work on the text from the opening brace onward so every later index
    # refers to the same (possibly repaired) string.
    candidate = data[opening:] if opening != -1 else "{" + data

    closing = candidate.rfind("}")
    if closing == -1:
        return candidate + "}"
    return candidate[:closing + 1]

# Enrollment-form fields the model is asked to extract. The values are empty
# placeholders; only the key names are used.
entity_dict_v1 = {
    "patient_first_name": "",
    "patient_middle_initial": "",
    "patient_last_name": "",
    "dob": "",
    "gender": "",
    "preferred_language": "",
    "street_address": "",
    "street_name": "",
    "city": "",
    "state": "",
    "zip_code": "",
    "email": "",
    "phone": "",
    "phone_type": "",
    "prior_therapy": "",
    "diagnosis_name": "",
    "diagnosis_icd_10_code": "",
    "date_of_diagnosis": "",
    "insurance_provider": "",
    "insurance_id": "",
    "plan_type": "",
    "effective_date": "",
    "expiry_date": "",
    "record_type": "",
    "rx_bin": "",
    "rx_group": "",
    "rx_pcn": "",
    "card_holder_relationship_with_the_patient": "",
    "card_holder_name": "",
    "card_holder_dob": "",
    "prescriber_name": "",
    "specialty": "",
    "address": "",
    "facility_name": "",
    "sp_name": "",
    "sp_phone": "",
    "sp_fax": "",
}

entity_list = list(entity_dict_v1)
# Comma-separated key names, in declaration order, for insertion into the prompt.
entities = ",".join(entity_list)

def enrollment_prompt_generator(conversation,entities):
    """Build the Claude prompt that asks for enrollment entities as JSON.

    Args:
        conversation: Transcript text placed at the top of the prompt.
        entities: Comma-separated key names the model must use in its output.

    Returns:
        str: The formatted prompt. The template's indentation and blank lines
        are part of the returned text.
    """
    # NOTE(review): the template opens with "Human:" and ends with an indented
    # "Assistant:" - confirm the model accepts these turn markers as written.
    prompt_claude = f"""Human: {conversation}

    The above is a transcript between a call center agent and an insurance subscriber or patient.
    From the above conversation, identify and extract the values for the following parameters {entities}.
    Make sure you adhere to the list of entities extracted and create JSON with the exact key names passed and do not change the key names.
    Include only the information present in the provided conversation and do no not make-up any information on your own.
    

    Output the results as a structured JSON containing only the extracted fields.
    
    
    Strictly Follow the rules to provide ouput in JSON format and do not provide the extra sentence 'Here are the key entities extracted from the conversation before the JSON' as part of your response.

    Assistant:
    """

    return prompt_claude
    
# Invoke a Bedrock text-completion model and return its completion
def load_claude2(bedrock_runtime, prompt, temp, top_p, top_k,
                 model_id="anthropic.claude-v2", max_tokens=1000):
    """Send ``prompt`` to a Bedrock model and return the ``completion`` text.

    Args:
        bedrock_runtime: Client object exposing ``invoke_model``.
        prompt: Full prompt text.
        temp: Sampling temperature.
        top_p: Nucleus-sampling probability mass.
        top_k: Top-k sampling cutoff.
        model_id: Bedrock model identifier; defaults to the model this
            function has always used.
        max_tokens: Value sent as ``max_tokens_to_sample``.

    Returns:
        The ``completion`` field of the response body, or ``None`` when the
        response has no such field.

    Raises:
        ClientError: Re-raised after logging when the invocation fails.
    """
    body = {
        "prompt": prompt,
        "temperature": temp,
        "top_p": top_p,
        "top_k": top_k,
        "max_tokens_to_sample": max_tokens,
    }

    try:
        response = bedrock_runtime.invoke_model(
            modelId=model_id,
            body=json.dumps(body),
            accept="application/json",
            contentType="application/json",
        )
    except ClientError:
        # Name the model that was actually invoked.
        logging.error("Couldn't invoke %s", model_id)
        raise

    response_body = json.loads(response["body"].read())
    return response_body.get("completion")

def get_prompt(bucket,file,prompt_category,required_prompt,conversation):
    """Load a prompt template from the S3 prompt library and fill it in.

    Args:
        bucket: S3 bucket holding the prompt library.
        file: Object key of the JSON prompt library.
        prompt_category: Top-level key in the library.
        required_prompt: Template name inside ``prompt_category``.
        conversation: Text substituted for ``{conversation}``.

    Returns:
        str: The template formatted with ``conversation`` and a fixed
        ``entities`` description.
    """
    # Fixed, generic description substituted for the {entities} placeholder.
    entities = "name of patient, status of insurance, insurance number, demographic details etc."
    s3_object = boto3.client('s3').get_object(Bucket=bucket, Key=file)
    library = json.loads(s3_object['Body'].read().decode('utf-8'))
    template = library[prompt_category][required_prompt]
    return template.format(conversation=conversation, entities=entities)


     
def SNS_Publisher(json_data):
    """Publish ``json_data`` to the processor SNS FIFO topic and print the response.

    Args:
        json_data: JSON-serialisable dict; its ``streamConnectionId`` value is
            used as the FIFO ``MessageGroupId``.
    """
    client = boto3.client('sns')
    target_arn = 'arn:aws:sns:us-east-1:383299343633:ch-agent-assist-processor-sns.fifo'
    # With MessageStructure='json' the message is a JSON object whose 'default'
    # entry carries the payload.
    response = client.publish(
        TopicArn=target_arn,
        Message=json.dumps({'default': json.dumps(json_data)}),
        MessageStructure='json',
        MessageGroupId=json_data["streamConnectionId"],
    )
    print(f"SNS published : {response}")
    
        

    
def SNS_data_postprocessing(event,json_data):
    """Wrap extracted form data in the FORM_DATA message envelope.

    Args:
        event: Lambda event; ``Records[0]['body']`` is a JSON string holding
            ``streamConnectionId``.
        json_data: Extracted form data placed under ``body.form_data``.

    Returns:
        dict: Envelope with ``stream``, ``streamConnectionId`` and ``body``.
    """
    connection_id = json.loads(event["Records"][0]['body'])["streamConnectionId"]
    inner = {
        "transactionId": "f830e890-3ff2-4fdc-a08e-dd9b78a2dc28",
        # The connection id doubles as the contact id.
        "contactId": connection_id,
        "form_data": json_data,
    }
    return {
        "stream": "FORM_DATA",
        "streamConnectionId": connection_id,
        "body": inner,
    }
    
# Lambda handler to integrate with AWS
def lambda_handler(event,context):
    """Extract enrollment form data from a stored transcript and publish it to SNS.

    Reads ``streamConnectionId`` from the first record's JSON body, loads the
    transcript stored under that key in Redis, asks the model to extract the
    enrollment entities, and publishes the first ``{...}`` span of the model
    output through ``SNS_Publisher``.

    Args:
        event: Lambda event whose ``Records[0]['body']`` is a JSON string
            containing ``streamConnectionId``.
        context: Lambda context object (unused).

    Returns:
        dict: ``statusCode`` 200 after publishing; 404 when no transcript is
        stored for the key or the model output contains no JSON object.
    """
    print(event)

    key_to_read = json.loads(event["Records"][0]['body'])["streamConnectionId"]
    # A single GET replaces the EXISTS + GET pair; it yields None for a missing key.
    value = redis_client.get(key_to_read) if key_to_read else None
    if value is None:
        # Stop here: without a transcript there is nothing to send to the model.
        print('Key not found')
        return {"statusCode": 404,"body": "Transcript Not Found"}
    print(value)

    final_transcript = data_preprocessing(value)
    print(final_transcript)
    prompt_enrollment = enrollment_prompt_generator(final_transcript,entities)
    print(f'Form_Data: Generated_prompt_after_acessing_prompt_library_from_s3_bucket : {prompt_enrollment}.')
    enrollment_data = load_claude2(bedrock_runtime,prompt_enrollment,0,0.9,1)

    # Non-greedy match: takes the text from the first "{" to the first "}".
    # load_claude2 returns None when the response has no completion.
    match = re.search(r'{(.+?)}', enrollment_data or "", re.DOTALL)
    if match is None:
        print("No JSON found in the text.")
        return {"statusCode": 404,"body": "Json Not Found"}

    extracted_json = match.group(0)
    print(f'Postprocessed_model_output : {extracted_json}')
    sns_data = SNS_data_postprocessing(event,extracted_json)
    print(f'Form_Data : Data_to_be_sent_to_SNS_after_postprocessing_json_output_of_model : {sns_data}.')
    SNS_Publisher(sns_data)
    print(f'Form_Data : Data_sent_to_sns_sucessfully.')
    return {"statusCode": 200,"body": "Data Sent to SNS Sucessfully"}

# lambda_handler.py

In [5]:
import boto3
import json
import base64
import time
import re
from botocore.exceptions import ClientError
import logging
import redis
import os
from datetime import datetime , date
from form_data_extraction import construct_call_conversation , get_prompt , enrollment_prompt_generator , load_claude2 , sns_data_postprocessing , sns_publisher
from botocore.config import Config

# Extended read timeout, applied to the bedrock-runtime client below.
config = Config(read_timeout=1000)

# Evaluated once, when the module is imported.
today_date = date.today()

# Bedrock clients; the region comes from the "Region" environment variable.
bedrock_runtime = boto3.client(
    service_name="bedrock-runtime",
    region_name=os.environ.get("Region"),
    # The Config object was previously created but never handed to a client.
    config=config,
)

bedrock = boto3.client(
    service_name='bedrock',
    region_name=os.environ.get("Region"),
)

# Initialize Redis client; the host comes from the "RedisReadUrl" environment variable.
redis_host = os.environ.get("RedisReadUrl")
redis_port = 6379
redis_password = None
redis_client = redis.Redis(host=redis_host, port=redis_port, password=redis_password, decode_responses=True)
 
# Enrollment-form fields (camelCase) the model is asked to extract. The values
# are empty placeholders; only the key names are used.
entity_dict_v1 = {
    "patientFirstName": "",
    "patientMiddleInitial": "",
    "patientLastName": "",
    "dob": "",
    "age": "",
    "gender": "",
    "preferredLanguage": "",
    "streetAddress": "",
    "streetName": "",
    "city": "",
    "state": "",
    "zipCode": "",
    "email": "",
    "phone": "",
    "phoneType": "",
    "priorTherapy": "",
    "diagnosisName": "",
    "diagnosisIcd10Code": "",
    "dateOfDiagnosis": "",
    "insuranceProvider": "",
    "insuranceId": "",
    "planType": "",
    "effectiveDate": "",
    "expiryDate": "",
    "recordType": "",
    "rxBin": "",
    "rxGroup": "",
    "rxPcn": "",
    "cardHolderRelationshipWithThePatient": "",
    "cardHolderName": "",
    "cardHolderDob": "",
    "prescriberName": "",
    "specialty": "",
    "address": "",
    "facilityName": "",
    "pharmacyName": "",
    "spPhone": "",
    "spFax": "",
}

entity_list = list(entity_dict_v1)
# Comma-separated key names, in declaration order, for insertion into the prompt.
entities = ",".join(entity_list)
 
def handler(event,context):
    """Extract enrollment form data from a stored transcript and publish it to SNS.

    Reads ``streamConnectionId`` from the first record's JSON body, loads the
    transcript stored under that key in Redis, asks the model to extract the
    enrollment entities, and publishes the first ``{...}`` span of the model
    output. Timing for each phase is printed on success.

    Args:
        event: Lambda event whose ``Records[0]['body']`` is a JSON string
            containing ``streamConnectionId``.
        context: Lambda context object (unused).

    Returns:
        dict: ``statusCode`` 200 after publishing; 404 when no transcript is
        stored for the contact or the model output contains no JSON object.
    """
    start_timestamp = datetime.now()
    print(f'form_data: handler: START: {datetime.now()}: {str(event)}')
    contact_id = json.loads(event["Records"][0]['body'])["streamConnectionId"]

    # A single GET replaces the EXISTS + GET pair; it yields None for a missing key.
    value = redis_client.get(contact_id) if contact_id else None
    if value is None:
        # Stop here: without a transcript there is nothing to send to the model.
        print('Key not found')
        print(f'form_data: handler: END: {datetime.now()}: output: Error')
        return {"statusCode": 404,"body": "Transcript Not Found"}
    print(value)

    final_transcript = construct_call_conversation(value)
    print(final_transcript)
    # Take the date per invocation: a module-level value is fixed at import
    # time and goes stale while the execution environment is reused.
    prompt_enrollment = enrollment_prompt_generator(final_transcript,entities,date.today())

    model_invoke_timestamp = datetime.now()
    enrollment_data = load_claude2(bedrock_runtime,prompt_enrollment,0,0.9,1)
    model_invoke_after_timestamp = datetime.now()

    # Non-greedy match: takes the text from the first "{" to the first "}".
    # Guard against a missing completion before searching.
    match = re.search(r'{(.+?)}', enrollment_data or "", re.DOTALL)
    if match is None:
        print("No JSON found in the text.")
        print(f'form_data: handler: END: {datetime.now()}: output: Error')
        return {"statusCode": 404,"body": "Json Not Found"}

    extracted_json = match.group(0)
    print(f'form_data : Postprocessed_model_output : {extracted_json}')
    sns_data = sns_data_postprocessing(event,extracted_json)
    print(f'form_data : Data_to_be_sent_to_SNS_after_postprocessing_json_output_of_model : {sns_data}.')
    sns_publisher(sns_data)
    print(f'form_data : Data_sent_to_sns_sucessfully.')

    end_timestamp = datetime.now()
    duration_till_model = model_invoke_timestamp - start_timestamp
    duration_model = model_invoke_after_timestamp - model_invoke_timestamp
    duration_till_end = end_timestamp - model_invoke_after_timestamp
    print(f'form_data: handler: END: duration_till_model::{duration_till_model}:: duration_model::{duration_model}:: duration_till_end::{duration_till_end}:: output: {json.dumps(sns_data)}')

    return {"statusCode": 200,"body": "Data Sent to SNS Sucessfully"}

# form_data_extraction.py

In [6]:
import boto3
import json
import base64
import time
import re
from botocore.exceptions import ClientError
import logging
import redis
import os

# Bedrock clients; the region is read from the "Region" environment variable.
bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name=os.environ.get("Region"))
bedrock = boto3.client(service_name="bedrock", region_name=os.environ.get("Region"))

def decoder(data):
    """Decode base64 input holding ASCII-encoded JSON and return the parsed value.

    Args:
        data: Base64-encoded payload (bytes or ASCII str).

    Returns:
        The Python object produced by ``json.loads`` on the decoded text.
    """
    raw = base64.b64decode(data)
    return json.loads(raw.decode("ascii"))

## Build a "role : content" conversation from the stored transcript segments
def construct_call_conversation(data):
    """Turn a JSON list of transcript segments into a plain-text conversation.

    Only the first entry of each segment's ``transcript`` list is used.

    Args:
        data: JSON string holding a list of segments, each with a
            ``transcript`` list of ``ParticipantRole`` / ``Content`` dicts.

    Returns:
        str: One ``"<role> : <content>\\n"`` line per segment, concatenated.
    """
    turns = []
    for item in json.loads(data):
        head = item['transcript'][0]
        role = head['ParticipantRole']
        spoken = head['Content']
        turns.append(role + " : " + spoken + "\n")
    return "".join(turns)

def model_output_postprocessing(data):
    """Extract the JSON object text from raw model output, repairing missing braces.

    The result starts at the first ``{`` and ends at the last ``}`` that
    follows it. If the text has no ``{`` one is prepended; if no ``}`` follows
    the opening brace one is appended.

    Args:
        data: Raw completion text returned by the model.

    Returns:
        str: The brace-delimited span, e.g. ``'{"a": 1}'``.
    """
    first_brace = data.find("{")
    # Slice from the opening brace so the closing-brace search and the final
    # slice operate on the same (possibly repaired) string.
    text = data[first_brace:] if first_brace != -1 else "{" + data

    last_brace = text.rfind("}")
    if last_brace == -1:
        return text + "}"
    return text[:last_brace + 1]


def enrollment_prompt_generator(conversation,entities,today_date):
    """Build the Claude prompt that asks for enrollment entities, including age, as JSON.

    Args:
        conversation: Transcript text placed at the top of the prompt.
        entities: Comma-separated key names the model must use in its output.
        today_date: Date inserted into the prompt for the age calculation.

    Returns:
        str: The formatted prompt. The template's indentation and blank lines
        are part of the returned text.
    """
    # NOTE(review): the template opens with "Human:" and ends with an indented
    # "Assistant:" - confirm the model accepts these turn markers as written.
    prompt_claude = f"""Human: {conversation}

    The above is a transcript between a call center agent and an insurance subscriber or patient.
    From the above conversation, identify and extract the values for the following parameters {entities}.
    
    Ensure emails are recognized as valid email addresses in the format "email": "example@email.com" and phone numbers are in the standard format (e.g., (123) 456-7890 or +12345678900).
    Make sure that the words are gramatically correct and no spaces in the email address or phone number. 

    Based on today's date which is {today_date} and the patient's date of birth or dob,calculate the age of the patient and return it along with other entities in the output.
    
    Output the results as a structured JSON containing only the extracted fields having extracted values while other extracted fields have values as NULL.
    
    Strictly Follow the rules to provide ouput in JSON format and do not provide the extra sentence 'Here are the key entities extracted from the conversation before the JSON' as part of your response.
    Make sure you adhere to the list of entities extracted and create JSON with the exact key names passed and do not change the key names.
    Include only the information present in the provided conversation and do no not make-up any information on your own.
    The keys of the response json should be exactly same as the keys in {entities}.
    
    Assistant:
    """

    return prompt_claude
    
# Invoke a Bedrock text-completion model and return its completion
def load_claude2(bedrock_runtime, prompt, temp, top_p, top_k,
                 model_id="anthropic.claude-v2", max_tokens=1000):
    """Send ``prompt`` to a Bedrock model and return the ``completion`` text.

    Args:
        bedrock_runtime: Client object exposing ``invoke_model``.
        prompt: Full prompt text.
        temp: Sampling temperature.
        top_p: Nucleus-sampling probability mass.
        top_k: Top-k sampling cutoff.
        model_id: Bedrock model identifier; defaults to the model this
            function has always used.
        max_tokens: Value sent as ``max_tokens_to_sample``.

    Returns:
        The ``completion`` field of the response body, or ``None`` when the
        response has no such field.

    Raises:
        ClientError: Re-raised after logging when the invocation fails.
    """
    request = {
        "prompt": prompt,
        "temperature": temp,
        "top_p": top_p,
        "top_k": top_k,
        "max_tokens_to_sample": max_tokens,
    }

    try:
        response = bedrock_runtime.invoke_model(
            modelId=model_id,
            body=json.dumps(request),
            accept="application/json",
            contentType="application/json",
        )
    except ClientError:
        # Name the model that was actually invoked.
        logging.error("Couldn't invoke %s", model_id)
        raise

    parsed = json.loads(response["body"].read())
    return parsed.get("completion")

def get_prompt(bucket,file,prompt_category,required_prompt,conversation):
    """Load a prompt template from the S3 prompt library and fill it in.

    Args:
        bucket: S3 bucket holding the prompt library.
        file: Object key of the JSON prompt library.
        prompt_category: Top-level key in the library.
        required_prompt: Template name inside ``prompt_category``.
        conversation: Text substituted for ``{conversation}``.

    Returns:
        str: The template formatted with ``conversation`` and a fixed
        ``entities`` description.
    """
    # Fixed, generic description substituted for the {entities} placeholder.
    entities = "name of patient, status of insurance, insurance number, demographic details etc."
    fetched = boto3.client('s3').get_object(Bucket=bucket, Key=file)
    prompt_library = json.loads(fetched['Body'].read().decode('utf-8'))
    template = prompt_library[prompt_category][required_prompt]
    return template.format(conversation=conversation, entities=entities)
     
def sns_publisher(json_data):
    """Publish ``json_data`` to the SNS topic named by the ``SNSArn`` environment variable.

    Args:
        json_data: JSON-serialisable dict; its ``streamConnectionId`` value is
            used as the ``MessageGroupId``.
    """
    client = boto3.client('sns')
    target_arn = os.environ.get("SNSArn")
    # With MessageStructure='json' the message is a JSON object whose 'default'
    # entry carries the payload.
    response = client.publish(
        TopicArn=target_arn,
        Message=json.dumps({'default': json.dumps(json_data)}),
        MessageStructure='json',
        MessageGroupId=json_data["streamConnectionId"],
    )
    print(f"SNS published : {response}")
    
           
def sns_data_postprocessing(event,json_data):
    """Wrap extracted form data in the FORM_DATA message envelope.

    Args:
        event: Lambda event; ``Records[0]['body']`` is a JSON string holding
            ``streamConnectionId``.
        json_data: Extracted form data placed under ``body.form_data``.

    Returns:
        dict: Envelope with ``stream``, ``streamConnectionId`` and ``body``.
    """
    stream_id = json.loads(event["Records"][0]['body'])["streamConnectionId"]
    message_body = {
        "transactionId": "f830e890-3ff2-4fdc-a08e-dd9b78a2dc28",
        # The connection id doubles as the contact id.
        "contactId": stream_id,
        "form_data": json_data,
    }
    return {
        "stream": "FORM_DATA",
        "streamConnectionId": stream_id,
        "body": message_body,
    }

# Insights Lambda

In [2]:
import boto3
import json
import base64
import time
import re
from botocore.exceptions import ClientError
import logging
import redis

# Bedrock clients, both pinned to us-east-1.
bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name="us-east-1")
bedrock = boto3.client(service_name="bedrock", region_name="us-east-1")

# Redis connection settings. The host name carries an "-ro" suffix, presumably
# the read-only endpoint (confirm). decode_responses=True makes the client
# return str instead of bytes.
redis_host = "ch-agent-assist-redis-cluster-disabled-ro.tjyhst.ng.0001.use1.cache.amazonaws.com"
redis_port = 6379  # default Redis port
redis_password = None  # no password configured
redis_client = redis.Redis(
    host=redis_host,
    port=redis_port,
    password=redis_password,
    decode_responses=True,
)

# A further bedrock-runtime client, created without an explicit region.
brt = boto3.client(service_name="bedrock-runtime")

# S3 location of the prompt library.
bucket = "ch-agent-assist-prompt-library-bucket"
file_key = "prompts_library.json"



def decoder(data):
    """Decode base64 input holding ASCII-encoded JSON and return the parsed value.

    Args:
        data: Base64-encoded payload (bytes or ASCII str).

    Returns:
        The Python object produced by ``json.loads`` on the decoded text.
    """
    return json.loads(base64.b64decode(data).decode("ascii"))

## Build a "role : content" conversation from the stored transcript segments
def data_preprocessing(data):
    """Turn a JSON list of transcript segments into a plain-text conversation.

    Only the first entry of each segment's ``transcript`` list is used.

    Args:
        data: JSON string holding a list of segments, each with a
            ``transcript`` list of ``ParticipantRole`` / ``Content`` dicts.

    Returns:
        str: One ``"<role> : <content>\\n"`` line per segment, concatenated.
    """
    pieces = []
    for part in json.loads(data):
        leading = part['transcript'][0]
        who = leading['ParticipantRole']
        said = leading['Content']
        pieces.append(who + " : " + said + "\n")
    return "".join(pieces)

def data_postprocessing(data):
    """Return the text from the first ``{`` to the last ``}`` of ``data``.

    Args:
        data: Raw completion text returned by the model.

    Returns:
        str: The brace-delimited span (both braces included), or ``""`` when
        ``data`` has no ``{`` or no ``}`` after it.
    """
    start_index = data.find("{")
    end_index = data.rfind("}")
    # Either brace missing (or out of order) means there is no object to return;
    # indexing the last "}" unconditionally raised IndexError before.
    if start_index == -1 or end_index < start_index:
        return ""
    return data[start_index:end_index + 1]

    
# Invoke a Bedrock text-completion model and return its completion
def load_claude2(bedrock_runtime, prompt, temp, top_p, top_k,
                 model_id="anthropic.claude-v2", max_tokens=1000):
    """Send ``prompt`` to a Bedrock model and return the ``completion`` text.

    Args:
        bedrock_runtime: Client object exposing ``invoke_model``.
        prompt: Full prompt text.
        temp: Sampling temperature.
        top_p: Nucleus-sampling probability mass.
        top_k: Top-k sampling cutoff.
        model_id: Bedrock model identifier; defaults to the model this
            function has always used.
        max_tokens: Value sent as ``max_tokens_to_sample``.

    Returns:
        The ``completion`` field of the response body, or ``None`` when the
        response has no such field.

    Raises:
        ClientError: Re-raised after logging when the invocation fails.
    """
    payload = {
        "prompt": prompt,
        "temperature": temp,
        "top_p": top_p,
        "top_k": top_k,
        "max_tokens_to_sample": max_tokens,
    }

    try:
        response = bedrock_runtime.invoke_model(
            modelId=model_id,
            body=json.dumps(payload),
            accept="application/json",
            contentType="application/json",
        )
    except ClientError:
        # Name the model that was actually invoked.
        logging.error("Couldn't invoke %s", model_id)
        raise

    decoded = json.loads(response["body"].read())
    return decoded.get("completion")

# Payer vs. pharmacy statistics, passed to the model as prompt context.
# A JSON document (kept as a string) mapping each insurance provider to
# pharmacies and a day count; the insights prompt describes that count as the
# number of days the pharmacy takes to dispense medication.

insurance_statistics = '''
{
  "ANTHEM INC.": {
    "ALTSCRIPTS": "5 days",
    "RECEPTRX": "14 days",
    "GENTRY HEALTH": "15 days",
    "ALLIANCE RX": "22 days",
    "BIOPLUS": "26 days"
  },
  "BLUE CROSS/BLUE SHIELD": {
    "US BIO": "6 days",
    "RECEPTRX": "24 days",
    "ALLIANCE RX": "43 days",
    "KROGER SPECIALTY PHARMACY": "45 days",
    "ACARIA": "45 days"
  },
  "CENTENE CORPORATION": {
    "GENTRY HEALTH": "4 days",
    "CENTERWELL": "5 days",
    "FAIRVIEW SP": "5 days",
    "LUMICERA": "16 days",
    "BIOPLUS": "22 days"
  },
  "CIGNA": {
    "AMBER": "13 days",
    "CENTURY SPECIALTY": "14 days",
    "ALTSCRIPTS": "17 days",
    "KROGER SPECIALTY PHARMACY": "26 days",
    "LUMICERA": "30 days"
  },
  "CVS CAREMARK RX": {
    "FAIRVIEW SP": "6 days",
    "CENTURY SPECIALTY": "11 days",
    "ALTSCRIPTS": "14 days",
    "AMBER": "17 days",
    "ELIXIR": "28 days"
  },
  "CVS HEALTH (AETNA)": {
    "CENTURY SPECIALTY": "11 days",
    "ALTSCRIPTS": "12 days",
    "ELIXIR": "16 days",
    "GENTRY HEALTH": "17 days",
    "ALLIANCE RX": "28 days"
  },
  "DST PHARMACY SOLUTIONS": {
    "ARDON": "3 days"
  },
  "HUMANA INC.": {
    "LUMICERA": "8 days",
    "ALTSCRIPTS": "15 days",
    "ALLIANCE RX": "19 days",
    "FAIRVIEW SP": "20 days",
    "BIOPLUS": "21 days"
  },
  "OPTUMRX": {
    "ELIXIR": "17 days",
    "NOBLE HEALTH SERVICES": "24 days",
    "BIOPLUS": "28 days",
    "ACARIA": "30 days",
    "CENTERWELL": "32 days"
  },
  "RELAY HEALTH": {
    "RECEPTRX": "4 days"
  },
  "UNITED HEALTH": {
    "CENTURY SPECIALTY": "11 days",
    "GENTRY HEALTH": "19 days",
    "LUMICERA": "19 days",
    "FAIRVIEW SP": "26 days",
    "ALLIANCE RX": "28 days"
  }
}
'''

def get_enrollment_prompt(bucket,file,prompt_category,required_prompt,conversation):
    """Load an enrollment prompt template from the S3 prompt library and fill it in.

    Args:
        bucket: S3 bucket holding the prompt library.
        file: Object key of the JSON prompt library.
        prompt_category: Top-level key in the library.
        required_prompt: Template name inside ``prompt_category``.
        conversation: Text substituted for ``{conversation}``.

    Returns:
        str: The template formatted with ``conversation`` and a fixed
        ``entities`` description.
    """
    # Fixed, generic description substituted for the {entities} placeholder.
    entities = "name of patient, status of insurance, insurance number, demographic details etc."
    downloaded = boto3.client('s3').get_object(Bucket=bucket, Key=file)
    library = json.loads(downloaded['Body'].read().decode('utf-8'))
    template = library[prompt_category][required_prompt]
    return template.format(conversation=conversation, entities=entities)

def get_insights_prompt(bucket,file,prompt_category,required_prompt,insurance_provider):
    """Load an insights prompt template from the S3 prompt library and fill it in.

    The template is formatted with ``insurance_provider`` and the module-level
    ``insurance_statistics`` string.

    Args:
        bucket: S3 bucket holding the prompt library.
        file: Object key of the JSON prompt library.
        prompt_category: Top-level key in the library.
        required_prompt: Template name inside ``prompt_category``.
        insurance_provider: Value substituted for ``{insurance_provider}``.

    Returns:
        str: The formatted prompt.
    """
    s3 = boto3.client('s3')
    response = s3.get_object(Bucket=bucket, Key=file)
    json_content = json.loads(response['Body'].read().decode('utf-8'))
    template = json_content[prompt_category][required_prompt]
    return template.format(
        insurance_provider=insurance_provider,
        insurance_statistics=insurance_statistics,
    )



# Enrollment-form fields the model is asked to extract. The values are empty
# placeholders; only the key names are used.
entity_dict_v1 = {
    "patient_first_name": "",
    "patient_middle_initial": "",
    "patient_last_name": "",
    "dob": "",
    "gender": "",
    "preferred_language": "",
    "street_address": "",
    "street_name": "",
    "city": "",
    "state": "",
    "zip_code": "",
    "email": "",
    "phone": "",
    "phone_type": "",
    "prior_therapy": "",
    "diagnosis_name": "",
    "diagnosis_icd_10_code": "",
    "date_of_diagnosis": "",
    "insurance_provider": "",
    "insurance_id": "",
    "plan_type": "",
    "effective_date": "",
    "expiry_date": "",
    "record_type": "",
    "rx_bin": "",
    "rx_group": "",
    "rx_pcn": "",
    "card_holder_relationship_with_the_patient": "",
    "card_holder_name": "",
    "card_holder_dob": "",
    "prescriber_name": "",
    "specialty": "",
    "address": "",
    "facility_name": "",
    "sp_name": "",
    "sp_phone": "",
    "sp_fax": "",
}

entity_list = list(entity_dict_v1)
# Comma-separated key names, in declaration order, for insertion into the prompt.
entities = ",".join(entity_list)

def enrollment_prompt_generator(conversation,entities):
    """Build the Claude prompt that asks for key entities from a transcript as JSON.

    Args:
        conversation: Transcript text placed at the top of the prompt.
        entities: Comma-separated entity names listed in the prompt.

    Returns:
        str: The formatted prompt. The template's indentation and blank lines
        are part of the returned text.
    """
    prompt_claude = f"""Human: {conversation}

    The above is a transcript between a call center agent and an insurance subscriber or patient. Identify and extract key entities such as {entities} from the transcript. Include only the information present.

    Output the results as a structured JSON containing only the extracted fields.
    
    Strictly Follow the rules to provide ouput in JSON format and do not provide the extra sentence 'Here are the key entities extracted from the conversation before the JSON' as part of your response.

    Assistant:
    """

    return prompt_claude
    
def insights_prompt_generator(insurance_provider,insurance_statistics):
    """Build the Claude prompt that asks for pharmacy insights for one insurance provider.

    Args:
        insurance_provider: Provider name, inserted between quotes in the prompt.
        insurance_statistics: JSON text of provider -> pharmacy -> days data,
            inserted between quotes in the prompt.

    Returns:
        str: The assembled prompt.
    """
    # Plain concatenation (not an f-string): both arguments must be str.
    prompt_claude = """Human: 
 
You are Agent assist tracking the Patient and agent conversation and help the agent recommend meaningful insights on the 
insurance and insurance details related insights like for example suggesting which pharmacy to select based on the
patient's Insurance provider using metrics like how soon the pharmacy dispenses the medication to the patient.The lesser the
number of days to dispense the medication,the higher are the chances of recommendation of that pharmacy.

The patients's insurance provider is  \" """ + insurance_provider + """ \" and use the following json data to provide the insights:
\" """ + insurance_statistics + """ \".

In the Json Data,the keys represent the insurance provider and the value represents the pharmacy company and the number of days
it takes to dispense the medication to the patient.

Provide the response in a structured and easily readable format.
 
Assistant:
"""
    return prompt_claude


def SNS_Publisher(json_data):
    """Publish ``json_data`` to the processor SNS FIFO topic and print the response.

    Args:
        json_data: JSON-serialisable dict; its ``streamConnectionId`` value is
            used as the FIFO ``MessageGroupId``.
    """
    sns_client = boto3.client('sns')
    destination = 'arn:aws:sns:us-east-1:383299343633:ch-agent-assist-processor-sns.fifo'
    # With MessageStructure='json' the message is a JSON object whose 'default'
    # entry carries the payload.
    response = sns_client.publish(
        TopicArn=destination,
        Message=json.dumps({'default': json.dumps(json_data)}),
        MessageStructure='json',
        MessageGroupId=json_data["streamConnectionId"],
    )
    print(f"SNS published : {response}")
        
def SNS_data_postprocessing(event,insights_data):
    """Wrap generated insights in the INSIGHTS_DATA message envelope.

    Args:
        event: Lambda event; ``Records[0]['body']`` is a JSON string holding
            ``streamConnectionId``.
        insights_data: Insights text placed under ``body.form_data``.

    Returns:
        dict: Envelope with ``stream``, ``streamConnectionId`` and ``body``.
    """
    connection_id = json.loads(event["Records"][0]['body'])["streamConnectionId"]
    details = {
        "transactionId": "f830e890-3ff2-4fdc-a08e-dd9b78a2dc28",
        # The connection id doubles as the contact id.
        "contactId": connection_id,
        "form_data": insights_data,
    }
    return {
        "stream": "INSIGHTS_DATA",
        "streamConnectionId": connection_id,
        "body": details,
    }

# Lambda handler to integrate with AWS
def lambda_handler(event,context):
    """Generate pharmacy insights for a stored call transcript and publish them to SNS.

    Reads ``streamConnectionId`` from the first record's JSON body, loads the
    transcript stored under that key in Redis, asks the model to extract the
    enrollment entities, and - when ``insurance_provider`` is among them -
    asks the model for insights on that provider. The insights (or ``" "``
    when the key is absent) are published through ``SNS_Publisher``.

    Args:
        event: Lambda event whose ``Records[0]['body']`` is a JSON string
            containing ``streamConnectionId``.
        context: Lambda context object (unused).

    Returns:
        dict: ``statusCode`` 200 after publishing; 404 when no transcript is
        stored for the key or the model output holds no parseable JSON object.
    """
    print(event)
    key_to_read = json.loads(event["Records"][0]['body'])["streamConnectionId"]

    # A single GET replaces the EXISTS + GET pair; it yields None for a missing key.
    value = redis_client.get(key_to_read) if key_to_read else None
    if value is None:
        # Stop here: without a transcript there is nothing to send to the model.
        print('Key not found')
        return {"statusCode": 404,"body": "Transcript Not Found"}
    print(value)

    final_transcript = data_preprocessing(value)
    print(final_transcript)
    prompt_enrollment = enrollment_prompt_generator(final_transcript,entities)
    enrollment_data = load_claude2(bedrock_runtime,prompt_enrollment,0,0.9,1)

    # Non-greedy match: takes the text from the first "{" to the first "}".
    # load_claude2 returns None when the response has no completion.
    match = re.search(r'{(.+?)}', enrollment_data or "", re.DOTALL)
    if match is None:
        print("No JSON found in the text.")
        return {"statusCode": 404,"body": "Json Not Found"}

    try:
        enrollment_json_object = json.loads(match.group(0))
    except json.JSONDecodeError:
        # The brace-delimited span is not guaranteed to be valid JSON.
        print("Extracted text is not valid JSON.")
        return {"statusCode": 404,"body": "Json Not Found"}

    if "insurance_provider" in enrollment_json_object:
        insurance_provider = enrollment_json_object["insurance_provider"]
        insights_prompt = get_insights_prompt(bucket,file_key,'insight_generation','insights_claude',insurance_provider)
        insights = load_claude2(bedrock_runtime,insights_prompt,0.5,0.9,1)
        print("Generating Insights successfully", insights)
        sns_data = SNS_data_postprocessing(event,insights)
    else:
        # No provider extracted: publish a blank insights payload.
        sns_data = SNS_data_postprocessing(event," ")

    SNS_Publisher(sns_data)
    return {"statusCode": 200,"body": "Data Sent to SNS Sucessfully"}

# lambda_handler

In [7]:
import boto3
import json
import base64
import time
import re
import os
from botocore.exceptions import ClientError
import logging
import redis
from datetime import datetime , date
from insights import construct_call_conversation , enrollment_prompt_generator , load_claude2 , sns_data_postprocessing  
from insights import sns_publisher , get_insights_prompt , get_enrollment_prompt , insights_prompt_generator

# Evaluated once, when the module is imported.
today_date = date.today()

# Bedrock clients; the region is read from the "Region" environment variable.
bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name=os.environ.get("Region"))
bedrock = boto3.client(service_name="bedrock", region_name=os.environ.get("Region"))

# Redis connection settings. The host is hard-coded here.
# NOTE(review): an environment-driven alternative, os.environ.get("RedisReadUrl"),
# was left commented out next to this value - confirm which is intended.
redis_host = "ch-agent-assist-redis-cluster-disabled.tjyhst.ng.0001.use1.cache.amazonaws.com"
redis_port = 6379  # default Redis port
redis_password = None  # no password configured
redis_client = redis.Redis(
    host=redis_host,
    port=redis_port,
    password=redis_password,
    decode_responses=True,
)

# S3 location of the prompt library.
prompt_bucket = "ch-agent-assist-prompt-library-bucket"
file_key = "prompts_library.json"
 
# Payer-vs-pharmacy statistics passed to the insights prompt as context.
# This is a JSON-formatted *string* (not a dict): the handler hands it to
# insights_prompt_generator, which pastes it into the prompt text as-is.
# Outer keys are insurance providers; each maps pharmacy names to a
# "<n> days" figure (the insights prompt describes it as the number of days
# taken to dispense the medication).
 
insurance_statistics = '''
{
  "ANTHEM INC.": {
    "ALTSCRIPTS": "5 days",
    "RECEPTRX": "14 days",
    "GENTRY HEALTH": "15 days",
    "ALLIANCE RX": "22 days",
    "BIOPLUS": "26 days"
  },

  "BLUE CROSS/BLUE SHIELD": {
    "US BIO": "6 days",
    "RECEPTRX": "24 days",
    "ALLIANCE RX": "43 days",
    "KROGER SPECIALTY PHARMACY": "45 days",
    "ACARIA": "45 days"
  },

  "CENTENE CORPORATION": {
    "GENTRY HEALTH": "4 days",
    "CENTERWELL": "5 days",
    "FAIRVIEW SP": "5 days",
    "LUMICERA": "16 days",
    "BIOPLUS": "22 days"
  },

  "CIGNA": {
    "AMBER": "13 days",
    "CENTURY SPECIALTY": "14 days",
    "ALTSCRIPTS": "17 days",
    "KROGER SPECIALTY PHARMACY": "26 days",
    "LUMICERA": "30 days"
  },

  "CVS CAREMARK RX": {
    "FAIRVIEW SP": "6 days",
    "CENTURY SPECIALTY": "11 days",
    "ALTSCRIPTS": "14 days",
    "AMBER": "17 days",
    "ELIXIR": "28 days"
  },

  "CVS HEALTH (AETNA)": {
    "CENTURY SPECIALTY": "11 days",
    "ALTSCRIPTS": "12 days",
    "ELIXIR": "16 days",
    "GENTRY HEALTH": "17 days",
    "ALLIANCE RX": "28 days"
  },

  "DST PHARMACY SOLUTIONS": {
    "ARDON": "3 days"
  },

  "HUMANA INC.": {
    "LUMICERA": "8 days",
    "ALTSCRIPTS": "15 days",
    "ALLIANCE RX": "19 days",
    "FAIRVIEW SP": "20 days",
    "BIOPLUS": "21 days"
  },

  "OPTUMRX": {
    "ELIXIR": "17 days",
    "NOBLE HEALTH SERVICES": "24 days",
    "BIOPLUS": "28 days",
    "ACARIA": "30 days",
    "CENTERWELL": "32 days"
  },

  "RELAY HEALTH": {
    "RECEPTRX": "4 days"
    
  },
  
    "AETNA": {
    "CVS": "4 days",
    "WALGREENS": "2 days"
    
  },

  "UNITED HEALTH": {
    "CVS": "11 days",
    "WALGREENS": "14 days",
    "GENTRY HEALTH": "19 days",
    "LUMICERA": "19 days",
    "FAIRVIEW SP": "26 days",
    "ALLIANCE RX": "28 days"
  }
}

'''
 
# Enrollment-form fields the entity-extraction model is asked to return.
# Only the keys are used below; the empty-string values are placeholders.
entity_dict_v1 = {
    "patientFirstName": "",
    "patientMiddleInitial": "",
    "patientLastName": "",
    "dob": "",
    "age": "",
    "gender": "",
    "preferredLanguage": "",
    "streetAddress": "",
    "streetName": "",
    "city": "",
    "state": "",
    "zipCode": "",
    "email": "",
    "phone": "",
    "phoneType": "",
    "priorTherapy": "",
    "diagnosisName": "",
    "diagnosisIcd10Code": "",
    "dateOfDiagnosis": "",
    "insuranceProvider": "",
    "insuranceId": "",
    "planType": "",
    "effectiveDate": "",
    "expiryDate": "",
    "recordType": "",
    "rxBin": "",
    "rxGroup": "",
    "rxPcn": "",
    "cardHolderRelationshipWithThePatient": "",
    "cardHolderName": "",
    "cardHolderDob": "",
    "prescriberName": "",
    "specialty": "",
    "address": "",
    "facilityName": "",
    "pharmacyName": "",
    "spPhone": "",
    "spFax": ""
}

# Field names in declaration order.
entity_list = list(entity_dict_v1)

# Comma-separated field names (no trailing comma) for the enrollment prompt.
# str.join replaces the previous "+=" loop followed by slicing off the last
# comma; the resulting string is identical.
entities = ",".join(entity_list)
# Lambda handler to integrate with AWS

def _load_cached_insights(redis_key):
    """Return the insight records already stored in Redis under ``redis_key``.

    The handler stores the records as ``str(list_of_dicts)``, i.e. a Python
    literal rather than JSON. The value is parsed with ``ast.literal_eval``,
    which only accepts literals, instead of ``eval``, which would execute any
    expression found in the cache. A missing or empty key yields ``[]``.
    """
    import ast  # function-scope import: the module header does not import ast

    print(f'redis_key:{redis_key}')
    insights_redis_data_value = redis_client.get(redis_key)
    print(f'insights_redis_data_value:{insights_redis_data_value}')
    if not insights_redis_data_value:
        return []
    insights_redis_data = ast.literal_eval(insights_redis_data_value)
    print(f'insights_redis_data : {insights_redis_data}')
    return insights_redis_data


def handler(event,context):
    """Generate pharmacy insights for a call and publish them to SNS.

    Steps:
      1. Read the call transcript from Redis, keyed by the
         ``streamConnectionId`` found in the first record's body.
      2. Ask the model to extract the enrollment entities and parse the first
         ``{...}`` span of its reply as JSON.
      3. When both ``insuranceProvider`` and ``pharmacyName`` were extracted
         and no insight for that pair is cached yet, generate the insight,
         publish it through ``sns_publisher`` and record the pair in Redis
         under ``<streamConnectionId>::insights_data``.

    Args:
        event: Invocation event; ``event["Records"][0]["body"]`` must be a
            JSON string containing ``streamConnectionId``.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` and ``body``. 404 is returned when the
        transcript is not in Redis or the model reply contains no parseable
        JSON object; 200 otherwise. Both 404 cases previously crashed with
        NameError / AttributeError, which failed the whole invocation.
    """
    start_timestamp = datetime.now()
    print(f'insights_data: handler: START: {datetime.now()}: {str(event)}')
    contact_id = json.loads(event["Records"][0]['body'])["streamConnectionId"]

    # A single GET replaces EXISTS + GET; redis-py returns None for a missing key.
    value = redis_client.get(contact_id) if contact_id else None
    if value is None:
        print('Key not found')
        return {"statusCode": 404,"body": "Transcript Not Found"}
    print(value)

    final_transcript = construct_call_conversation(value)
    # Use the current date on every invocation: a module-level date is computed
    # only at import time and goes stale while the process keeps running.
    prompt_enrollment = enrollment_prompt_generator(final_transcript,entities,date.today())
    entity_extraction_model_invoke_timestamp = datetime.now()
    enrollment_data = load_claude2(bedrock_runtime,prompt_enrollment,0,0.9,1)
    entity_extraction_model_invoke_after_timestamp = datetime.now()
    print(enrollment_data)

    # Non-greedy match: grabs the first flat {...} object in the model reply.
    match = re.search(r'{(.+?)}', enrollment_data or "", re.DOTALL)
    if match is None:
        print("No JSON found in the text.")
        return {"statusCode": 404,"body": "Json Not Found"}
    try:
        enrollment_json_object = json.loads(match.group(0))
    except json.JSONDecodeError:
        print("No JSON found in the text.")
        return {"statusCode": 404,"body": "Json Not Found"}
    print("Printing all the keys",list(enrollment_json_object.keys()))

    insurance_provider = enrollment_json_object.get("insuranceProvider")
    pharmacy_name = enrollment_json_object.get("pharmacyName")
    # Missing, empty and null values all mean "not extracted".
    if not insurance_provider or not pharmacy_name:
        print("No Insigts Generated.")
        return {"statusCode": 200,"body": "No Insigts Generated."}

    redis_key = str(contact_id) + '::insights_data'
    insights_redis_data = _load_cached_insights(redis_key)

    # Check *every* cached record before generating; the previous loop
    # regenerated insights as soon as the first record did not match.
    already_generated = any(
        entry.get('insuranceProvider') == insurance_provider
        and entry.get('pharmacyName') == pharmacy_name
        for entry in insights_redis_data
    )
    if already_generated:
        print(f'insights_redis_data:{insights_redis_data}')
        print('Generated Insight already present in the database.')
        return {"statusCode" : 200 , "body" : "Generated Insight already present in the database."}

    insights_prompt = insights_prompt_generator(insurance_provider,pharmacy_name,insurance_statistics)
    insights_model_invoke_timestamp = datetime.now()
    insights = load_claude2(bedrock_runtime,insights_prompt,0.5,0.9,1)
    insights_model_invoke_after_timestamp = datetime.now()
    print("Generating Insights successfully", insights)

    sns_data = sns_data_postprocessing(event,insights)
    print("sns_data : ",sns_data)
    sns_publisher(sns_data)

    # Remember the pair so the same insight is not generated again for this call.
    insights_redis_data.append({'insuranceProvider':insurance_provider,'pharmacyName':pharmacy_name,'insights':insights})
    print(f'insights_redis_data:{insights_redis_data}')
    redis_client.set(redis_key,str(insights_redis_data))

    end_timestamp = datetime.now()
    duration_till_entity_model_invoking = entity_extraction_model_invoke_timestamp - start_timestamp
    duration_of_response_of_entity_model = entity_extraction_model_invoke_after_timestamp - entity_extraction_model_invoke_timestamp
    duration_between_respone_of_entityModel_and_insightsModel = insights_model_invoke_timestamp - entity_extraction_model_invoke_after_timestamp
    duration_of_response_of_insights_model = insights_model_invoke_after_timestamp - insights_model_invoke_timestamp
    duration_till_end = end_timestamp - insights_model_invoke_after_timestamp
    print(f'insights_data: handler: END: duration_start_to_entity_model::{duration_till_entity_model_invoking}:: duration_entity_model::{duration_of_response_of_entity_model}:: duration_entity_model_to_insights_model::{duration_between_respone_of_entityModel_and_insightsModel}:: duration_insights_model::{duration_of_response_of_insights_model}:: duration_insight_model_to_end::{duration_till_end}:: output: {json.dumps(sns_data)}')
    return {"statusCode": 200,"body": "Data Sent to SNS Sucessfully"}

# Insights.py

In [8]:
import boto3
import json
import base64
import time
import os
import re
from botocore.exceptions import ClientError
import logging
import redis

# Bedrock clients; the region comes from the "Region" environment variable.
bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name=os.environ.get("Region"))
bedrock = boto3.client(service_name="bedrock", region_name=os.environ.get("Region"))

def decoder(data):
    """Decode a base64-encoded JSON document and return the parsed value.

    Args:
        data: Base64 text (``str`` or ``bytes``) whose decoded payload is JSON.

    Returns:
        The Python value produced by ``json.loads``.

    Raises:
        binascii.Error: If ``data`` is not valid base64.
        UnicodeDecodeError: If the payload is not valid UTF-8.
        json.JSONDecodeError: If the payload is not valid JSON.
    """
    decoded_bytes = base64.b64decode(data)
    # JSON text is UTF-8; decoding as ASCII rejected any payload containing a
    # non-ASCII character (accented names, for example). ASCII input still
    # decodes exactly as before because ASCII is a subset of UTF-8.
    return json.loads(decoded_bytes.decode("utf-8"))

##Processing data to fetch role and content
def construct_call_conversation(data):
    """Turn the stored transcript JSON into a plain-text conversation.

    ``data`` is a JSON string holding a list of segments. For each segment only
    the first item of its ``'transcript'`` list is used, rendered as
    ``"<ParticipantRole> : <Content>\\n"``. The rendered lines are concatenated
    in segment order and returned as one string.
    """
    rendered_lines = []
    for entry in json.loads(data):
        first_item = entry['transcript'][0]
        rendered_lines.append(first_item['ParticipantRole'] + " : " + first_item['Content'] + "\n")
    return "".join(rendered_lines)

def model_output_postprocessing(data):
    """Extract the JSON-object span from a model reply.

    Returns the substring from the first ``{`` to the last ``}`` (inclusive).
    When the reply has no such span -- a brace is missing, or the last ``}``
    comes before the first ``{`` -- an empty string is returned. Previously a
    reply without ``}`` raised IndexError and a reply without ``{`` produced an
    arbitrary slice taken from the end of the text.
    """
    start_index = data.find("{")
    end_index = data.rfind("}")
    if start_index == -1 or end_index < start_index:
        return ""
    return data[start_index:end_index + 1]

#Defining function to connect to Bedrock LLM
def load_claude2(bedrock_runtime , prompt , temp , top_p,top_k, model_id="anthropic.claude-v2", max_tokens_to_sample=1000):
    """Invoke a Claude text-completion model and return its completion.

    Args:
        bedrock_runtime: Client object exposing ``invoke_model``.
        prompt: Full prompt text sent to the model.
        temp: Sampling temperature.
        top_p: Nucleus-sampling value.
        top_k: Top-k sampling value.
        model_id: Model identifier; defaults to the previously hard-coded
            ``"anthropic.claude-v2"``.
        max_tokens_to_sample: Completion length limit; defaults to the
            previously hard-coded 1000.

    Returns:
        The ``"completion"`` field of the response body, or ``None`` when the
        response has no such field.

    Raises:
        ClientError: Re-raised after logging when the invocation fails.
    """
    body = {
        "prompt": prompt,
        "temperature": temp,
        "top_p": top_p,
        "top_k": top_k,
        "max_tokens_to_sample": max_tokens_to_sample,
    }
    try:
        response = bedrock_runtime.invoke_model(
            modelId=model_id,
            body=json.dumps(body),
            accept="application/json",
            contentType="application/json",
        )
    except ClientError:
        # The message used to say "Llama 2", which pointed at the wrong model.
        logging.error("Couldn't invoke %s", model_id)
        raise

    response_body = json.loads(response["body"].read())
    return response_body.get("completion")

#Prompt helpers: load a prompt template from the S3 prompt library, or build the prompt text inline

def get_enrollment_prompt(bucket,file,prompt_category,required_prompt,conversation):
    """Build the enrollment prompt from a template stored in S3.

    Downloads object ``file`` from ``bucket``, parses it as UTF-8 JSON, picks
    the template at ``[prompt_category][required_prompt]`` and fills its
    ``conversation`` and ``entities`` placeholders. ``entities`` is a fixed
    description string defined in this function, not a caller argument.
    """
    s3_object = boto3.client('s3').get_object(Bucket=bucket,Key=file)
    prompt_library = json.loads(s3_object['Body'].read().decode('utf-8'))
    template = prompt_library[prompt_category][required_prompt]
    return template.format(
        conversation=conversation,
        entities="name of patient, status of insurance, insurance number, demographic details etc.",
    )

def get_insights_prompt(bucket,file,prompt_category,required_prompt,insurance_provider,insurance_statistics):
    """Build the insights prompt from a template stored in S3.

    Downloads object ``file`` from ``bucket``, parses it as UTF-8 JSON, picks
    the template at ``[prompt_category][required_prompt]`` and fills its
    ``insurance_provider`` and ``insurance_statistics`` placeholders.

    The unused ``entities`` local (a leftover copied from
    ``get_enrollment_prompt``) has been removed; it never reached the template.
    """
    s3 = boto3.client('s3')
    response = s3.get_object(Bucket=bucket,Key=file)
    content = response['Body'].read().decode('utf-8')
    json_content = json.loads(content)
    prompt = json_content[prompt_category][required_prompt].format(insurance_provider=insurance_provider,insurance_statistics=insurance_statistics)

    return prompt

def enrollment_prompt_generator(conversation,entities,today_date):
    """Return the entity-extraction prompt for a call transcript.

    ``conversation`` is placed at the top of the prompt, ``entities`` (the list
    of field names to extract) is interpolated twice, and ``today_date`` is
    given to the model so it can derive the patient's age from the date of
    birth. The prompt text is returned exactly as written below, including its
    indentation.
    """
    return f"""Human: {conversation}

    The above is a transcript between a call center agent and an insurance subscriber or patient. 
    Identify and extract key entities such as {entities} from the transcript. Include only the information present.
    
    Ensure emails are recognized as valid email addresses in the format "email": "example@email.com" and phone numbers are in the standard format (e.g., (123) 456-7890 or +12345678900).
    Make sure that the words are gramatically correct and no spaces in the email address or phone number. 

    Based on today's date which is {today_date} and the patient's date of birth or dob,calculate the age of the patient and return it along with other entities in the output.
    
    Output the results as a structured JSON containing only the extracted fields.
    
    Strictly Follow the rules to provide ouput in JSON format and do not provide the extra sentence 'Here are the key entities extracted from the conversation before the JSON' as part of your response.
    Make sure you adhere to the list of entities extracted and create JSON with the exact key names passed and do not change the key names.
    Include only the information present in the provided conversation and do no not make-up any information on your own.
    The keys of the response json should be exactly same as the keys in {entities}.
    
    Assistant:
    """
    
def insights_prompt_generator(insurance_provider,pharmacy_name,insurance_statistics):
    prompt_claude = """Human: 
    
You are Agent assist tracking the Patient and agent conversation and help the agent recommend meaningful insights based on the 
insurance and pharmacy details related insights like for example suggesting which pharmacy to select based on the
patient's Insurance provider and the pharmacy using metrics like how soon the other pharmacies if having a quicker dispense time can be helpful.
The lesser the number of days to dispense the medication,the higher are the chances of recommendation of that pharmacy.

The patients's insurance provider is  \" """ + insurance_provider+ """ \" and pharmacy is  \" """ + pharmacy_name+ """ \" and use the following insurance statistics data to provide the insights:
\" """ + insurance_statistics + """ \".

In the insurance statistics data provided above,the keys represent the insurance provider and the value represents the pharmacy company and the number of days
it takes to dispense the medication to the patient.

Only provide the response when you have the values for both insurance provider and pharmacy name from the patient.Do not generate any insights unless you have both these information.
Also do not recommend any insights if the patient already has the pharmacy which is the quickest according to the insurance statistics and just suggest the patient to continue
with their existing pharmacy and not change their choice of pharmacy.


Provide the response in a structured and easily readable format.

Assistant:
"""
    return prompt_claude

def sns_publisher(json_data):
    """Publish ``json_data`` to the SNS topic named by the ``SNSArn`` env var.

    The payload is JSON-encoded and wrapped in a ``{'default': ...}`` envelope
    sent with ``MessageStructure='json'``. ``json_data["streamConnectionId"]``
    is used as the ``MessageGroupId``. The publish response is printed;
    nothing is returned.
    """
    sns_client = boto3.client('sns')
    target_arn = os.environ.get("SNSArn")
    envelope = json.dumps({'default': json.dumps(json_data)})
    publish_result = sns_client.publish(
        TopicArn=target_arn,
        Message=envelope,
        MessageStructure='json',
        MessageGroupId=json_data["streamConnectionId"],
    )
    print(f"SNS published : {publish_result}")
        
def sns_data_postprocessing(event,insights_data):
    """Wrap generated insights in the message envelope sent to SNS.

    The ``streamConnectionId`` from the first record's JSON body is used both
    as the envelope's ``streamConnectionId`` and as ``body.contactId``.
    NOTE(review): ``transactionId`` is the same fixed literal for every
    message -- confirm that this is intended.
    """
    connection_id = json.loads(event["Records"][0]['body'])["streamConnectionId"]
    message_body = {
        "transactionId": "f830e890-3ff2-4fdc-a08e-dd9b78a2dc28",
        "contactId": connection_id,
        "insights_data": insights_data,
    }
    return {
        "stream": "INSIGHTS_DATA",
        "streamConnectionId": connection_id,
        "body": message_body,
    }

# Call Summary Lambda

In [4]:
import boto3
import json
import base64
import time
import re
from botocore.exceptions import ClientError
import logging
import redis
import os

# Bedrock clients pinned to us-east-1.
bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name="us-east-1")
bedrock = boto3.client(service_name="bedrock", region_name="us-east-1")

# A second bedrock-runtime client, created without an explicit region.
brt = boto3.client(service_name="bedrock-runtime")

# Redis connection settings: hard-coded host, port 6379, no password.
# decode_responses=True makes the client return str instead of bytes.
redis_host = 'ch-agent-assist-redis-cluster-disabled-ro.tjyhst.ng.0001.use1.cache.amazonaws.com'
redis_port = 6379
redis_password = None
redis_client = redis.Redis(
    host=redis_host,
    port=redis_port,
    password=redis_password,
    decode_responses=True,
)

##Processing data to fetch role and content
def data_preprocessing(data):
    """Render the stored transcript JSON as ``"<role> : <content>"`` lines.

    ``data`` is a JSON string holding a list of segments; only the first item
    of each segment's ``'transcript'`` list contributes a line. Every line ends
    with a newline and the lines are returned as a single string.
    """
    def render(segment):
        first_item = segment['transcript'][0]
        return first_item['ParticipantRole'] + " : " + first_item['Content'] + "\n"

    return "".join([render(segment) for segment in json.loads(data)])

def data_postprocessing(data):
    """Extract the JSON-object span from a model reply, repairing lost braces.

    A reply without any ``{`` gets one prepended and a reply without any ``}``
    gets one appended. The substring from the first ``{`` to the last ``}``
    (inclusive) of the repaired text is returned.

    Fixes: the start index used to be looked up in the original text instead
    of the repaired one, so a reply missing ``{`` was sliced from index -1 and
    only its last character came back; a reply missing ``}`` raised IndexError.
    """
    repaired = data if "{" in data else "{" + data
    if "}" not in repaired:
        repaired = repaired + "}"

    start_index = repaired.find("{")
    print(f'start_index:{start_index}')
    end_index = repaired.rfind("}")
    print(f'end_index:{end_index}')

    result = repaired[start_index:end_index + 1]
    print(f'result:{result}')

    return result
    
def summarisation_prompt_generator(context):
    """Return the summarisation prompt with ``context`` embedded.

    The prompt is a fixed instruction and note, followed by ``context`` on its
    own line and a trailing ``Response :`` marker.
    """
    return f"""
Instruction: "Summarise this call transcript between a patient and an agent and include the information shared by the patient in a precise paragraph":
NOTE: Consider the below context as your only source of information and provide the response in a paragraph

{context}
             
Response :  
    """
    
#Defining function to summarize context
def load_llama2(bedrock_runtime , prompt , temp , top_p):
    """Invoke ``meta.llama2-13b-chat-v1`` and return the generated text.

    Sends ``prompt`` with the given temperature and top-p and a generation
    limit of 1000, then returns the ``"generation"`` field of the response
    body. A ``ClientError`` is logged and re-raised.
    """
    try:
        request_payload = json.dumps({
            "prompt" : prompt,
            "temperature" : temp,
            "top_p" : top_p,
            "max_gen_len" : 1000,
        })
        raw_response = bedrock_runtime.invoke_model(modelId="meta.llama2-13b-chat-v1", body=request_payload)
        return json.loads(raw_response["body"].read())["generation"]
    except ClientError:
        logging.error("Couldn't invoke Llama 2")
        raise
    

def SNS_data_postprocessing(event,data):
    """Wrap a call summary in the message envelope sent to SNS.

    The ``streamConnectionId`` from the first record's JSON body is used both
    as the envelope's ``streamConnectionId`` and as ``body.contactId``;
    ``data`` is stored under ``body.SUMMARY``.
    NOTE(review): ``transactionId`` is the same fixed literal for every
    message -- confirm that this is intended.
    """
    connection_id = json.loads(event["Records"][0]['body'])["streamConnectionId"]
    message_body = {
        "transactionId": "f830e890-3ff2-4fdc-a08e-dd9b78a2dc28",
        "contactId": connection_id,
        "SUMMARY": data,
    }
    return {
        "stream": "SUMMARY",
        "streamConnectionId": connection_id,
        "body": message_body,
    }
    
def SNS_Publisher(json_data):
    """Publish ``json_data`` to the hard-coded SNS topic.

    The payload is JSON-encoded and wrapped in a ``{'default': ...}`` envelope
    sent with ``MessageStructure='json'``. ``json_data["streamConnectionId"]``
    is used as the ``MessageGroupId``. The publish response is printed;
    nothing is returned.
    """
    sns_client = boto3.client('sns')
    target_arn = 'arn:aws:sns:us-east-1:383299343633:ch-agent-assist-processor-sns.fifo'
    envelope = json.dumps({'default': json.dumps(json_data)})
    publish_result = sns_client.publish(
        TopicArn=target_arn,
        Message=envelope,
        MessageStructure='json',
        MessageGroupId=json_data["streamConnectionId"],
    )
    print(f"SNS published : {publish_result}")
    
def lambda_handler(event,context):
    """Summarise a call transcript and publish the summary to SNS.

    Reads the transcript stored in Redis under the ``streamConnectionId``
    found in the first record's JSON body, asks the Llama 2 model for a
    summary, wraps it with ``SNS_data_postprocessing`` and publishes it with
    ``SNS_Publisher``.

    Args:
        event: Invocation event; ``event["Records"][0]["body"]`` must be a
            JSON string containing ``streamConnectionId``.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` and ``body``: 404 when no transcript is
        stored for the connection id (this case used to crash with NameError
        because ``value`` was never assigned), 200 after publishing.
    """
    key_to_read = json.loads(event["Records"][0]['body'])["streamConnectionId"]

    # A single GET replaces EXISTS + GET; redis-py returns None for a missing key.
    value = redis_client.get(key_to_read) if key_to_read else None
    if value is None:
        print('Key not found')
        return {"statusCode": 404,"body": "Transcript Not Found"}
    print(value)

    final_transcript = data_preprocessing(value)
    print(final_transcript)
    prompt_summary = summarisation_prompt_generator(final_transcript)
    summary_data = load_llama2(bedrock_runtime,prompt_summary,0.5,0.9)
    print(f'lambda_handler--output_from_the_llm_model : {summary_data}.')
    sns_data = SNS_data_postprocessing(event,summary_data)
    print(f'lambda_handler--data_to_be_sent_to_SNS_after_postprocessing_json_output_of_model : {sns_data}.')
    SNS_Publisher(sns_data)
    return {"statusCode": 200,"body": "Data Sent to SNS Sucessfully"}

# lambda_handler.py

In [9]:
import boto3
import json
import base64
import time
import re
from botocore.exceptions import ClientError
import logging
import redis
import os
from datetime import datetime , date
from form_data_extraction import construct_call_conversation , get_prompt , enrollment_prompt_generator , load_claude2 , sns_data_postprocessing , sns_publisher
from botocore.config import Config

# Botocore configuration with a 1000-second read timeout.
# NOTE(review): `config` is not passed to either boto3.client call below, so
# this timeout is not applied to them -- confirm whether bedrock_runtime was
# meant to receive config=config.
config = Config(read_timeout=1000)

# Calendar date captured once, when this module is imported.
today_date = date.today()

# Bedrock clients; the region comes from the "Region" environment variable.
bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name=os.environ.get("Region"))
bedrock = boto3.client(service_name="bedrock", region_name=os.environ.get("Region"))

# Redis connection settings: host from the "RedisReadUrl" environment
# variable, port 6379, no password. decode_responses=True makes the client
# return str instead of bytes.
redis_host = os.environ.get("RedisReadUrl")
redis_port = 6379
redis_password = None
redis_client = redis.Redis(
    host=redis_host,
    port=redis_port,
    password=redis_password,
    decode_responses=True,
)
 
# Enrollment-form fields the entity-extraction model is asked to return.
# Only the keys are used below; the empty-string values are placeholders.
entity_dict_v1 = {
    "patientFirstName": "",
    "patientMiddleInitial": "",
    "patientLastName": "",
    "dob": "",
    "age": "",
    "gender": "",
    "preferredLanguage": "",
    "streetAddress": "",
    "streetName": "",
    "city": "",
    "state": "",
    "zipCode": "",
    "email": "",
    "phone": "",
    "phoneType": "",
    "priorTherapy": "",
    "diagnosisName": "",
    "diagnosisIcd10Code": "",
    "dateOfDiagnosis": "",
    "insuranceProvider": "",
    "insuranceId": "",
    "planType": "",
    "effectiveDate": "",
    "expiryDate": "",
    "recordType": "",
    "rxBin": "",
    "rxGroup": "",
    "rxPcn": "",
    "cardHolderRelationshipWithThePatient": "",
    "cardHolderName": "",
    "cardHolderDob": "",
    "prescriberName": "",
    "specialty": "",
    "address": "",
    "facilityName": "",
    "pharmacyName": "",
    "spPhone": "",
    "spFax": ""
}

# Field names in declaration order.
entity_list = list(entity_dict_v1)

# Comma-separated field names (no trailing comma) for the enrollment prompt.
# str.join replaces the previous "+=" loop followed by slicing off the last
# comma; the resulting string is identical.
entities = ",".join(entity_list)
 
def handler(event,context):
    """Extract enrollment-form data from a call transcript and publish it.

    Reads the transcript stored in Redis under the ``streamConnectionId``
    found in the first record's JSON body, asks the model to extract the
    enrollment entities, takes the first ``{...}`` span of the reply and
    publishes it (as text, not parsed) through ``sns_publisher``.

    Args:
        event: Invocation event; ``event["Records"][0]["body"]`` must be a
            JSON string containing ``streamConnectionId``.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` and ``body``: 404 when no transcript is
        stored for the connection id (this case used to crash with NameError
        because ``value`` was never assigned) or when the model reply contains
        no ``{...}`` span; 200 after publishing.
    """
    start_timestamp = datetime.now()
    print(f'form_data: handler: START: {datetime.now()}: {str(event)}')
    contact_id = json.loads(event["Records"][0]['body'])["streamConnectionId"]

    # A single GET replaces EXISTS + GET; redis-py returns None for a missing key.
    value = redis_client.get(contact_id) if contact_id else None
    if value is None:
        print('Key not found')
        print(f'form_data: handler: END: {datetime.now()}: output: Error')
        return {"statusCode": 404,"body": "Transcript Not Found"}
    print(value)

    final_transcript = construct_call_conversation(value)
    print(final_transcript)
    # Use the current date on every invocation: a module-level date is computed
    # only at import time and goes stale while the process keeps running.
    prompt_enrollment = enrollment_prompt_generator(final_transcript,entities,date.today())
    model_invoke_timestamp = datetime.now()
    enrollment_data = load_claude2(bedrock_runtime,prompt_enrollment,0,0.9,1)
    model_invoke_after_timestamp = datetime.now()

    # Non-greedy match: grabs the first flat {...} object in the model reply.
    # "or ''" guards against a reply with no completion text.
    match = re.search(r'{(.+?)}', enrollment_data or "", re.DOTALL)
    if match is None:
        print("No JSON found in the text.")
        print(f'form_data: handler: END: {datetime.now()}: output: Error')
        return {"statusCode": 404,"body": "Json Not Found"}

    extracted_json = match.group(0)
    print(f'form_data : Postprocessed_model_output : {extracted_json}')
    sns_data = sns_data_postprocessing(event,extracted_json)
    print(f'form_data : Data_to_be_sent_to_SNS_after_postprocessing_json_output_of_model : {sns_data}.')
    sns_publisher(sns_data)
    print(f'form_data : Data_sent_to_sns_sucessfully.')

    end_timestamp = datetime.now()
    duration_till_model = model_invoke_timestamp - start_timestamp
    duration_model = model_invoke_after_timestamp - model_invoke_timestamp
    duration_till_end = end_timestamp - model_invoke_after_timestamp
    print(f'form_data: handler: END: duration_till_model::{duration_till_model}:: duration_model::{duration_model}:: duration_till_end::{duration_till_end}:: output: {json.dumps(sns_data)}')

    return {"statusCode": 200,"body": "Data Sent to SNS Sucessfully"}

# call_summary.py

In [10]:
import boto3
import json
import base64
import time
import re
from botocore.exceptions import ClientError
import logging
import redis
import os

# Bedrock clients; the region comes from the "Region" environment variable.
bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name=os.environ.get("Region"))
bedrock = boto3.client(service_name="bedrock", region_name=os.environ.get("Region"))

##Processing data to fetch role and content
def construct_call_conversation(data):
    """Flatten the stored transcript JSON into conversation text.

    ``data`` is a JSON string holding a list of segments. The first item of
    each segment's ``'transcript'`` list becomes one line of the form
    ``"<ParticipantRole> : <Content>\\n"``; later items in that list are not
    used. The lines are returned as one string in segment order.
    """
    first_items = (segment['transcript'][0] for segment in json.loads(data))
    return "".join(
        item['ParticipantRole'] + " : " + item['Content'] + "\n"
        for item in first_items
    )

def model_output_postprocessing(data):
    """Extract the JSON-object span from a model reply, repairing lost braces.

    A reply without any ``{`` gets one prepended and a reply without any ``}``
    gets one appended. The substring from the first ``{`` to the last ``}``
    (inclusive) of the repaired text is returned.

    Fixes: the start index used to be looked up in the original text instead
    of the repaired one, so a reply missing ``{`` was sliced from index -1 and
    only its last character came back; a reply missing ``}`` raised IndexError.
    """
    repaired = data if "{" in data else "{" + data
    if "}" not in repaired:
        repaired = repaired + "}"

    start_index = repaired.find("{")
    print(f'start_index:{start_index}')
    end_index = repaired.rfind("}")
    print(f'end_index:{end_index}')

    result = repaired[start_index:end_index + 1]
    print(f'result:{result}')

    return result
    
def summarisation_prompt_generator(context):
    """Return the summarisation prompt with ``context`` embedded.

    The prompt is a fixed instruction and note, followed by ``context`` on its
    own line and a trailing ``Response :`` marker.
    """
    return f"""
Instruction: "Summarise this call transcript between a patient and an agent and include the information shared by the patient in a precise paragraph":
NOTE: Consider the below context as your only source of information and provide the response in a paragraph

{context}
             
Response :  
    """

def get_summary_prompt(bucket,file,prompt_category,required_prompt,conversation):
    """Build the summary prompt from a template stored in S3.

    Downloads object ``file`` from ``bucket``, parses it as UTF-8 JSON, picks
    the template at ``[prompt_category][required_prompt]`` and fills its
    ``context`` placeholder with ``conversation``.
    """
    s3_object = boto3.client('s3').get_object(Bucket=bucket,Key=file)
    prompt_library = json.loads(s3_object['Body'].read().decode('utf-8'))
    template = prompt_library[prompt_category][required_prompt]
    return template.format(context=conversation)

#Defining function to summarize context
def load_llama2(bedrock_runtime , prompt , temp , top_p):
    """Invoke ``meta.llama2-13b-chat-v1`` and return the generated text.

    Sends ``prompt`` with the given temperature and top-p and a generation
    limit of 1000, then returns the ``"generation"`` field of the response
    body. A ``ClientError`` is logged and re-raised.
    """
    try:
        request_payload = json.dumps({
            "prompt" : prompt,
            "temperature" : temp,
            "top_p" : top_p,
            "max_gen_len" : 1000,
        })
        raw_response = bedrock_runtime.invoke_model(modelId="meta.llama2-13b-chat-v1", body=request_payload)
        return json.loads(raw_response["body"].read())["generation"]
    except ClientError:
        logging.error("Couldn't invoke Llama 2")
        raise
    

def sns_data_postprocessing(event,data):
    """Wrap a conversation summary in the message envelope sent to SNS.

    The ``streamConnectionId`` from the first record's JSON body is used both
    as the envelope's ``streamConnectionId`` and as ``body.contactId``;
    ``data`` is stored under ``body.summary_data``.
    NOTE(review): ``transactionId`` is the same fixed literal for every
    message -- confirm that this is intended.
    """
    connection_id = json.loads(event["Records"][0]['body'])["streamConnectionId"]
    message_body = {
        "transactionId": "f830e890-3ff2-4fdc-a08e-dd9b78a2dc28",
        "contactId": connection_id,
        "summary_data": data,
    }
    return {
        "stream": "CONVERSATION_SUMMARY",
        "streamConnectionId": connection_id,
        "body": message_body,
    }
    
def sns_publisher(json_data):
    """Publish ``json_data`` to the SNS topic named by the ``SNSArn`` env var.

    The payload is JSON-encoded and wrapped in a ``{'default': ...}`` envelope
    sent with ``MessageStructure='json'``. ``json_data["streamConnectionId"]``
    is used as the ``MessageGroupId``. The publish response is printed;
    nothing is returned.
    """
    sns_client = boto3.client('sns')
    target_arn = os.environ.get("SNSArn")
    envelope = json.dumps({'default': json.dumps(json_data)})
    publish_result = sns_client.publish(
        TopicArn=target_arn,
        Message=envelope,
        MessageStructure='json',
        MessageGroupId=json_data["streamConnectionId"],
    )
    print(f"SNS published : {publish_result}")