In [1]:
# This notebook is used to create the PROVIDER table
# Based on the following documentation:
#   https://ohdsi.github.io/CommonDataModel/cdm53.html#provider
#   https://documentation-snds.health-data-hub.fr/omop/documentation_etl/provider.html#description

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, crc32

# Spark session: getOrCreate() reuses an already-active session if there is one
spark = (
    SparkSession.builder
    .appName("OMOP Provider Table Unified")
    .getOrCreate()
)

# Raw IR_SPE_V specialty reference. The first CSV line is the header; no schema
# inference is requested, so every column is read as a string.
IR_SPE_V_PATH = "../data/raw/ir_spe_v.csv"
ir_spe_v = (
    spark.read
    .option("header", True)
    .csv(IR_SPE_V_PATH)
)



In [2]:
# Preview the raw specialty reference (first 20 rows, long cell values truncated)
ir_spe_v.show(n=20, truncate=True)

+-----------+-------------------+
|pfs_spe_cod|              label|
+-----------+-------------------+
|          1|Médecin généraliste|
|          6|         Radiologue|
+-----------+-------------------+



In [3]:
# Rename the IR_SPE_V columns to their OMOP PROVIDER names:
#   pfs_spe_cod -> provider_source_value  (specialty code, read as string)
#   label       -> specialty_source_value (French specialty label)
# Columns are referenced by the exact lowercase names found in the CSV header,
# so this selection keeps working even if spark.sql.caseSensitive is enabled.
providers_df = ir_spe_v.select(
    col("pfs_spe_cod").alias("provider_source_value"),
    col("label").alias("specialty_source_value"),
)

providers_df.show()

+---------------------+----------------------+
|provider_source_value|specialty_source_value|
+---------------------+----------------------+
|                    1|   Médecin généraliste|
|                    6|            Radiologue|
+---------------------+----------------------+



In [4]:
# Map IR_SPE_V specialty labels to OMOP standard concepts of the Provider domain.
# Concept ids were looked up in Athena:
# https://athena.ohdsi.org/search-terms/terms?domain=Provider&standardConcept=Standard&page=1&pageSize=15&query=
# To support a new specialty, add an entry here instead of extending a when() chain.
# Matching is exact: labels must agree character-for-character (accents,
# case and surrounding whitespace included).
SPECIALTY_CONCEPT_IDS = {
    "Médecin généraliste": 38004446,
    "Radiologue": 45756825,
}
# OMOP convention: a source value without a standard mapping gets concept_id 0
# ("No matching concept") rather than NULL.
UNMAPPED_CONCEPT_ID = 0

# Build when(label_1, id_1).when(label_2, id_2)... from the dictionary.
specialty_expr = None
for source_label, concept_id in SPECIALTY_CONCEPT_IDS.items():
    is_match = col("specialty_source_value") == source_label
    specialty_expr = (
        when(is_match, concept_id)
        if specialty_expr is None
        else specialty_expr.when(is_match, concept_id)
    )

providers_df = providers_df.withColumn(
    "specialty_concept_id",
    specialty_expr.otherwise(UNMAPPED_CONCEPT_ID),
)

providers_df.show()

+---------------------+----------------------+--------------------+
|provider_source_value|specialty_source_value|specialty_concept_id|
+---------------------+----------------------+--------------------+
|                    1|   Médecin généraliste|            38004446|
|                    6|            Radiologue|            45756825|
+---------------------+----------------------+--------------------+



In [7]:
# Define the maximum value for a signed 32-bit integer
MAX_INT = 2**31 - 1  # 2,147,483,647

# Derive a deterministic provider_id from the source code with Spark's crc32:
# - cast the source value to string so hashing is consistent
# - crc32 returns an unsigned 32-bit checksum (0 .. 2**32 - 1) as a bigint
# - `% MAX_INT` folds it into 0 .. MAX_INT - 1 so it fits a signed 32-bit int
#   (note: 0 is a possible result)
# - cast to int to match the OMOP integer column type
# NOTE: this is a stable surrogate key, not pseudonymization: crc32 of a short
# code is trivially reversible by brute force. If other tables derive
# provider_id the same way, keep this formula unchanged so the keys still join.
providers_df = providers_df.withColumn(
    "provider_id",
    (crc32(col("provider_source_value").cast("string")) % MAX_INT).cast("int")
)

# provider_id is the table's primary key. Hash collisions, duplicate source
# codes and NULL source codes would all silently produce an invalid key, so
# fail fast here instead of writing a broken table.
n_rows = providers_df.count()
n_distinct_ids = providers_df.select("provider_id").distinct().count()
n_null_ids = providers_df.filter(col("provider_id").isNull()).count()
assert n_null_ids == 0, f"{n_null_ids} row(s) have a NULL provider_id"
assert n_distinct_ids == n_rows, (
    f"provider_id is not unique: {n_rows - n_distinct_ids} duplicate(s) "
    "(crc32 collision or duplicate provider_source_value)"
)

providers_df.show()

+---------------------+----------------------+--------------------+-----------+
|provider_source_value|specialty_source_value|specialty_concept_id|provider_id|
+---------------------+----------------------+--------------------+-----------+
|                    1|   Médecin généraliste|            38004446|   64810936|
|                    6|            Radiologue|            45756825|  498629140|
+---------------------+----------------------+--------------------+-----------+



In [8]:
# Check the column types before writing: provider_id and specialty_concept_id
# are expected to be integer, the *_source_value columns string.
providers_df.printSchema()

root
 |-- provider_source_value: string (nullable = true)
 |-- specialty_source_value: string (nullable = true)
 |-- specialty_concept_id: integer (nullable = true)
 |-- provider_id: integer (nullable = true)



In [None]:
# Write the PROVIDER table as snappy-compressed Parquet. coalesce(1) merges the
# rows into one partition so a single data file is produced; Spark writes the
# target path as a directory, which "overwrite" replaces on every run.
provider_output_path = "../data/processed/PROVIDER.parquet"
(
    providers_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("compression", "snappy")
    .parquet(provider_output_path)
)

# Release the Spark session; the session setup cell must be re-run before any
# further Spark work in this kernel.
spark.stop()