In [1]:
import sys, os, csv, ast, random, time
from pathlib import Path
import pandas as pd
import time
from collections import deque  # Queue for questions
import re

##### Prompt templates
# Paths are relative, so they resolve against the kernel's current working
# directory (the notebook prints Path.cwd() in the driver cell).
starter_path = 'prompts/starter.txt'  # opening message; contains {Clinician Name} and {Date} placeholders
questions_path = 'prompts/questions_test/Analysis/questionbank_chunked.txt'  # question bank read by chat_between_models
patient_prompt_path = 'prompts/double_model_prompts/INSTRUCT/patient_prompt_v1.txt'  # patient-side prompt template
asst_prompt_path = 'prompts/double_model_prompts/INSTRUCT/assistant_prompt_v2.txt'  # assistant-side prompt template
tracking_path = "./tracking.csv"  # default for interview_gen's tracking_path parameter

##### directory for interview outputs
out_dir = "transcripts/DM"  # default for interview_gen's out_dir parameter

# Import the project helper module as `helper` (interview_gen calls
# helper.patient_to_str). First try the package-qualified import; if that
# module cannot be found, fall back to importing helper_fns directly.
# NOTE(review): the sys.path entry added here stays in place for the fallback
# import as well -- presumably that is what makes `import helper_fns` resolve;
# confirm against the repository layout.
try:
    sys.path.append('../../transcript_creation')
    from transcript_creation import helper_fns as helper
except ModuleNotFoundError:
    import helper_fns as helper

In [2]:
##############################
# HELPER FUNCTIONS
##############################

def update_msg_list(message_list, is_assistant, content):
    """Append one chat message to ``message_list`` in place and return that same list.

    The message role is "assistant" when ``is_assistant`` is truthy and
    "user" otherwise; ``content`` is stored unchanged.
    """
    if is_assistant:
        speaker = "assistant"
    else:
        speaker = "user"
    entry = {"role": speaker, "content": content}
    message_list.append(entry)
    return message_list

def swap_msg_list(message_list):
    '''Build a new message list in which assistant and user roles are exchanged.

    The input list is not modified. Messages whose role is neither
    "assistant" nor "user" are left out of the result.
    '''
    opposite_role = {"assistant": "user", "user": "assistant"}
    return [
        {"role": opposite_role[msg["role"]], "content": msg["content"]}
        for msg in message_list
        if msg["role"] in opposite_role
    ]

def extract_response_text(response):
    """
    Return the text that follows the last 'RESPONSE:' marker, stripped of
    surrounding whitespace. When the marker is absent, the whole response is
    returned stripped.
    """
    # rpartition puts everything after the last marker in slot 2; when the
    # marker is missing, slot 2 is the entire input string.
    remainder = response.rpartition("RESPONSE:")[2]
    return remainder.strip()

def extract_and_store_notes(response, notes):
    """
    Append the first 'Notes:' line found in ``response`` to ``notes``.

    The extracted note runs from the first occurrence of 'Notes:' to the end
    of that line, is stripped of surrounding whitespace, and is appended
    followed by a newline.

    Args:
        response: Model output that may contain a 'Notes:' line.
        notes: Notes accumulated so far.

    Returns:
        The updated notes string, or ``notes`` unchanged when ``response``
        contains no 'Notes:' marker.
    """
    note_start = response.find("Notes:")
    if note_start == -1:
        return notes

    note_end = response.find("\n", note_start)
    if note_end == -1:
        # The note is on the last line with no trailing newline. Slicing with
        # -1 here would silently drop the final character of the note.
        note_end = len(response)

    return notes + response[note_start:note_end].strip() + "\n"

def chunk_message_history(message_list, max_history_size=10):
    """
    Return at most the last `max_history_size` messages of `message_list`.

    A list that already fits is returned as-is (the same object); otherwise a
    new list holding only the most recent messages is returned.
    """
    overflow = len(message_list) - max_history_size
    if overflow <= 0:
        return message_list
    return message_list[-max_history_size:]

In [3]:
##############################
# NEW QUESTION PARSING FUNCTIONS
##############################

def parse_questions_with_headers(question_text):
    """
    Parse question-bank text into a nested list that preserves its hierarchy.

    Line types (after stripping surrounding whitespace):
      * starts with "##"        -> section header
      * starts with "#"         -> sub-header inside the current section
      * starts with "-" or "+"  -> question / follow-up for the current sub-header
      * anything else           -> ignored

    Args:
        question_text: The full question bank as one string.

    Returns:
        A list of sections. Each section is a list whose first element is the
        section header line and whose remaining elements are subsections;
        each subsection is a list of lines starting with its sub-header.
    """
    sections = []
    current_section = []
    current_subsection = []

    for raw_line in question_text.splitlines():
        line = raw_line.strip()
        if line.startswith("##"):  # Section header
            if current_section:
                # Close the subsection that is still open before closing the
                # section; otherwise the last subsection of every section
                # would be attached to the section that follows it.
                if current_subsection:
                    current_section.append(current_subsection)
                    current_subsection = []
                sections.append(current_section)
            current_section = [line]
        elif line.startswith("#"):  # Sub-header
            if current_subsection:
                current_section.append(current_subsection)
            current_subsection = [line]
        elif line.startswith(("-", "+")):  # Question or follow-up
            current_subsection.append(line)

    # Flush whatever is still open at end of input.
    if current_subsection:
        current_section.append(current_subsection)
    if current_section:
        sections.append(current_section)

    return sections

def chunk_questions(sections):
    """
    Flatten parsed sections into one chunk (a list of strings) per sub-section.

    The first entry of every chunk is the section title joined with the
    sub-header and, when the sub-section has at least one question, with that
    first question as well. Any remaining questions follow as separate entries.
    """
    chunked_questions = []

    for section in sections:
        heading = section[0]

        for sub_section in section[1:]:
            lead_entry = f"{heading} {sub_section[0]}"
            if len(sub_section) > 1:
                # Fold the first question into the combined title entry.
                lead_entry = f"{lead_entry} {sub_section[1]}"

            chunked_questions.append([lead_entry, *sub_section[2:]])

    return chunked_questions


def queue_questions(chunks, questions_queue, num_questions_to_add=2):
    """
    Move whole chunks from the front of `chunks` into `questions_queue` until
    the queue holds at least `num_questions_to_add` entries or `chunks` is
    exhausted. Both arguments are modified in place; the queue is returned.
    """
    while chunks and len(questions_queue) < num_questions_to_add:
        questions_queue.extend(chunks.pop(0))
    return questions_queue

In [4]:
##############################
# CONVERSATION FUNCTION
##############################

def chat_between_models(message_list, asst_prompt, patient_prompt, starter):
    """Simulate the two-model interview, feeding the question bank in chunks.

    The question bank at ``questions_path`` is parsed into chunks (one per
    sub-section). Each turn appends one placeholder patient message and one
    placeholder assistant message to ``message_list``. After as many turns as
    the current chunk has entries, the next chunk is loaded; from the second
    switch onwards the questions and messages belonging to the chunk before
    the one just finished are moved out of the live window into an archive.

    Args:
        message_list: Live message window, normally seeded with the starter
            message. It is mutated in place.
        asst_prompt: Assistant prompt template containing a ``{questions}``
            placeholder.
        patient_prompt: Hydrated patient prompt (not used by the placeholder
            simulation).
        starter: Starter message text (not used by the placeholder simulation).

    Returns:
        ``(conversation, notes)`` where ``conversation`` is every message in
        chronological order (archived messages followed by the live window)
        and ``notes`` is the accumulated notes string (currently always empty,
        because no model output is parsed).
    """
    questions = Path(questions_path).read_text()
    sections = parse_questions_with_headers(questions)
    chunks = chunk_questions(sections)

    notes = ""
    max_turns = 50
    transcript = []  # messages archived out of the live window, oldest first
    questions_queue = deque()

    for chunk in chunks:
        print(chunk)
        questions_queue.append(chunk)

    if not questions_queue:
        # Nothing to ask: avoid an IndexError from popleft() on an empty queue.
        print("No question chunks found in", questions_path)
        return message_list, notes

    stop_condition = False
    turns = 0
    prev_chunk = []
    total_chunk_lengths = 0  # combined length of all chunks already completed
    first_chunk_complete = False
    start = time.time()

    curr_chunk = questions_queue.popleft()
    questions_list = list(curr_chunk)  # questions currently shown to the assistant
    formatted_questions_list = '\n'.join(questions_list)

    while not stop_condition and turns < max_turns:
        time.sleep(1)

        curr_chunk_length = len(curr_chunk)
        print("Question amount: ", curr_chunk_length)

        print("if turns == ", curr_chunk_length, " + ", total_chunk_lengths)
        if turns == curr_chunk_length + total_chunk_lengths:
            # The current chunk has had one turn per entry: load the next one.
            # The queue is non-empty here because the loop stops as soon as
            # it runs dry (see the check at the end of the loop body).
            print("Next chunk: ", questions_queue[0])
            next_question_chunk = questions_queue.popleft()

            if first_chunk_complete:
                print()
                print("Dumping first questions.........................................................")
                for question in prev_chunk:
                    print("Dumping ", question)
                    questions_list.pop(0)
                    print(questions_list)

                print("Saving and dumping previous messages..............................................")
                for _ in range(len(prev_chunk) * 2):
                    # Always archive the oldest live message. Indexing by the
                    # loop counter while popping from the front skips
                    # messages and can run past the end of the list.
                    print("Saving ", message_list[0])
                    transcript.append(message_list.pop(0))
                    for message in transcript:
                        print(message["content"])
                    print("Message_list length: ", len(message_list))

            total_chunk_lengths = total_chunk_lengths + curr_chunk_length
            prev_chunk = curr_chunk
            curr_chunk = next_question_chunk

            questions_list = questions_list + curr_chunk
            formatted_questions_list = '\n'.join(questions_list)
            print("Formatted questions: ", '\n', formatted_questions_list)
            print("Updating assistant prompt...")

            first_chunk_complete = True

        # Built for the real assistant call; the placeholder simulation below
        # does not consume it yet.
        asst_prompt_with_questions = asst_prompt.replace(r"{questions}", formatted_questions_list)

        # Simulating the patient model's response
        patient_response = "<Patient responds to chunk questions>"
        print(f"Patient response (turn {turns}):\n", patient_response)

        # Simulating assistant's response (taking notes on the questions)
        assistant_response = "<Assistant asks questions and adds notes>"
        print(f"Assistant response (turn {turns}):\n", assistant_response)

        # Add both assistant and patient responses to message list
        message_list.append({'role': 'user', 'content': patient_response})
        message_list.append({'role': 'assistant', 'content': assistant_response})

        turns += 1
        print("Turn count:", turns)

        # NOTE(review): this ends the interview after the first turn of the
        # final chunk, because the queue is already empty once that chunk has
        # been loaded. Confirm whether the last chunk should get a full set
        # of turns.
        if not questions_queue:
            stop_condition = True

    end = time.time()
    time_taken = end - start
    print("INTERVIEW HAS ENDED")
    print("SYSTEM: ")
    print("Total time taken:", time_taken)
    print("Final Notes:", notes)
    # Return the whole conversation, not just the live window, so archived
    # turns are not lost to the caller.
    return transcript + message_list, notes

In [5]:
def interview_gen(patient: dict, output_id: str, out_dir=out_dir, tracking_path=tracking_path):
    """
    Run one simulated interview for ``patient`` and print the conversation.

    The output directory is created if needed, the prompt templates are read
    from disk and filled in from the patient record, and the conversation is
    started with the hydrated starter message. ``output_id`` and
    ``tracking_path`` are accepted but not used in this function.
    """
    Path(out_dir).mkdir(parents=True, exist_ok=True)

    # Load the raw templates (patient, assistant, starter -- in that order).
    patient_template, asst_template, starter_template = (
        Path(template_path).read_text()
        for template_path in (patient_prompt_path, asst_prompt_path, starter_path)
    )

    # Fill in the starter message.
    clinician = patient["Clinician Name"]
    appointment = patient["Appointment Date"]
    starter = starter_template.replace(r"{Clinician Name}", clinician).replace(r"{Date}", appointment)

    # Fill in the patient prompt. When there is an edge case it goes into its
    # own placeholder; the patient's "Edge Case Scenario" key is added to the
    # ignore list only when the patient has no such key.
    has_edge_case = "Edge Case Scenario" in patient
    edge_case_text = f"\n\n{patient['Edge Case Scenario']}" if has_edge_case else ""
    patient_template = patient_template.replace(r"{edge_case_scenario}", edge_case_text)

    ignore_keys = ["Clinician Name", "Appointment Date", 'Conversational Tone', 'Reason for Appointment']
    if not has_edge_case:
        ignore_keys.insert(3, "Edge Case Scenario")

    patient_info = helper.patient_to_str(patient, ignore_keys)
    patient_prompt = patient_template.replace(r"{patient_info}", patient_info)

    # The conversation opens with the starter, spoken by the assistant.
    message_list = [{"role": "assistant", "content": starter}]

    conversation, notes = chat_between_models(message_list, asst_template, patient_prompt, starter)
    for message in conversation:
        print(message["content"])

    # Notes collected over the course of the conversation.
    print("Final notes summary: ", notes)

In [6]:
import os, sys
from pathlib import Path
from datetime import datetime
import csv
import pandas as pd

print(Path.cwd())

### read the patients csv (pipe-delimited)
patients_path = Path("../patient_creation/patients.csv")
patients_df = pd.read_csv(patients_path, delimiter="|")

folder_name = "llama3.1"
# parents=True so a fresh checkout without a "transcripts" directory does not
# fail with FileNotFoundError.
Path(Path.cwd(), "transcripts", folder_name).mkdir(parents=True, exist_ok=True)

for i, patient in patients_df.iterrows():

    #### only the patient at index 0 is processed; change the condition below
    #### to generate a transcript for a different patient
    if i == 0:
        output_id = datetime.now().strftime("%Y%m%d-%H%M%S")
        # dropna() removes empty columns so absent optional fields (for
        # example "Edge Case Scenario") are missing keys rather than NaN.
        patient_dict = patient.dropna().to_dict()
        print(i, patient_dict["Name"])
        interview_gen(patient=patient_dict, output_id=output_id, out_dir=f"./transcripts/{folder_name}/DM")

C:\Users\aleynaw\Desktop\transcript_generation-main\transcript_generation
0 Oscar Webb
['## General Information ## # Personal Information - full name', '- preferred name', '- date of birth', '- sex', '- handedness: left, right, both']
['## General Information ## # Residence and Marital Status - current city/town of residence', '- single, married, or common law?', '- any children or dependents?', '+ if yes, names and ages?']
['## General Information ## # Employment and Financial Status - currently working?', '+ if yes, name of company? years working?', '- disability assistance status', '+ if yes, what type and when?']
['## General Information ## # Medical Care Information - current doctors?', '- allergies', '- current medications and dosage?', '- health supplements']
['## Medical History ## # Substance Use - frequency of nicotine, marijuana, alcohol use?']
['## Medical History ## # General Health - health conditions/diagnoses and details such as when you were diagnosed?', '- previous ho

KeyError: 'content'