In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when
from pyspark.sql.types import IntegerType

# Attach to the Spark session that is already running, or create one if none exists.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

# abfss:// locations used below: the curated RWE folder that is read,
# and the root folder under which the ML-ready output is written.
gold_curated_rwe = "abfss://rwedatalakestorage@datalakerwe.dfs.core.windows.net/gold/curated/rwe/"
gold_ml_output = "abfss://rwedatalakestorage@datalakerwe.dfs.core.windows.net/gold/ml_ready/"


StatementMeta(openfda, 3, 7, Finished, Available, Finished)

In [32]:
# Load every Parquet file under the curated RWE folder, descending into subfolders.
curated_glob = gold_curated_rwe + "*"
recursive_reader = spark.read.option("recursiveFileLookup", "true")
df = recursive_reader.parquet(curated_glob)

# count() is an action, so this line runs a Spark job before printing.
row_total = df.count()
print(f"✅ Read {row_total} rows from curated RWE Gold")


StatementMeta(openfda, 3, 8, Finished, Available, Finished)

✅ Read 224 rows from curated RWE Gold


In [33]:
# Derive a 0/1 label from 'serious': exactly "Yes" -> 1, everything else -> 0.
# A null 'serious' does not satisfy the condition, so it also becomes 0.
is_serious = col("serious") == "Yes"
df = df.withColumn("serious_label", when(is_serious, 1).otherwise(0))


StatementMeta(openfda, 3, 9, Finished, Available, Finished)

In [35]:
from pyspark.sql.functions import count, sum as spark_sum, col
from pyspark.sql.types import IntegerType

# One row per (drug_name, reaction) pair with total and serious report counts.
total_reports = count("*").alias("ae_count")
serious_reports = spark_sum(col("serious_label").cast(IntegerType())).alias("serious_count")
pair_counts = df.groupBy("drug_name", "reaction").agg(total_reports, serious_reports)

# Fraction of each pair's reports that carry serious_label == 1.
agg_df = pair_counts.withColumn("serious_ratio", col("serious_count") / col("ae_count"))

# Preview only: no ordering is applied, so which 10 rows are shown is not guaranteed.
agg_df.show(10)


StatementMeta(openfda, 3, 11, Finished, Available, Finished)

+--------------------+--------------------+--------+-------------+-------------+
|           drug_name|            reaction|ae_count|serious_count|serious_ratio|
+--------------------+--------------------+--------+-------------+-------------+
|IMC-3G3 (RH ANTI-...|             Anaemia|       1|            0|          0.0|
|        SIMVASTATIN.|Urinary tract inf...|       1|            0|          0.0|
|               JODID|           Gastritis|       1|            0|          0.0|
|        SIMVASTATIN.|           Gastritis|       1|            0|          0.0|
|            MUCOFALK|General physical ...|       1|            0|          0.0|
|           METOHEXAL|Helicobacter test...|       1|            0|          0.0|
|IMC-3G3 (RH ANTI-...|    Thrombocytopenia|       1|            0|          0.0|
|         FLUDARABINE|Transplant rejection|       1|            0|          0.0|
|RO 5479599 (ANTI-...|       Herpes zoster|       2|            0|          0.0|
|          PERTUZUMAB|     I

In [36]:
# Persist the aggregate under a new folder named after Spark's current timestamp.
now_row = spark.sql("SELECT current_timestamp()").collect()[0]
timestamp = now_row[0].strftime("%Y-%m-%d-%H-%M-%S")
ml_output_path = f"{gold_ml_output}{timestamp}/"

# "overwrite" replaces anything already present at this exact path.
parquet_writer = agg_df.write.mode("overwrite")
parquet_writer.parquet(ml_output_path)
print(f"✅ Wrote ML-ready dataset to {ml_output_path}")


StatementMeta(openfda, 3, 12, Finished, Available, Finished)

✅ Wrote ML-ready dataset to abfss://rwedatalakestorage@datalakerwe.dfs.core.windows.net/gold/ml_ready/2025-08-28-11-50-39/


# Release Spark pool

In [None]:
# Stop the current notebook session as the final step of the run.
# NOTE(review): mssparkutils is not imported in the visible cells; presumably the
# notebook runtime provides it as a built-in — confirm before running elsewhere.
mssparkutils.session.stop()