In [1]:
from typing import List, Optional
from pydantic import BaseModel, Field
from langchain_core.utils.function_calling import convert_to_openai_function
from langchain.output_parsers.openai_functions import JsonKeyOutputFunctionsParser
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser

from langchain.schema import StrOutputParser
from langchain.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough, RunnableParallel

import re
import pandas as pd
import itertools

import gradio as gr
import weasyprint
import io
from pypdf import PdfReader

from langchain_openai import ChatOpenAI
from langchain_core.utils.function_calling import convert_to_openai_function
import os
from dotenv import load_dotenv, find_dotenv

# Load environment variables from a .env file located by find_dotenv()
_ = load_dotenv(find_dotenv()) 
# Indexing os.environ directly raises KeyError right here if the key is not set
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

# Initialize the LangChain OpenAI chat model; the extraction chains built in later
# cells all bind to this single `model` object.
# temperature=0 is presumably chosen to keep extraction output as repeatable as possible.
model = ChatOpenAI(temperature=0, model="gpt-4o", openai_api_key=OPENAI_API_KEY)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Schema for a single procedure-code entry pulled from a policy document.
# NOTE: the class docstring and every Field description below are runtime text stored in
# the pydantic schema, not ordinary comments; rewording them changes what a schema
# consumer receives, so treat edits to them as prompt changes.
# Every field is typed str, including med_necessity_idx (its description asks for 1 / 0 / -1).
# NOTE(review): CPT is not referenced by any cell visible in this notebook; confirm it is still needed.
class CPT(BaseModel):
    """Extracted Information from a Medical Necessity Policy."""
    
    # The procedure code itself (ICD-10 procedure, HCPCS or CPT)
    cpt: str = Field(
        description="""The relevant ICD-10 procedure, HCPCS, CPT, or procedure code inside the policy.
                       Treat ICD-10 procedure Code as a procedure code—don't forget to get it!
                       Please do not list ICD-10 diagnosis codes as ICD-10 procedure, HCPCS, CPT, or procedure. 
                       They are different. If multiple, please list them one by one."""
    )
    
    # Human-readable description that accompanies the code
    cpt_description: str = Field(
        description="The description of the ICD-10 procedure, CPT, or HCPCS code to the right of the corresponding CPT."
    )
    
    # Which section ("session") of the policy's coding part the code was found in
    session_idx: str = Field(
        description="""Index of the session. There can be multiple sections of the policy coding section.
                       For example, there can be sections such as 'Medically Necessary', 'Not Medically Necessary', or 'Maybe Medically Necessary'."""
    )
    
    # Medical-necessity flag: 1 = necessary, 0 = not necessary, -1 = "may be" / unclear
    med_necessity_idx: str = Field(
        description="""Give value 1 if the ICD-10 Code is listed in Medically Necessary.
                       0 if the procedure is listed as Not Medically Necessary.
                       -1 if it is 'May be Medically Necessary' or it is not clear or is not related to Medically Necessary or Not Medically Necessary.
                       Medically Necessary categories are mutually exclusive: a code cannot appear in both 'Medically Necessary' and 'May be Medically Necessary'.
                       Please make sure that no overlaps occur."""
    )
    

# Container schema: wraps the list of CPT entries under the key `cpts`.
# NOTE(review): not referenced by any cell visible in this notebook.
class CPTInfo(BaseModel):
    """Information to extract"""
    
    cpts: List[CPT]


In [3]:
# Schema for a single ICD-10 diagnosis entry pulled from a policy document.
# NOTE: the class docstring and the Field descriptions are runtime schema text, not
# ordinary comments; rewording them is a prompt change.
# NOTE(review): Diag is not referenced by any cell visible in this notebook.
class Diag(BaseModel):
    """Extracted Information from a Medical Necessity Policy."""
    
    # The ICD-10 diagnosis code itself
    diag: str = Field(
        description="""The relevant ICD-10 diagnosis inside the Policy. Please do not list ICD-10 procedure as ICD-10 diagnosis. 
                       They are different. If there are multiple, please list them one by one."""
    )
    
    # Description accompanying the diagnosis code.
    # NOTE(review): the wording "corresponding CPT" looks copied from the CPT schema; confirm intent.
    diag_desp: str = Field(
        description="The description of the ICD-10 diagnosis Code, to the right of the corresponding CPT."
    )
    
    # Medical-necessity flag: 1 = necessary, 0 = not necessary, -1 = "may be" / unclear
    med_necessity_idx: str = Field(
        description="""Give value 1 if the ICD-10 Code is listed in Medically Necessary.
                       0 if the procedure is listed as Not Medically Necessary.
                       -1 if it is 'May be Medically Necessary' or it is not clear or is not related to Medically Necessary or Not Medically Necessary.
                       Medically Necessary categories are mutually exclusive: a code cannot appear in both 'Medically Necessary' and 'May be Medically Necessary'.
                       Please make sure that no overlaps occur."""
    )
    

# Container schema: wraps the list of Diag entries.
# NOTE(review): the field is named `CPTS` although it holds Diag items; this looks like a
# copy-paste from CPTInfo. Renaming it would change the schema key, so it is left as-is here.
class DiagInfo(BaseModel):
    """Information to extract"""
    
    CPTS: List[Diag]


In [4]:
# Schema for one "session" of a policy's Coding section: its title, running index, the
# procedure codes and diagnosis codes it lists, and a medical-necessity flag.
# NOTE: the class docstring and every Field description are runtime schema text, not
# ordinary comments; rewording them is a prompt change.
# Every field is typed str; the two code fields hold all codes of a session in one string.
class Session(BaseModel):
    """
    Extracted Information From a Medical Necessity Policy.
    """
    
    # Title / leading sentence of the session (field shares its name with the class)
    Session: str = Field(
        description="""In each policy, there is a section called "Coding".
                       From this Coding section, there may be multiple sessions. Each session may have a leading sentence
                       like the following or close to the following:
                       - "When Services are Medically Necessary" or
                       - "When Services may be Medically Necessary" or
                       - "When Services are Not Medically Necessary"
                       Please extract ALL session titles for each session in the specified format.
                       Please do not skip any session in this specified format."""
    )
    
    # 1-based running index of the session, as text
    session_num: str = Field(
        description="The Index Number of the sessions, starting from 1 and increasing by 1 for each new session."
    )
    
    # Procedure codes of the session (codes only, no descriptions)
    subsession_procedure: str = Field(
        description="""Please Extract procedure codes such as ICD-10, CPT, or HCPCS procedure for each session.
                       Do not include descriptions for these codes, which are typically listed right after them.
                       Keep these codes as is and do not interpret them.
                       If the document mentions 'procedures listed above', spell them out as needed."""
    )
    
    # Diagnosis codes of the session (codes only, no descriptions)
    subsession_diagnosis: str = Field(
        description="""Please extract ICD-10 diagnosis for each session.
                       Do not include descriptions for these codes, which are typically listed right after them.
                       Keep these codes as is and do not interpret them.
                       If the document mentions 'diagnosis listed above', spell them out as needed."""
    )
    
    # Medical-necessity flag: 1 = necessary, 0 = not necessary, -1 = "may be" / unclear
    med_necessity_idx: str = Field(
        description="""Give value 1 if the ICD-10 Code is listed in Medically Necessary.
                       0 if the procedure is listed as Not Medically Necessary.
                       -1 if it is 'May be Medically Necessary' or it is not clear or is not related to Medically Necessary or Not Medically Necessary.
                       Medically Necessary categories are mutually exclusive: a code cannot appear in both 'Medically Necessary' and 'May be Medically Necessary'.
                       Please make sure that no overlaps occur."""
    )
    

# Top-level schema handed to convert_to_openai_function in the extraction cells.
# The field name `Sessions` is load-bearing: the chains parse the model output with
# JsonKeyOutputFunctionsParser(key_name="Sessions"), so renaming it breaks parsing.
class SessionInfo(BaseModel):
    """
    Information to extract.
    """
    
    Sessions: List[Session]

In [5]:
# Define the prompt template as (role, text) pairs for ChatPromptTemplate.from_messages.
# The strings are sent to the model verbatim (including their indentation), so any edit
# to them is a prompt change, not a formatting change.
prompt_template_session = [
    # System message: explains what a Medical Necessity policy is and how its sections start
    (
        "system",
        """You are a helpful assistant.
        You will be presented with a Medical Necessity policy from a certain insurer. 
        The Medical Necessity policy specifies conditions or rules on which insurers think 
        certain procedures are necessary and will be paid for when the conditions are met.

        These rules are usually divided into multiple sections. Each section typically begins with one of the following:
        1. "When services may be medically necessary when criteria are met:"
        2. "When services may also be medically necessary when criteria are met:"
        3. "When services are Cosmetic and Not Medically Necessary:"
        4. "When services may be Reconstructive when criteria are met:"
        """
    ),
    # Human message: the extraction task itself. `{policy}` is the only template variable;
    # the chains fill it via invoke(input={"policy": ...}).
    # NOTE(review): the guideline below tells the model to index overlapping sessions as 1,
    # while the Session schema's med_necessity_idx description says the categories are
    # mutually exclusive; confirm the two instructions are meant to coexist.
    (
        "human",
        """
        For each given policy document: {policy}, locate the "Coding" section. 
        From the coding section, extract multiple sessions labeled as "Medically Necessary" or "Not Medically Necessary."
        
        Each session should start with one of the bolded sentences, such as:
        - "When Services ... are Medically Necessary"
        - "When Services ... are Not Medically Necessary"

        Each session typically consists of two sub-sections:
        - **Procedure Codes**: These are identified by keywords like "CPT," "HCPCS," or "ICD-10 Procedure"
        - **Diagnosis Codes**: These are identified by the keyword "ICD-10 diagnosis."

        Key Guidelines:
        - Avoid duplicating lines like "When Services ... are Medically Necessary" or "When Services ... are Not Medically Necessary" within a single session.
        - If a session does not explicitly list procedure or diagnosis codes but refers to those in a previous session (e.g., "the procedure codes listed above"), please pull the relevant codes into the current session.
        - Ignore sessions containing phrases like "when criteria are not met."
        - If a session appears to fall under both "may be medically necessary" and "medically necessary," classify it as **medically necessary** and index it as `1`.

        When processing multiple sections:
        - Extract **only** the codes (Procedure and Diagnosis) from the relevant sections without any additional labels or formatting.
        """
    ),
]

# Compile the message pairs into a reusable chat prompt
extraction_prompt_session = ChatPromptTemplate.from_messages(prompt_template_session)


In [6]:
def get_basic_info(first_paragraph: str):
    """Turn the header text of a policy into a ``{label: value}`` dictionary.

    A line break is inserted in front of every "Publish Date" and
    "Last Review Date" label so that each ends up on its own line. The text is
    then split on newlines and only the lines at positions 1 through 5 (the
    first line is skipped) are examined. Each examined line that contains a
    colon is cut at its first colon into a label and a value, both stripped of
    surrounding whitespace; lines without a colon are ignored. If a label
    occurs more than once, the last value wins.

    Parameters:
        first_paragraph (str): Raw header text of the policy.

    Returns:
        dict: Mapping of label to value; empty when no examined line has a colon.
    """
    text = first_paragraph
    # Same two substitutions, applied in the same order
    for pattern, replacement in (
        (r"Publish Date", r"\nPublish Date"),
        (r"Last Review Date", r"\nLast Review Date"),
    ):
        text = re.sub(pattern, replacement, text)

    candidate_lines = text.split("\n")[1:6]

    basic_info_dict = {}
    for line in candidate_lines:
        label, separator, value = line.partition(":")
        if not separator:
            # No colon on this line, so there is nothing to record
            continue
        basic_info_dict[label.strip()] = value.strip()

    return basic_info_dict

def extract_text_from_pdf(pdf_file):
    """Collect the text of a PDF and parse its header into basic policy info.

    Parameters:
        pdf_file: Object exposing ``.pages``, where every page provides
            ``extract_text()`` (presumably a ``PdfReader``; confirm against callers).

    Returns:
        tuple: ``(policy_basic_info, policycontent)``: the dict produced by
        ``get_basic_info`` and the concatenated text of all pages.
    """
    page_texts = [page.extract_text() for page in pdf_file.pages]
    policycontent = ''.join(page_texts)

    # Header = everything before the first (case-sensitive) "description";
    # when the word is absent the whole text is treated as the header
    header_text = policycontent.partition("description")[0]

    return get_basic_info(header_text), policycontent


In [7]:
def cross_join(a0, b0):
    """
    Performs a cross-join between two collections of codes and returns a DataFrame.

    Each argument may be:
      * a list, tuple or pd.Series: every element (converted with ``str``) is one item;
      * anything else: the value is converted with ``str`` and split on newlines,
        so a plain string without newlines is a single item.

    Previously list/Series arguments were stringified as a whole (e.g.
    ``"['a', 'b']"``), which contradicted the documented contract.

    Parameters:
        a0 (str, list, tuple or pd.Series): Procedures.
        b0 (str, list, tuple or pd.Series): Diagnoses.

    Returns:
        pd.DataFrame: One row per (procedure, diagnosis) pair with columns
        ['procedure', 'diagnosis']; procedures vary slowest. Empty (but with
        both columns) when either input is an empty container.
    """
    def _as_items(value):
        # Real containers contribute their elements; scalars keep the newline-split behavior
        if isinstance(value, (list, tuple, pd.Series)):
            return [str(item) for item in value]
        return str(value).split("\n")

    procedures = _as_items(a0)
    diagnoses = _as_items(b0)
    pairs = [(proc, diag) for proc in procedures for diag in diagnoses]

    # Passing columns to the constructor also works when `pairs` is empty
    return pd.DataFrame(pairs, columns=['procedure', 'diagnosis'])

def get_rules(df_session):
    """
    Builds the rule table for all sessions by cross-joining each session's
    procedures with its diagnoses and tagging the result with the session's
    number and medical-necessity flag.

    No filtering or de-duplication is applied; rows appear in session order and
    each session's rows keep the index produced by ``cross_join``.

    Parameters:
        df_session (pd.DataFrame): One row per session with columns
            'subsession_procedure', 'subsession_diagnosis', 'session_num'
            and 'med_necessity_idx'.

    Returns:
        pd.DataFrame: Columns ['procedure', 'diagnosis', 'session_num',
        'med_necessity_idx']. An empty frame with these columns is returned
        when ``df_session`` has no rows (previously this raised
        UnboundLocalError because the result was only bound inside the loop).
    """
    columns = ['procedure', 'diagnosis', 'session_num', 'med_necessity_idx']

    # Collect the per-session frames and concatenate once instead of growing a
    # DataFrame inside the loop
    session_frames = []
    for i in range(len(df_session)):
        procedures = df_session['subsession_procedure'].iloc[i]
        diagnoses = df_session['subsession_diagnosis'].iloc[i]

        session_rules = cross_join(procedures, diagnoses)

        # Broadcast the session-level attributes onto every pair
        session_rules['session_num'] = df_session['session_num'].iloc[i]
        session_rules['med_necessity_idx'] = df_session['med_necessity_idx'].iloc[i]

        session_frames.append(session_rules)

    if not session_frames:
        return pd.DataFrame(columns=columns)

    return pd.concat(session_frames)

In [8]:
def final_output(finalout):
    """
    Expands every row of the rule table into one row per individual
    (procedure, diagnosis) code pair.

    The 'procedure' and 'diagnosis' cells hold comma-separated code lists. Each
    cell is split on commas and every token is stripped of surrounding
    whitespace, so ``"37238, 37239"`` yields ``"37238"`` and ``"37239"``
    (previously the second token kept its leading space). Empty tokens are kept
    so that a session with a blank diagnosis cell still produces its rows.

    Args:
        finalout (pd.DataFrame): DataFrame with columns
            ['procedure', 'diagnosis', 'session_num', 'med_necessity_idx'].
    Returns:
        pd.DataFrame: Expanded DataFrame with the same four columns; empty (but
        still carrying the columns) when ``finalout`` has no rows.
    """
    columns = ['procedure', 'diagnosis', 'session_num', 'med_necessity_idx']

    def split_codes(cell):
        # str() keeps non-string cells (e.g. NaN) from raising on .split
        return [code.strip() for code in str(cell).split(',')]

    expanded_rows = [
        {
            'procedure': proc,
            'diagnosis': diag,
            'session_num': row['session_num'],
            'med_necessity_idx': row['med_necessity_idx'],
        }
        for _, row in finalout.iterrows()
        for proc, diag in itertools.product(
            split_codes(row['procedure']), split_codes(row['diagnosis'])
        )
    ]

    # Explicit columns keep the schema stable even when there are no rows
    return pd.DataFrame(expanded_rows, columns=columns)

In [9]:
# Source handed to WeasyPrint.
# NOTE(review): "policy_rule" looks like a placeholder; confirm the real location.
url = "policy_rule"

# File that receives the rendered PDF
output_file = "webpage_output.pdf"

# Render the source and write it to disk as a PDF
weasyprint.HTML(url).write_pdf(output_file)

print(f"PDF saved as {output_file}")

# Path of the PDF that is read back in
policyfile = 'webpage_output.pdf'

# Open the saved PDF
pdf_read = PdfReader(policyfile)

# Gather the text of every page, then join the pieces into one string
page_texts = []
for page in pdf_read.pages:
    page_texts.append(page.extract_text())
policycontent = ''.join(page_texts)

# Header = everything before the first occurrence of "description"
first_paragraph = policycontent.partition("description")[0]

# Parse the header into a label -> value dict and show it for review
policy_basic_info = get_basic_info(first_paragraph)
print(policy_basic_info)

PDF saved as webpage_output.pdf
{'Subject': 'Venous Angioplasty with or without Stent Placement or Venous Stenting Alone', 'Guideline #': 'CG-SURG-106', 'Publish Date': '04/10/2024', 'Status': 'Reviewed', 'Last Review Date': '02/15/2024'}


In [10]:
# Define the extraction function: expose the SessionInfo schema as an OpenAI function
extraction_function = [convert_to_openai_function(SessionInfo)]
# Bind the function to the model and name it in function_call so the reply is
# requested as a SessionInfo function call
extraction_model = model.bind(functions=extraction_function, function_call={"name": "SessionInfo"})
# NOTE(review): rebuilds the same prompt object that the prompt cell already created
# from prompt_template_session
extraction_prompt_session = ChatPromptTemplate.from_messages(prompt_template_session)
# prompt -> model -> parser that returns the value under the "Sessions" key of the
# function-call arguments
extraction_chain = extraction_prompt_session | extraction_model | JsonKeyOutputFunctionsParser(key_name="Sessions")

# Invoke the LangChain model for extraction (fills the {policy} prompt variable)
AllSessions = extraction_chain.invoke(input={"policy": policycontent})

# Convert the response to DataFrame (one row per extracted session)
df_session = pd.DataFrame(data=pd.json_normalize(AllSessions))

# Build the per-session rule rows, then expand them into one row per code pair
out = get_rules(df_session)
final = final_output(out)
 


In [11]:
# Display the per-session rule rows returned by get_rules (one row per session)
out

Unnamed: 0,procedure,diagnosis,session_num,med_necessity_idx
0,"37238, 37239, 37248, 37249, 027Q04Z-027Q4ZZ, 0...","C38.1-C38.3, C38.8, G54.0, I26.01-I26.99, I82....",1,1
0,"37238, 37239, 37248, 37249, 027Q04Z-027Q4ZZ, 0...","I28.8, I87.1",2,-1
0,"05700DZ-05704ZZ, 05710DZ-05714ZZ, 05730D1-0574...",All diagnoses,3,0
0,"61630, 61635, 057L0DZ-057L4ZZ",G93.2,4,-1


In [12]:
# Split the procedure and diagnosis strings on commas and count the distinct tokens per row.
# Tokens are not stripped, so values differing only by surrounding spaces count separately.
# NOTE: these two assignments add columns to `out` in place, so `out` differs from the
# frame displayed in the previous cell once this cell has run.
out['procedure_count'] = out['procedure'].apply(lambda x: len(set(str(x).split(','))))
out['diagnosis_count'] = out['diagnosis'].apply(lambda x: len(set(str(x).split(','))))

# Group by 'session_num' and sum the counts
summary = out.groupby('session_num')[['procedure_count', 'diagnosis_count']].sum()

# Print the summary
print(summary)


             procedure_count  diagnosis_count
session_num                                  
1                         18               14
2                         18                2
3                         23                1
4                          3                1


In [13]:
# Display the expanded rule table returned by final_output (one row per code pair)
final

Unnamed: 0,procedure,diagnosis,session_num,med_necessity_idx
0,37238,C38.1-C38.3,1,1
1,37238,C38.8,1,1
2,37238,G54.0,1,1
3,37238,I26.01-I26.99,1,1
4,37238,I82.0,1,1
...,...,...,...,...
309,067T0DZ-067V4ZZ,All diagnoses,3,0
310,067Y0DZ-067Y4ZZ,All diagnoses,3,0
311,61630,G93.2,4,-1
312,61635,G93.2,4,-1


In [14]:
# Define the extraction function.
# NOTE(review): these three statements repeat the chain setup from the earlier
# extraction cell; they rely on `extraction_prompt_session` already being defined.
extraction_function = [convert_to_openai_function(SessionInfo)]
extraction_model = model.bind(functions=extraction_function, function_call={"name": "SessionInfo"})
# prompt -> model -> parser returning the value under the "Sessions" key
extraction_chain = extraction_prompt_session | extraction_model | JsonKeyOutputFunctionsParser(key_name="Sessions")

# Read a PDF and return its text.
# NOTE(review): this redefinition replaces the earlier extract_text_from_pdf in this
# notebook, which took an object with .pages and returned a (basic_info, text) tuple.
def extract_text_from_pdf(pdf_file):
    """Return the concatenated text of every page of a PDF.

    ``pdf_file`` is handed to ``PdfReader`` unchanged; each page's
    ``extract_text()`` result is collected in page order and the pieces are
    joined without a separator.
    """
    page_texts = []
    for page in PdfReader(pdf_file).pages:
        page_texts.append(page.extract_text())
    return "".join(page_texts)

# Gradio callback: uploaded policy PDF -> session count, rule count, rules table
def process_policy(pdf_file):
    """Run the extraction pipeline on an uploaded policy PDF.

    Steps: read the PDF text, ask the extraction chain for the sessions, build
    the per-session rules with ``get_rules`` and expand them with
    ``final_output``.

    Parameters:
        pdf_file: The uploaded file as delivered by the Gradio ``File`` input;
            it is passed straight to ``extract_text_from_pdf``.

    Returns:
        tuple: ``(sessions_message, rules_message, rules_table)``: two display
        strings and the expanded rules DataFrame. When the model returns no
        sessions, both counts are 0 and the table is empty but keeps its
        columns.
    """
    # Extract the policy content from the uploaded PDF
    policycontent = extract_text_from_pdf(pdf_file)

    # Invoke the LangChain model for extraction
    sessions = extraction_chain.invoke(input={"policy": policycontent})

    # Convert the response to DataFrame (one row per session)
    df_session = pd.DataFrame(data=pd.json_normalize(sessions))

    # Count the rows actually returned instead of trusting the last
    # model-supplied 'session_num', which may be mis-numbered
    num_sessions = len(df_session)

    if df_session.empty:
        # Nothing was extracted: skip rule building and return an empty table
        final_output_df = pd.DataFrame(
            columns=['procedure', 'diagnosis', 'session_num', 'med_necessity_idx']
        )
    else:
        # Build the per-session rules, then expand to one row per code pair
        final_output_df = final_output(get_rules(df_session))

    # Number of rules = number of rows in the expanded table
    num_rules = len(final_output_df)

    # Return the counts along with the table to Gradio
    return f"Number of sessions caught: {num_sessions}", f"Number of rules: {num_rules}", final_output_df

# Set up Gradio interface: one PDF upload as input; the three outputs line up with the
# 3-tuple returned by process_policy (sessions text, rules text, rules table)
iface = gr.Interface(
    fn=process_policy, 
    inputs=gr.File(label="Upload Policy PDF"), 
    outputs=[gr.Textbox(label="Sessions Information"), gr.Textbox(label="Rules Information"), gr.DataFrame(label="Processed Rules Table")]
)

# NOTE(review): share=True publishes the app on a public gradio.live URL in addition to
# the local one, so anyone with the link can upload PDFs and trigger OpenAI calls made
# with this notebook's API key. Confirm this exposure is intended.
iface.launch(share=True)

* Running on local URL:  http://127.0.0.1:7862
* Running on public URL: https://371e47018219c5bac9.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


