# Silver Layer - NLP and GPT Processing of Raw Data

In [1]:
from datetime import datetime
import re

import nltk
import nltk.data
from nltk.tokenize import word_tokenize
from nltk.tag import pos_tag
from nltk.corpus import stopwords
from nltk.chunk import RegexpParser

from pyspark.sql.functions import col, expr, to_date
from pyspark.sql.functions import regexp_extract

import ast

import openai
from openai import OpenAI

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType


# Tokenizer and POS-tagger data used by extract_last_name further down.
# NOTE(review): nltk.download writes to the driver's filesystem; the Python UDF
# that uses these models runs on Spark executors — confirm they can see it.
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')

# Maximum number of rows processed per run (the original LIMIT 1000). Every row
# processed below triggers two GPT calls, so this also caps API spend.
MAX_ROWS_PER_RUN = 1000

df = spark.sql("SELECT * FROM PT_evals_lakehouse.bronze_layer_nlp")

# Only act upon those rows which are new for the week.
# The date filter must run BEFORE the row cap. Putting LIMIT in the SQL took an
# arbitrary 1000 rows of the whole table and only then discarded the old ones,
# so once the table grew past 1000 rows new evaluations were silently skipped.
# NOTE(review): `last_added_date` is not defined in this section; presumably a
# 'yyyy-MM-dd' string from an earlier/parameter cell — confirm. Note that `>=`
# also re-selects rows dated exactly `last_added_date`; if those were already
# written on the previous run they will be appended again — confirm intent.
df = df.withColumn('eval_date', to_date(col('eval_date'), 'yyyy-MM-dd'))
df = df.filter(expr(f"eval_date >= TO_DATE('{last_added_date}', 'yyyy-MM-dd')"))
df = df.limit(MAX_ROWS_PER_RUN)

StatementMeta(, 76dab440-8afc-4d2e-94a2-168afc801ed6, 5, Finished, Available, Finished)

[nltk_data] Downloading package punkt to /home/trusted-service-
[nltk_data]     user/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/trusted-service-user/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.


In [2]:
# Regular expressions to clean and collect age, sex, fusion, brace, pain data.
# Patterns are raw strings: in a plain literal '\d', '\S' and '\D' are invalid
# escape sequences (DeprecationWarning, and SyntaxWarning since Python 3.12).
# For non-NULL input that does not match, regexp_extract returns '' (not NULL).
# NOTE(review): all patterns are case-sensitive ('Lumbar', 'TLSO', 'Female'
# will not match) — confirm the bronze text is lower-case.

age_pattern = r'\d+'             # first run of digits in age_sex
sex_pattern = r'\S*male'         # non-whitespace run ending in 'male' ('male'/'female')
fusion_pattern = '(cervical|lumbar)'
# NOTE(review): `\D*` also captures any leading non-digit text, spaces included
# (e.g. 'wears tlso' -> 'wears tlso'); `\w*lso` may be what was intended.
brace_pattern = r'\D*lso|aspen'
pain_pattern = r'\d+'            # first number in the pain text

df = (df
      .withColumn('age', regexp_extract('age_sex', age_pattern, 0))
      .withColumn('sex', regexp_extract('age_sex', sex_pattern, 0))
      .withColumn('fusion', regexp_extract('fusion', fusion_pattern, 0))
      .withColumn('brace', regexp_extract('brace', brace_pattern, 0))
      .withColumn('pain', regexp_extract('pain', pain_pattern, 0))
)

StatementMeta(, 76dab440-8afc-4d2e-94a2-168afc801ed6, 6, Finished, Available, Finished)

In [3]:
# Function to determine names and extract only the last name
def extract_last_name(name):
    """Return the last proper-noun token of *name*, ignoring credential titles.

    The text is tokenized and POS-tagged with NLTK, and runs of consecutive
    NNP tokens are chunked into NAME phrases. The tokens 'Md', 'Np', 'Do' and
    'Pa' are removed from each phrase. The result is the final remaining token
    of the last NAME phrase that still has any tokens.

    Returns None for empty/None input, or when no NAME phrase survives. The
    input is expected to be title-cased beforehand (``to_title`` is applied
    first) so that the tagger labels the name tokens as NNP.
    """
    if not name:
        return None

    credential_titles = {'Md', 'Np', 'Do', 'Pa'}
    name_chunker = RegexpParser("NAME: {<NNP>+}")
    parse_tree = name_chunker.parse(pos_tag(word_tokenize(name)))

    # Only the NAME subtrees matter; plain (token, tag) leaves are skipped.
    name_phrases = (
        node for node in parse_tree
        if isinstance(node, nltk.Tree) and node.label() == 'NAME'
    )

    result = None
    for phrase in name_phrases:
        kept = [word for word, _tag in phrase.leaves() if word not in credential_titles]
        if kept:
            # Later phrases overwrite earlier ones: the last surviving phrase wins.
            result = kept[-1]
    return result

# Necessary function for pos_tags to identify names: provider names are
# title-cased so the POS tagger tags them as proper nouns (NNP).
def to_title(text):
    """Return *text* in title case, passing None (SQL NULL) through unchanged.

    This is registered as a Spark SQL UDF, so it receives None for NULL
    ``ordering_provider`` values. Calling ``.title()`` on None would raise
    AttributeError and fail the whole Spark job.
    """
    if text is None:
        return None
    return text.title()

# Expose both helpers to Spark SQL under their own names so expr() can call
# them. No return type is given, so Spark registers them with its default
# (string) return type.
for sql_name, py_func in (("to_title", to_title), ('extract_last_name', extract_last_name)):
    spark.udf.register(sql_name, py_func)

# Normalise the provider name to title case, then reduce it to the last name.
df = df.withColumn('ordering_provider', expr('to_title(ordering_provider)'))
df = df.withColumn('ordering_provider', expr('extract_last_name(ordering_provider)'))

# Cache the cleaned frame. The GPT cells that follow collect its columns one at
# a time and zip the results together, so they rely on repeated reads of df
# returning rows in the same order without re-running the UDFs.
df.persist()

StatementMeta(, 76dab440-8afc-4d2e-94a2-168afc801ed6, 7, Finished, Available, Finished)

DataFrame[eval_date: date, age_sex: string, fusion: string, ordering_provider: string, brace: string, pain: string, plof: string, mobility: string, age: string, sex: string]

In [4]:
import os

# The API key comes from the environment instead of being hard-coded. A key in
# notebook source is visible to anyone with access to the notebook or its history.
# NOTE(review): the key previously committed here must be treated as leaked
# and revoked/rotated.
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
    raise RuntimeError("Set the OPENAI_API_KEY environment variable (do not hard-code the key).")
client = OpenAI(api_key=api_key)

# PLOF paragraphs for OpenAI to iterate through, in the cached df's row order.
plof = df.select("plof").rdd.flatMap(lambda x: x).collect()

# Per-row extracted values; element i of every list belongs to plof[i].
prior_loc = []
ste = []
hr = []
ad = []
num_falls = []
# Same order as the indices the prompt asks GPT to return.
plof_outputs = (prior_loc, ste, hr, ad, num_falls)


def _parse_gpt_int_list(reply, expected_len):
    """Extract the first Python list literal from a GPT reply.

    Returns the first ``expected_len`` items. Returns None when the reply is
    empty, contains no ``[...]`` list, the list is not a valid Python literal,
    or it has fewer than ``expected_len`` items. Returning None instead of
    raising keeps one bad reply from discarding every paid call already made.
    """
    # Non-greedy + DOTALL: the first bracketed list, even if it spans lines.
    match = re.search(r'\[.*?\]', reply or "", re.DOTALL)
    if match is None:
        return None
    try:
        values = ast.literal_eval(match.group())
    except (ValueError, SyntaxError, TypeError):
        return None
    if not isinstance(values, (list, tuple)) or len(values) < expected_len:
        return None
    return list(values[:expected_len])


for row_pos, eval_plof in enumerate(plof):
    if not eval_plof:
        # No paragraph to analyse: skip the API call but keep the lists aligned with df.
        for output in plof_outputs:
            output.append(None)
        continue

    plof_prompt = f"""
    You are an automated data extractor. Your only goal is to analyze the following paragraph for the data listed and return it in the format
    required below. The paragraph describes the living situation and prior level of function of 
    a patient who is admitted in the hospital.
    
    Extract the following information and return it in the format listed below the paragraph.
    1. Type of location that the patient was living in. If they plan to live in another location, such as an individual who lives in a two story home
    but plans to stay in a ranch home, then encode the ranch home. We will encode this as follows:
        Ranch home or single story home (ssh): 0
        Two or more story home (2sh): 1
        Apartment: 2
        Independent living facility (ILF): 3
        Assisted living facility (ALF): 4
        Skilled nursing facility (SNF): 5
        Inpatient rehabilitation hospital: 6
        Long-term care facility (LTC): 7
    2. Number of stairs (steps). If the paragraph mentions "1+1 ste" or "1+1 stairs to enter", reduce this to 1. Otherwise, 
    perform the necessary addition.
    3. Number of handrails present. If the paragraph makes no mention of handrails, sometimes abbreviated as "hr" or "hrs", then this is 0.
    4. Type of assistive device used. If this is not mentioned, then assume no assistive device. We will encode the assistive device used, if any:
        No assistive device used: 0
        Cane: 1
        Walker: 2
    5. Number of falls in the last six months.
    
    
    Paragraph:
    \"\"\"
    {eval_plof}
    \"\"\"
    
    Response format will be in the form of a python list with only the integers returned. For example: "[ 1, 3, 2, 0, 3]" where:
        Type of location (encoded as above) is at index 0.
        Number of stairs (steps) is at index 1.
        Number of handrails is at index 2.
        Assistive device used is at index 3.
        Number of falls in the last 6 months is at index 4.

    Do not return an explanation. Only return the python list.
    """

    response = client.chat.completions.create(
        model = "gpt-4o",
        messages = [
            {"role":"user","content":plof_prompt}
        ]
    )
    # Gather responses
    gpt_plof_response = response.choices[0].message.content

    # Parse the returned list. On failure, record NULLs for this row instead of
    # aborting the loop. The reply is not logged because it may echo patient text.
    plof_list = _parse_gpt_int_list(gpt_plof_response, len(plof_outputs))
    if plof_list is None:
        print(f"WARNING: could not parse GPT PLOF reply for row {row_pos}; storing NULLs")
        plof_list = [None] * len(plof_outputs)

    # Dump data into respective lists (prior_loc, ste, hr, ad, num_falls).
    for output, value in zip(plof_outputs, plof_list):
        output.append(value)


StatementMeta(, 76dab440-8afc-4d2e-94a2-168afc801ed6, 8, Finished, Available, Finished)

In [5]:
# Mobility paragraphs for OpenAI to iterate through, in the cached df's row order.
# `client` is the OpenAI client created in the PLOF cell.
mobility = df.select("mobility").rdd.flatMap(lambda x: x).collect()

# Per-row extracted values; element i of every list belongs to mobility[i].
sup_sit = []
sit_stand = []
amb_assist = []
amb_distance = []
stairs_assist = []
num_stairs = []
# Same order as the indices the prompt asks GPT to return.
mobility_outputs = (sup_sit, sit_stand, amb_assist, amb_distance, stairs_assist, num_stairs)


def _parse_gpt_int_list(reply, expected_len):
    """Extract the first Python list literal from a GPT reply.

    Returns the first ``expected_len`` items. Returns None when the reply is
    empty, contains no ``[...]`` list, the list is not a valid Python literal,
    or it has fewer than ``expected_len`` items. Returning None instead of
    raising keeps one bad reply from discarding every paid call already made.
    """
    # Non-greedy + DOTALL: the first bracketed list, even if it spans lines.
    match = re.search(r'\[.*?\]', reply or "", re.DOTALL)
    if match is None:
        return None
    try:
        values = ast.literal_eval(match.group())
    except (ValueError, SyntaxError, TypeError):
        return None
    if not isinstance(values, (list, tuple)) or len(values) < expected_len:
        return None
    return list(values[:expected_len])


for row_pos, eval_mob in enumerate(mobility):
    if not eval_mob:
        # No paragraph to analyse: skip the API call but keep the lists aligned with df.
        for output in mobility_outputs:
            output.append(None)
        continue

    mobility_prompt = f"""
    
    You are an automated data extractor. Your only goal is to analyze the following paragraph for the data listed and return it in the format
    required below. The paragraph describes the mobility assessment performed on a patient following a surgical fusion 
    that is in an inpatient hospital. 
    
    The purpose of this analysis is to retrieve information regarding the assistance required to perform various mobility, 
    distance of ambulation, and number of stairs completed during the session. Some portions of information may not be available as they 
    might not have been performed.
    
    Assistance ranges from:
    SBA: Stand-by-assistance
    Min-A: Minimal assistance
    Mod-A: Moderate assistance
    Max-A: Maximal Assistance
    'x#' where # is the number of individuals assisting. For example, minAx1 means minimal assistance of 1.
    
    Distance is measured in feet.
    
    Extract the following information and return it in the format listed below the paragraph. 
    1. Supine to sit (also written as sup < > sit or sup to sit) assistance required
    2. Sit to stand (also written as sit < > stand) assistance required
    3. Ambulation assistance required
    4. Ambulation distance (If Ambulation assistance required is 4, then this value is 0.)
    5. Stairs assistance required
    6. Stairs completed (If stairs assistance is 4, then this value is 0.)
    
    Paragraph:
    \"\"\"
    {eval_mob}
    \"\"\"
    
    Return the assistance variables in the encoded format:
    Stand-by-assistance: 0
    Minimal assistance of 1: 1
    Moderate assistance of 1: 2
    Maximal assistance of 1 or more, moderate assistance of 2: 3
    Unable to complete, did not complete, or not feasible to complete: 4
    
    Response format will only be in the form of a python list with only the integers returned. For example: "[ 1, 3, 2, 100, 3, 6]"
    where:
        Supine to sit assistance is index 0.
        Sit to stand assistance is index 1.
        Ambulation assistance is index 2.
        Ambulation distance is index 3.
        Stairs assistance is index 4.
        Stairs completed is index 5.

    Do not return an explanation. Only return the python list.
    """

    response = client.chat.completions.create(
        model = "gpt-4o",
        messages = [
            {"role":"user","content":mobility_prompt}
        ]
    )

    # Gather responses
    gpt_mob_response = response.choices[0].message.content

    # Parse the returned list. On failure, record NULLs for this row instead of
    # aborting the loop. The reply is not logged because it may echo patient text.
    mobility_list = _parse_gpt_int_list(gpt_mob_response, len(mobility_outputs))
    if mobility_list is None:
        print(f"WARNING: could not parse GPT mobility reply for row {row_pos}; storing NULLs")
        mobility_list = [None] * len(mobility_outputs)

    # Dump data into respective lists
    # (sup_sit, sit_stand, amb_assist, amb_distance, stairs_assist, num_stairs).
    for output, value in zip(mobility_outputs, mobility_list):
        output.append(value)


StatementMeta(, 76dab440-8afc-4d2e-94a2-168afc801ed6, 9, Finished, Available, Finished)

In [6]:
from pyspark.sql import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

# GPT-derived columns. Element i of each list came from the i-th row returned
# by collect() on the persisted df in the GPT cells.
columns = ['prior_loc', 'ste', 'hr', 'ad', 'num_falls', 'sup_sit', 'sit_stand', 'amb_assist', 'amb_distance', 'stairs_assist', 'num_stairs']
gpt_lists = (prior_loc, ste, hr, ad, num_falls, sup_sit, sit_stand, amb_assist, amb_distance, stairs_assist, num_stairs)

# zip() silently truncates to the shortest list; refuse to write misaligned rows.
row_count = df.count()
bad_lengths = {name: len(values) for name, values in zip(columns, gpt_lists) if len(values) != row_count}
if bad_lengths:
    raise ValueError(f"GPT result lengths {bad_lengths} do not match df row count {row_count}")

data = list(zip(*gpt_lists))

# Join key = 1-based position of each row in df's collect() order.
# The previous version numbered df by ordering_provider and new_df by prior_loc.
# Those are two unrelated orders, so GPT results were attached to the wrong
# patients. monotonically_increasing_id() follows partition order, which is the
# order collect() used, and df was persisted before those collects, so the
# order is stable. row_number() over it turns those ids into 1..N.
df = df.withColumn("_row_id", monotonically_increasing_id())
df = df.withColumn("index", row_number().over(Window.orderBy("_row_id"))).drop("_row_id")

new_df = spark.createDataFrame(
    [(position, *values) for position, values in enumerate(data, start=1)],
    ['index'] + columns,
)

# Join DataFrames
df = df.join(new_df, on="index")

StatementMeta(, 76dab440-8afc-4d2e-94a2-168afc801ed6, 10, Finished, Available, Finished)

In [7]:
# Final silver-layer columns and their types, in table order.
new_schema = StructType([
    StructField("eval_date", DateType(), True),
    StructField("fusion", StringType(), True),
    StructField("ordering_provider", StringType(), True),
    StructField("brace", StringType(), True),
    StructField("pain", IntegerType(), True),
    StructField("age", IntegerType(), True),
    StructField("sex", StringType(), True),
    StructField("prior_loc", IntegerType(), True),
    StructField("ste", IntegerType(), True),
    StructField("hr", IntegerType(), True),
    StructField("ad", IntegerType(), True),
    StructField("num_falls", IntegerType(), True),
    StructField("sup_sit", IntegerType(), True),
    StructField("sit_stand", IntegerType(), True),
    StructField("amb_assist", IntegerType(), True),
    StructField("amb_distance", IntegerType(), True),
    StructField("stairs_assist", IntegerType(), True),
    StructField("num_stairs", IntegerType(), True)
])

# Keep exactly the schema's columns, selected by NAME and cast to their target
# types. Helper columns (age_sex, mobility, plof, index) are dropped implicitly.
# This replaces per-column casts followed by createDataFrame(df.rdd, schema).
# That round trip relabelled columns by POSITION, so any change in upstream
# column order would silently store values under the wrong names, and it
# pushed every row through Python.
df = df.select([col(field.name).cast(field.dataType) for field in new_schema.fields])

df.write.mode('append').saveAsTable('silver_layer_nlp')

StatementMeta(, 76dab440-8afc-4d2e-94a2-168afc801ed6, 11, Finished, Available, Finished)