In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# 1. Obtaining the Spark session
# getOrCreate() returns the already-active session when one exists (as on a platform that
# pre-defines `spark`) and creates one otherwise, so this cell also runs on a fresh kernel.
spark = SparkSession.builder.getOrCreate()

# 2. Defining the Schema (Column Names and Types)
# Both columns are declared nullable (third argument True).
schema = StructType([
    StructField("Patient_ID", IntegerType(), True),
    StructField("Discharge_Summary", StringType(), True)
])

# 3. Defining the Data
# Small in-line sample: one (Patient_ID, Discharge_Summary) tuple per patient.
data = [
    (101, "Patient discharged home. No concerns regarding immediate safety. Has stable housing and reliable transportation. Follow-up scheduled for 7 days."),
    (102, "Patient expressed difficulty affording medications due to recent job loss. History of tobacco use documented. Needs referral to social worker for housing assistance."),
    (103, "Discharged with complex medication regimen. Patient lives alone and reported occasional heavy alcohol use. Recommend community resources for social support."),
    (104, "Routine discharge. Patient is a non-smoker with good family support structure. Education provided on diet and exercise."),
    (105, "Readmitted within 48 hours. Discharge note indicated patient was homeless and unable to contact for follow-up. Substance abuse history noted.")
]

# 4. Creating the Spark DataFrame
df_spark_raw = spark.createDataFrame(data=data, schema=schema)

print("Schema of the created data:")
df_spark_raw.printSchema()
# truncate=False prints the full summary text instead of cutting it off.
df_spark_raw.show(5, truncate=False)

In [0]:
# Keyword dictionary: maps each risk category (which becomes the name of a flag column)
# to the phrases whose presence in a discharge summary marks that risk.

risk_keywords = {
    'Housing_Risk': [
        'homeless',
        'housing assistance',
        'lives alone',
        'unstable housing',
    ],
    'Employment_Financial_Risk': [
        'job loss',
        'unemployment',
        'unable to afford medications',
        'difficulty affording',
        'financial strain',
        'cost issues',
    ],
    'Substance_Use_Risk': [
        'alcohol use',
        'substance abuse',
        'tobacco use',
        'heavy smoking',
    ],
}

In [0]:


from pyspark.sql.functions import col, upper, regexp_replace, when, lit

import re

# Checking the text and creating new columns

# 1. Uppercase the Summary Column (Case-Insensitive Matching)
# The keywords are upper-cased below as well, so the comparison ignores case.
df_spark_processed = df_spark_raw.withColumn(
    "Summary_Upper",
    upper(col("Discharge_Summary"))
)

# 2. Iterating and Creating Binary Flags (1 or 0), one column per risk category
for risk_category, keywords in risk_keywords.items():

    # A. Preparing the keywords for the regex search.
    #    - Upper-case first, THEN escape: escaping first would let upper() rewrite an
    #      escape sequence into a different one.
    #    - re.escape makes every keyword match literally, so a keyword containing
    #      characters such as '(', '+' or '.' is not interpreted as regex syntax.
    #    - Blank keywords are dropped: an empty alternative matches every row.
    escaped_keywords = [re.escape(k.upper()) for k in keywords if k.strip()]

    if escaped_keywords:
        # B. Pipe-separated alternation (e.g., 'HOMELESS|HOUSING\ ASSISTANCE|...'),
        #    searched anywhere in the summary with 'rlike'.
        pattern = '|'.join(escaped_keywords)
        risk_flag = when(
            col("Summary_Upper").rlike(pattern),  # IF any keyword occurs in the text
            lit(1)                               # THEN set the value to 1
        ).otherwise(
            lit(0)                               # ELSE (including a null summary) set 0
        )
    else:
        # No usable keywords: an empty pattern would flag every patient, so flag none.
        risk_flag = lit(0)

    df_spark_processed = df_spark_processed.withColumn(risk_category, risk_flag)

# 3. Select Final Structured Data and Print
# The helper column "Summary_Upper" is intentionally left out of the final result.
df_structured = df_spark_processed.select(
    "Patient_ID",
    "Discharge_Summary",
    "Housing_Risk",
    "Employment_Financial_Risk",
    "Substance_Use_Risk"
)

print("--- Extracted Structured Risk Data (PySpark) ---")
df_structured.show(5, truncate=False)

In [0]:
from pyspark.sql.functions import sum as spark_sum, col

# Number of patients analysed = number of rows in the structured DataFrame.
total_patients = df_structured.count()

# Each risk column holds 1 or 0, so summing a column counts the flagged patients.
# (flag column, name of the resulting count column)
count_aliases = [
    ("Housing_Risk", "Housing_Count"),
    ("Employment_Financial_Risk", "Employment_Financial_Count"),
    ("Substance_Use_Risk", "Substance_Use_Count"),
]

# A single aggregation producing one row with one count per risk category.
risk_counts_df = df_structured.agg(
    *[
        spark_sum(col(flag_column)).alias(count_name)
        for flag_column, count_name in count_aliases
    ]
)

print(f"Total Patients Analyzed: {total_patients}")
print("Raw Counts of Risk Factors:")
risk_counts_df.show()

In [0]:
# 1. Pull the single row of counts back to the driver for simpler calculation
risk_counts_row = risk_counts_df.collect()[0]


def _prevalence_rate(count, total):
    """Return ``count / total`` as a percentage, or 0.0 when it cannot be computed.

    Args:
        count: Number of flagged patients; None when the aggregation summed no rows.
        total: Total number of patients analysed.

    Returns:
        The prevalence as a float percentage. When ``total`` is 0 or ``count`` is
        None (an empty input DataFrame) 0.0 is reported instead of raising on the
        division.
    """
    if not total or count is None:
        return 0.0
    return (count / total) * 100


# 2. Extract counts and calculate rates (Count / Total * 100)
housing_rate = _prevalence_rate(risk_counts_row["Housing_Count"], total_patients)
employment_rate = _prevalence_rate(risk_counts_row["Employment_Financial_Count"], total_patients)
substance_rate = _prevalence_rate(risk_counts_row["Substance_Use_Count"], total_patients)

print("\n--- Final SDOH Risk Prevalence Rates ---")
print("-" * 40)
print(f"1. Housing Risk Prevalence: {housing_rate:.2f}%")
print(f"2. Employment/Financial Risk Prevalence: {employment_rate:.2f}%")
print(f"3. Substance Use Risk Prevalence: {substance_rate:.2f}%")

In [0]:
import pandas as pd

# Pair each display label with the prevalence rate computed for it.
risk_labels = ["Housing Risk", "Employment/Financial Risk", "Substance Use Risk"]
risk_rates = [housing_rate, employment_rate, substance_rate]
summary_data = list(zip(risk_labels, risk_rates))

# Small driver-side pandas table (one row per risk category) used for presentation.
summary_columns = ['Risk_Category', 'Prevalence_Rate']
summary_df_pandas = pd.DataFrame(data=summary_data, columns=summary_columns)

# display() is supplied by the notebook environment (the original cell describes it as
# Databricks-specific); it renders the table with built-in charting options.
display(summary_df_pandas)

In [0]:
import plotly.express as px

# Bar labels: each prevalence rate formatted to one decimal place with a percent sign.
bar_labels = summary_df_pandas['Prevalence_Rate'].map(lambda rate: f'{rate:.1f}%')

# One bar per risk category, coloured by its prevalence rate on the Viridis scale.
fig = px.bar(
    summary_df_pandas,
    x='Risk_Category',
    y='Prevalence_Rate',
    text=bar_labels,
    color='Prevalence_Rate',
    color_continuous_scale=px.colors.sequential.Viridis,
    title='SDOH Risk Prevalence Rates',
)

# Layout tweaks for clarity: labels above the bars, readable axis titles, centred title.
fig.update_traces(textposition='outside').update_layout(
    xaxis_title='Risk Factor',
    yaxis_title='Prevalence Rate (%)',
    title_x=0.5,
)

# Render the Plotly figure through the notebook's display() function.
display(fig)