# 01: Load and Clean Data
## Overview
This notebook ingests the raw Medical FAQ dataset (`medquad.csv`), performs foundational cleaning (deduplication, null removal, and question text normalization), and persists the result in Parquet format for efficient downstream processing in the Healthcare FAQ Generator pipeline. It uses PySpark on Databricks for scalable data handling and mounts Azure Blob Storage, authenticated with a SAS token, for secure access.

## **Purpose**  
- Load raw CSV data from Azure Blob Storage.
- Remove exact duplicate rows and rows with a null `answer`, then normalize questions (lowercase, remove special characters).
- Save as Parquet for fast, columnar storage and analytics.

### **Why this step matters**  
* Establishes a reliable, repeatable starting point for the pipeline.  
* Converts slow CSV reads into fast, analytics-friendly Parquet for both Spark and Python.  
* Catches early data issues before they propagate into training and evaluation.

### **Business value**  
* Faster experimentation cycles for model training and retrieval indexing.  
* Lower storage and compute costs due to columnar compression and predicate pushdown.  
* Traceable data lineage from raw to curated, supporting auditability and handoffs.

### **Technical Approach**  
- **Input**: `medquad.csv` (16,412 rows) from Azure Blob.
- **Output**: `transformed_medquad.parquet` (16,359 rows after cleaning).
- **Tools**: PySpark for distributed processing, Azure Blob mounting with SAS token.
- **Runtime**: ~1-2 minutes on Databricks Community Edition (CPU).

In [1]:
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, count, when, lower, regexp_replace
from dotenv import load_dotenv

---
## Configure environment and access

### **Purpose**  
Load configuration from `.env` and mount Azure Blob Storage for secure data access, ensuring least-privilege access without hardcoding secrets.

### Technical Details
- Uses `dotenv` to load SAS token, account, container, and mount point.
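- Mounts the Blob container at `wasbs://<container>@<account>.blob.core.windows.net` to the DBFS mount point (`MOUNT_PT`).
- Checks existing mounts: does nothing if `MOUNT_PT` already points at that source, and remounts if it points elsewhere.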
- SAS token provides read/write access with expiration.
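The mount step reduces to building two strings and a three-way decision over the current mount table. The sketch below is a Databricks-free illustration, assuming the same URI and config-key formats used in the code cell that follows; `build_source_and_key` and `plan_mount` are hypothetical helpers, the container and account names are made up, and the mount table is a plain dict rather than the result of `dbutils.fs.mounts()`.

```python
def build_source_and_key(container: str, account: str) -> tuple[str, str]:
    """Return the wasbs:// source URI and the Spark config key that carries the SAS token."""
    host = f"{account}.blob.core.windows.net"
    source = f"wasbs://{container}@{host}"
    cfg_key = f"fs.azure.sas.{container}.{host}"
    return source, cfg_key


def plan_mount(mounted: dict[str, str], mount_pt: str, source: str) -> str:
    """Decide what to do given a {mount_point: source} table (hypothetical helper).

    - "mount":   nothing is mounted at mount_pt yet
    - "remount": mount_pt points at a different source
    - "ok":      mount_pt already points at the expected source
    """
    if mount_pt not in mounted:
        return "mount"
    if mounted[mount_pt] != source:
        return "remount"
    return "ok"


if __name__ == "__main__":
    # Made-up container/account names, for illustration only
    src, key = build_source_and_key("faq-data", "mystorageacct")
    print(src)
    print(key)
    print(plan_mount({}, "/mnt/faq", src))
    print(plan_mount({"/mnt/faq": "wasbs://other@x"}, "/mnt/faq", src))
    print(plan_mount({"/mnt/faq": src}, "/mnt/faq", src))
```

Keeping the decision separate from the `dbutils` calls makes the idempotency rule easy to check without a cluster.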

### **Acceptance checks**  
* `.env` or secret scope is available  
* Required keys present: storage account, container, mount path, SAS key  
* Spark session (if used) is initialized

### **Risks and mitigation**  
* Missing credentials → clear error with instructions  
* Wrong container or path → fail fast with a message


In [None]:
# Load configuration from .env (SAS token, storage account, container, mount point)
load_dotenv()

SAS_TOKEN = os.getenv("SAS_TOKEN")
ACCOUNT   = os.getenv("STORAGE_ACC_NAME")
CONTAINER = os.getenv("CONT_NAME")
MOUNT_PT  = os.getenv("MOUNT_PT")

# Fail fast with a clear message if any required setting is missing
required = {"SAS_TOKEN": SAS_TOKEN, "STORAGE_ACC_NAME": ACCOUNT,
            "CONT_NAME": CONTAINER, "MOUNT_PT": MOUNT_PT}
missing = [name for name, value in required.items() if not value]
if missing:
    raise ValueError(f"Missing required settings in .env: {', '.join(missing)}")

# Mount Azure Blob container to DBFS using the SAS token
SOURCE    = f"wasbs://{CONTAINER}@{ACCOUNT}.blob.core.windows.net"
CFGKEY    = f"fs.azure.sas.{CONTAINER}.{ACCOUNT}.blob.core.windows.net"

# Current mounts as {mountPoint: source}
mounted = {m.mountPoint: m.source for m in dbutils.fs.mounts()}

if MOUNT_PT in mounted:
    # Already mounted: remount only if it points at a different source
    if mounted[MOUNT_PT] != SOURCE:
        dbutils.fs.unmount(MOUNT_PT)
        dbutils.fs.mount(source=SOURCE, mount_point=MOUNT_PT, extra_configs={CFGKEY: SAS_TOKEN})
else:
    # Not mounted yet; any mount error (bad container, expired SAS) propagates
    dbutils.fs.mount(source=SOURCE, mount_point=MOUNT_PT, extra_configs={CFGKEY: SAS_TOKEN})


---
## Load Raw Dataset

### Purpose
Read the raw `medquad.csv` file from Azure Blob using PySpark, preserving original schema and handling multiline/quoted fields.

### Insights:
- Ingests 16,412 raw medical FAQ records efficiently, forming the foundation for FAQ generation.
- Handles large datasets scalably, supporting healthcare data pipelines.

### Technical Details
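- Options: `header=true`, `multiLine=true` (quoted fields may span several lines), and `quote="\""` with `escape="\""` (a doubled `""` inside a quoted field is read as a literal quote).
- Path: `/dbfs<MOUNT_PT>/medquad.csv` (the mounted Azure Blob container).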
- Output: Spark DataFrame with columns `question`, `answer`, `source`, `focus_area`.
- Row count: 16,412 (verified with `df.count()`).
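Why `multiLine` and the quote/escape options matter: answers can span several lines and contain double quotes, so splitting on newlines would break records apart. The sketch below uses Python's standard `csv` module (not Spark) on an invented sample row to show the parsing rule these options target; `RAW` and `parse_records` are illustrative names, and the sample is made up rather than taken from `medquad.csv`.

```python
import csv
import io

# Invented sample in the same column layout as medquad.csv: the answer spans
# two physical lines and contains an escaped ("doubled") quote.
RAW = (
    "question,answer,source,focus_area\n"
    '"What is (are) X ?","Line one.\nLine two with ""quoted"" text.",SomeSource,X\n'
)


def parse_records(raw: str) -> list[dict[str, str]]:
    """Parse CSV text where quoted fields may contain newlines and doubled quotes."""
    # csv's defaults (quotechar='"', doublequote=True) play roughly the role of
    # the quote="\"" / escape="\"" options passed to spark.read in this notebook.
    return list(csv.DictReader(io.StringIO(raw, newline="")))


if __name__ == "__main__":
    physical_lines = RAW.splitlines()
    records = parse_records(RAW)
    print(len(physical_lines) - 1, "physical data lines")
    print(len(records), "logical record(s)")
    print(records[0]["answer"])
```

Two physical data lines collapse into a single record, which is the behavior Spark's `multiLine=true` enables for the real file.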

In [12]:
# Initialize SparkSession
spark = SparkSession.builder.appName("LoadMedQuAD").getOrCreate()
data_path = f"/dbfs{os.getenv('MOUNT_PT')}/medquad.csv"

# Answers can span lines and contain quotes escaped as "" inside quoted fields
df = (
    spark.read.option("header", "true")
    .option("multiLine", "true")
    .option("escape", "\"")
    .option("quote", "\"")
    .csv(data_path)
)
print(f"Reloaded row count: {df.count()}")
print("First 5 rows:")
display(df.limit(5))
df.show(5, truncate=60)

Reloaded row count: 16412
First 5 rows:


DataFrame[question: string, answer: string, source: string, focus_area: string]

+--------------------------------------+------------------------------------------------------------+---------------+----------+
|                              question|                                                      answer|         source|focus_area|
+--------------------------------------+------------------------------------------------------------+---------------+----------+
|              What is (are) Glaucoma ?|Glaucoma is a group of diseases that can damage the eye's...|NIHSeniorHealth|  Glaucoma|
|                What causes Glaucoma ?|Nearly 2.7 million people have glaucoma, a leading cause ...|NIHSeniorHealth|  Glaucoma|
|   What are the symptoms of Glaucoma ?|Symptoms of Glaucoma  Glaucoma can develop in one or both...|NIHSeniorHealth|  Glaucoma|
|What are the treatments for Glaucoma ?|Although open-angle glaucoma cannot be cured, it can usua...|NIHSeniorHealth|  Glaucoma|
|              What is (are) Glaucoma ?|Glaucoma is a group of diseases that can damage the eye's

In [None]:
# Display the schema to understand column names and types
df.printSchema()

root
 |-- question: string (nullable = true)
 |-- answer: string (nullable = true)
 |-- source: string (nullable = true)
 |-- focus_area: string (nullable = true)



---
## Handling Duplicates, missing values and text transformation

### **Purpose**  
Clean the raw dataset to prepare it for downstream processing such as lemmatization and model training: drop exact duplicate rows, drop rows with a null `answer`, and normalize questions (lowercase, remove special characters).

### **What to check**  
* Null distribution per column  
* Sample records for visual inspection  
* Duplicate row counts across the key fields (`question`, `answer`, `source`, `focus_area`)

### **Why?**  
- Standardizes text data, improving model performance and FAQ relevance in telehealth applications.
- Reduces noise (e.g., punctuation), enabling better tokenization and similarity scoring.

### **Technical Details**  
- Uses `lower` and `regexp_replace` to clean `question` column (alphanumeric and spaces only).
- Retains `answer`, `source`, `focus_area` unchanged.
- Output: `transformed_df` with 16,359 rows (16,412 raw, minus 48 exact duplicates and 5 rows with a null `answer`).
- Schema: `transform_question` (string), `answer` (string), `source` (string), `focus_area` (string).
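A plain-Python sketch of the same question normalization, useful for unit-testing the rule outside Spark. It assumes Python's `re` treats this ASCII character class the same way as the Java regex engine behind `regexp_replace`, which holds for the questions shown in this notebook (one known difference: Python's `\s` also matches some Unicode whitespace that Java's default `\s` does not). `normalize_question` is a hypothetical helper, not part of the pipeline.

```python
import re
from typing import Optional

# Same character class as the Spark expression: drop anything that is not
# an ASCII letter, a digit, or whitespace.
_NON_ALNUM_SPACE = re.compile(r"[^a-zA-Z0-9\s]")


def normalize_question(question: Optional[str]) -> Optional[str]:
    """Mirror lower(regexp_replace(question, '[^a-zA-Z0-9\\s]', '')).

    Like the Spark functions, a null (None) input stays null.
    """
    if question is None:
        return None
    return _NON_ALNUM_SPACE.sub("", question).lower()


if __name__ == "__main__":
    for q in ["What is (are) Glaucoma ?",
              "What are the symptoms of Glaucoma ?",
              None]:
        print(repr(normalize_question(q)))
```

Removing `?` leaves a trailing space (compare `what is are childbirth problems ` in the output below); wrapping the expression in `trim` would remove it if downstream tokenization is sensitive to that.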


In [7]:
# Remove exact duplicate rows (identical question, answer, source, and focus_area)
df_no_dup = df.dropDuplicates(["question", "answer", "source", "focus_area"])
print(f"Removed duplicates. New row count: {df_no_dup.count()}")

Removed duplicates. New row count: 16364
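Because the key list covers every column, `dropDuplicates([...])` here behaves like `df.distinct()`: only rows identical in all four fields collapse. The raw preview's first and fifth rows start with the same question and answer text, which is the kind of repetition this step targets. A plain-Python sketch of the rule follows; `drop_exact_duplicates` and `SAMPLE_ROWS` are hypothetical, the rows are abbreviated or invented (for example `OtherSource`), and unlike this sketch Spark does not guarantee which copy survives, which does not matter for exact duplicates.

```python
from typing import Iterable, Optional

# (question, answer, source, focus_area)
Row = tuple[Optional[str], Optional[str], Optional[str], Optional[str]]


def drop_exact_duplicates(rows: Iterable[Row]) -> list[Row]:
    """Keep the first occurrence of each distinct (question, answer, source, focus_area) tuple."""
    seen: set[Row] = set()
    kept: list[Row] = []
    for row in rows:
        # None == None in Python, so null fields compare equal, as in Spark's distinct
        if row not in seen:
            seen.add(row)
            kept.append(row)
    return kept


SAMPLE_ROWS: list[Row] = [
    ("What is (are) Glaucoma ?", "Glaucoma is ...", "NIHSeniorHealth", "Glaucoma"),
    ("What causes Glaucoma ?", "Nearly 2.7 million ...", "NIHSeniorHealth", "Glaucoma"),
    ("What is (are) Glaucoma ?", "Glaucoma is ...", "NIHSeniorHealth", "Glaucoma"),
    # Same question and answer but a different source: not an exact duplicate
    ("What is (are) Glaucoma ?", "Glaucoma is ...", "OtherSource", "Glaucoma"),
]

if __name__ == "__main__":
    deduped = drop_exact_duplicates(SAMPLE_ROWS)
    print(len(SAMPLE_ROWS) - len(deduped), "duplicate(s) removed")
```

Applied to this dataset, the notebook's counts imply 16,412 - 16,364 = 48 exact duplicates removed.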


In [8]:
row_count_bef = df_no_dup.count()
print(f"-- After removing duplicates: {row_count_bef}")
null_count_bef = df_no_dup.filter(col("answer").isNull()).count()
print(f"Rows with null 'answers' after removing duplicates: {null_count_bef}\n")

# Removing NULL 'answers'
df_cleaned = df_no_dup.filter(df_no_dup.answer.isNotNull())
print("Cleaning NULL values in 'answers'...\n")

null_count = df_cleaned.filter(col("answer").isNull()).count()
print(f"Rows with null 'answers' after cleaning: {null_count}")
row_count = df_cleaned.count()
print(f"--Remaining rows in the dataset: {row_count}")

-- After removing duplicates: 16364
Rows with null 'answers' after removing duplicates: 5

Cleaning NULL values in 'answers'...

Rows with null 'answers' after cleaning: 0
--Remaining rows in the dataset: 16359
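The checklist above asks for the null distribution per column, while this cell only checks `answer`. A plain-Python sketch of the per-column tally follows; `null_counts` and `SAMPLE` are hypothetical names and the rows are invented.

```python
from typing import Mapping, Optional, Sequence

COLUMNS = ("question", "answer", "source", "focus_area")


def null_counts(rows: Sequence[Mapping[str, Optional[str]]],
                columns: Sequence[str] = COLUMNS) -> dict[str, int]:
    """Count rows whose value is missing (None or absent) in each column."""
    return {c: sum(1 for r in rows if r.get(c) is None) for c in columns}


SAMPLE = [
    {"question": "What is X ?", "answer": "X is ...", "source": "S", "focus_area": "X"},
    {"question": "What causes X ?", "answer": None, "source": "S", "focus_area": "X"},
    {"question": "Who is at risk for X ?", "answer": None, "source": "S", "focus_area": None},
]

if __name__ == "__main__":
    print(null_counts(SAMPLE))
    # Dropping rows with a null answer, as the notebook does
    cleaned = [r for r in SAMPLE if r["answer"] is not None]
    print(len(SAMPLE), "->", len(cleaned))
```

In PySpark the same tally can be computed in one pass with `df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])`, using the `F` alias imported in the first cell. For this run, the notebook's own counts give 16,364 - 16,359 = 5 rows removed for a null `answer`.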


In [13]:
# display(df_cleaned.limit(5))
df_cleaned.show(5, truncate=35)

+-----------------------------------+-----------------------------------+-----------------+-----------------------------------+
|                           question|                             answer|           source|                         focus_area|
+-----------------------------------+-----------------------------------+-----------------+-----------------------------------+
|What are the symptoms of Periphe...|People who have P.A.D. may have ...|  NIHSeniorHealth|Peripheral Arterial Disease (P.A...|
|Who is at risk for Adult Acute M...|Smoking, previous chemotherapy t...|        CancerGov|       Adult Acute Myeloid Leukemia|
|What are the treatments for Myel...|Because myelodysplastic /myelopr...|        CancerGov|Myelodysplastic/ Myeloproliferat...|
|what research (or clinical trial...|New types of treatment are being...|        CancerGov|                        Skin Cancer|
|What is (are) Childbirth Problems ?|While childbirth usually goes we...|MPlusHealthTopics|             

In [14]:
# Transform text: lowercase and remove special characters

transformed_df = df_cleaned.select(
    lower(regexp_replace(col("question"), "[^a-zA-Z0-9\\s]", "")).alias("transform_question"),
    # Retaining 'answer','source' and 'focus_area' columns
    col("answer"),
    col("source"),
    col("focus_area")
)

# display(transformed_df.limit(5))
transformed_df.show(5, truncate=35)


+-----------------------------------+-----------------------------------+-----------------+-----------------------------------+
|                 transform_question|                             answer|           source|                         focus_area|
+-----------------------------------+-----------------------------------+-----------------+-----------------------------------+
|what are the symptoms of periphe...|People who have P.A.D. may have ...|  NIHSeniorHealth|Peripheral Arterial Disease (P.A...|
|who is at risk for adult acute m...|Smoking, previous chemotherapy t...|        CancerGov|       Adult Acute Myeloid Leukemia|
|what are the treatments for myel...|Because myelodysplastic /myelopr...|        CancerGov|Myelodysplastic/ Myeloproliferat...|
|what research or clinical trials...|New types of treatment are being...|        CancerGov|                        Skin Cancer|
|   what is are childbirth problems |While childbirth usually goes we...|MPlusHealthTopics|             

---
## Helper Functions for Persistence

### Purpose
Defines utility functions to ensure reliable Parquet saving, including mount checks and overwrite logic, for reproducible data pipelines.

- Ensures data persistence is idempotent and fault-tolerant, supporting iterative development in healthcare data workflows.
- Uses Parquet's columnar compression and, by default, `coalesce(1)` to write a single part file, avoiding many small files for a dataset of this size.

### Technical Details
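- `ensure_mount`: mounts Azure Blob if it is not mounted yet, and remounts if the mount point targets a different source.
- `path_exists`/`is_valid_parquet`: check that the output path exists and can be read as Parquet.
- `ensure_parquet`: writes Parquet only when needed and reports the outcome (new, recovered, skipped, or overwritten based on row count).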
- Output: `transformed_medquad.parquet` in Blob container.
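The branching in `ensure_parquet` can be isolated as a pure function, which makes the idempotency rules testable without touching storage. `decide_write_action` is a hypothetical helper; its inputs stand in for `path_exists`, `is_valid_parquet`, and the two `count()` calls. Note that a matching row count is only a cheap proxy for "unchanged": a cleaning change that alters text but not the number of rows would be skipped.

```python
from typing import Optional


def decide_write_action(exists: bool, readable: bool,
                        new_rows: int, existing_rows: Optional[int]) -> str:
    """Return the status string ensure_parquet would report for a given state."""
    if not exists:
        return "written (new)"
    if not readable:
        # ensure_parquet deletes the unreadable output before rewriting it
        return "written (recovered)"
    if existing_rows == new_rows:
        return "skipped (same row count)"
    return "written (overwrote)"


if __name__ == "__main__":
    print(decide_write_action(False, False, 16359, None))
    print(decide_write_action(True, True, 16359, 16359))
    print(decide_write_action(True, True, 16359, 16364))
```

The notebook run below hit the first case (`write: written (new)`); rerunning the cell unchanged would take the "skipped" branch.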

In [None]:
# --- helpers
def ensure_mount(mount_pt=MOUNT_PT, source=SOURCE, cfgkey=CFGKEY, sas=SAS_TOKEN):
    """Mount once. If already mounted to the right source, do nothing."""
    mounted = {m.mountPoint: m.source for m in dbutils.fs.mounts()}
    if mount_pt not in mounted:
        dbutils.fs.mount(source=source, mount_point=mount_pt, extra_configs={cfgkey: sas})
        return "mounted"
    if mounted[mount_pt] != source:
        dbutils.fs.unmount(mount_pt)
        dbutils.fs.mount(source=source, mount_point=mount_pt, extra_configs={cfgkey: sas})
        return "remounted"
    return "ok"

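def path_exists(path):
    """Return True if dbutils can list `path` (file or directory)."""
    try:
        dbutils.fs.ls(path)
        return True
    except Exception:
        return False

def is_valid_parquet(path):
    """Return True if `path` can be read as Parquet and scanned for one row."""
    try:
        _ = spark.read.parquet(path).limit(1).count()
        return True
    except Exception:
        return False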

def ensure_parquet(df, out_path, coalesce_one=True):
    """
    Write df to out_path if needed:
      - If out_path missing -> write
      - If present but unreadable -> clean + write
      - If present and row count matches -> skip
      - Else overwrite
    """
    if not path_exists(out_path):
        (df.coalesce(1) if coalesce_one else df).write.mode("overwrite").parquet(out_path)
        return "written (new)"

    if not is_valid_parquet(out_path):
        dbutils.fs.rm(out_path, recurse=True)
        (df.coalesce(1) if coalesce_one else df).write.mode("overwrite").parquet(out_path)
        return "written (recovered)"

    cur = spark.read.parquet(out_path)
    if df.count() == cur.count():
        return "skipped (same row count)"
    # Row count differs: overwrite (the SAS token needs delete ('d') permission)
    (df.coalesce(1) if coalesce_one else df).write.mode("overwrite").parquet(out_path)
    return "written (overwrote)"


In [None]:
# --- use them with transformed df
status = ensure_mount()
print("mount:", status)

OUT_PATH = f"{MOUNT_PT}/transformed_medquad.parquet"
result = ensure_parquet(transformed_df, OUT_PATH, coalesce_one=True)
print("write:", result)

# Print the size in bytes of each data part file, skipping marker files such as _SUCCESS
for f in dbutils.fs.ls(OUT_PATH):
    if f.name.startswith("part-"):
        # print(f.name, f.size)
        print(f.size)

df_check = spark.read.parquet(OUT_PATH)
print("rows:", df_check.count())
df_check.show(3, truncate=35)


mount: ok
write: written (new)
9925038
rows: 16359
+-----------------------------------+-----------------------------------+---------------+-----------------------------------+
|                 transform_question|                             answer|         source|                         focus_area|
+-----------------------------------+-----------------------------------+---------------+-----------------------------------+
|what are the symptoms of periphe...|People who have P.A.D. may have ...|NIHSeniorHealth|Peripheral Arterial Disease (P.A...|
|who is at risk for adult acute m...|Smoking, previous chemotherapy t...|      CancerGov|       Adult Acute Myeloid Leukemia|
|what are the treatments for myel...|Because myelodysplastic /myelopr...|      CancerGov|Myelodysplastic/ Myeloproliferat...|
+-----------------------------------+-----------------------------------+---------------+-----------------------------------+
only showing top 3 rows


In [None]:
df_check.printSchema()

root
 |-- transform_question: string (nullable = true)
 |-- answer: string (nullable = true)
 |-- source: string (nullable = true)
 |-- focus_area: string (nullable = true)



---
## Conclusion

This notebook successfully loaded and cleaned the raw Medical FAQ dataset (`medquad.csv`, 16,412 rows) from Azure Blob Storage, applied basic transformations, and saved it as `transformed_medquad.parquet` (16,359 rows) for efficient downstream processing in the Healthcare FAQ Generator pipeline.

### Key Results
- **Row Count**: 16,359 after cleaning (from 16,412 raw: 48 exact duplicates and 5 rows with a null `answer` removed).
- **Schema**: `transform_question` (lowercased, cleaned), `answer`, `source`, `focus_area` (all strings).
- **Runtime**: ~1-2 minutes on Databricks Community Edition (CPU).
- **Business Impact**: Establishes a clean, scalable data foundation for multilingual FAQ generation, supporting telehealth applications with reliable medical knowledge.

### Next Steps
- **Preprocessing**: Apply lemmatization and tokenization in the next notebook (02_preprocess_medquad.ipynb).
- **Model Fine-Tuning**: Use the Parquet dataset to fine-tune `flan-t5-base` for FAQ generation.
- **Evaluation**: Build a RAG pipeline and evaluate it with TF-IDF similarity.
- **Portfolio Polish**: Document data lineage and schema in the GitHub README.

This step demonstrates PySpark ETL skills, Azure integration, and data engineering best practices for healthcare data pipelines.