In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import os
import copy
import configparser

In [2]:
def collect_concept_pairs(spark, phenotype_concepts_input, phenotype_concepts_out, save_pair_concepts=True, header="true", delimiter="\t"):
    """Build every pair of concepts that share a phenotype definition.

    Args:
        spark: SparkSession used to read the input file.
        phenotype_concepts_input: path of the delimited phenotype concept file.
            The file must provide the columns phenotype_name, source_concept_id,
            standard_concept_id, source_name, source_vocabulary, source_domain,
            standard_name, standard_vocabulary and standard_domain.
        phenotype_concepts_out: location the paired concepts are written to
            (CSV with a header row, existing output is overwritten).
        save_pair_concepts: when True the paired concepts are written to
            ``phenotype_concepts_out``; when False nothing is written.
        header: value passed to the reader's "header" option.
        delimiter: value passed to the reader's "delimiter" option.

    Returns:
        A tuple ``(pair_concepts, phenotype_concepts)``:
        * pair_concepts - one row per ordered pair of rows belonging to the
          same phenotype whose standard or source concept ids differ; the
          columns of the two rows carry the suffixes ``_1`` and ``_2``.
        * phenotype_concepts - the distinct ``concept_id`` values taken from
          both the standard and the source concept id columns.
    """
    #Load the data into the spark dataframe
    phenotype_df = spark.read \
        .option("header", header) \
        .option("delimiter", delimiter) \
        .csv(phenotype_concepts_input)

    #Alias the dataframe twice so both sides of the self join can be
    #referenced unambiguously without copying the data through an RDD
    first = phenotype_df.alias("first")
    second = phenotype_df.alias("second")

    #Fields that are reported once per side of the pair
    paired_fields = ["source_concept_id",
                     "standard_concept_id",
                     "source_name",
                     "source_vocabulary",
                     "source_domain",
                     "standard_name",
                     "standard_vocabulary",
                     "standard_domain"]

    columns = [col("first.phenotype_name")] \
        + [col("first." + field).alias(field + "_1") for field in paired_fields] \
        + [col("second." + field).alias(field + "_2") for field in paired_fields]

    #Create all combinations of concept pairs within the same phenotype definitions.
    #Self join the phenotype dataset where rows are NOT the same
    pair_concepts = first \
        .join(second, col("first.phenotype_name") == col("second.phenotype_name")) \
        .where((col("first.standard_concept_id") != col("second.standard_concept_id"))
               | (col("first.source_concept_id") != col("second.source_concept_id"))) \
        .select(columns) \
        .orderBy("phenotype_name")

    #determine if we need to save the dataframe to the disk
    if save_pair_concepts:
        #Save the paired concepts to the location requested by the caller
        pair_concepts.write \
            .option("header", "true") \
            .format("csv") \
            .mode("overwrite") \
            .save(phenotype_concepts_out)

    #extract all phenotype related concepts (standard and source ids together)
    phenotype_concepts = phenotype_df.select(col("standard_concept_id").alias("concept_id")) \
        .union(phenotype_df.select(col("source_concept_id").alias("concept_id"))) \
        .distinct()

    return (pair_concepts, phenotype_concepts)

In [3]:
def load_cdm_tables(spark, property_ini_file_path):
    """Read the visit table and the five domain tables over JDBC.

    The connection settings come from the DEFAULT section of the ini file at
    ``property_ini_file_path``. Its ``base_url`` entry is used as the JDBC url
    and the whole section is handed to the reader as connection properties.

    Returns:
        A tuple of dataframes in the order (visit_occurrence,
        condition_occurrence, drug_exposure, procedure_occurrence,
        measurement, observation), each read from the ``dbo`` schema.
    """
    #Parse the properties
    parser = configparser.ConfigParser()
    parser.read(property_ini_file_path)
    jdbc_properties = parser.defaults()
    jdbc_url = jdbc_properties["base_url"]

    #Tables are read in exactly the order in which they are returned
    table_names = ("visit_occurrence",
                   "condition_occurrence",
                   "drug_exposure",
                   "procedure_occurrence",
                   "measurement",
                   "observation")

    return tuple(
        spark.read.jdbc(jdbc_url, "dbo." + table_name, properties=jdbc_properties)
        for table_name in table_names
    )

In [4]:
def join_domain_to_visit(domain_tables, visit_occurrence):
    """Restrict each domain table to records that belong to a known visit.

    Args:
        domain_tables: iterable of domain dataframes. Each one must have the
            columns person_id and visit_occurrence_id plus at least one column
            whose name contains "concept_id"; the first such column is taken
            as the domain concept column.
        visit_occurrence: dataframe with a visit_occurrence_id column that the
            domain tables are joined against.

    Returns:
        A list with one dataframe per input table, in input order, each with
        the columns person_id, visit_occurrence_id, standard_concept_id and
        domain (the concept column name with "_concept_id" removed).

    Raises:
        IndexError: if a table has no column containing "concept_id".
    """
    joined_domain_tables = []

    for domain_table in domain_tables:
        #extract the domain concept_id from the table fields. E.g. condition_concept_id from condition_occurrence
        concept_id_field = [f for f in domain_table.schema.fieldNames() if "concept_id" in f][0]
        #extract the name of the domain, e.g. "condition" from condition_concept_id
        table_domain_field = concept_id_field.replace("_concept_id", "")
        #limit the domain records to those which have a visit_occurrence_id.
        #Join against the visit_occurrence argument rather than a global variable
        joined_domain_table = domain_table.join(
            visit_occurrence,
            domain_table["visit_occurrence_id"] == visit_occurrence["visit_occurrence_id"])
        #standardize the output columns
        joined_domain_tables.append(
            joined_domain_table.select(
                domain_table["person_id"],
                domain_table["visit_occurrence_id"],
                domain_table[concept_id_field].alias("standard_concept_id"),
                lit(table_domain_field).alias("domain"))
        )

    return joined_domain_tables

In [5]:
def create_patient_visit_concept(condition,
                                 drug,
                                 procedure,
                                 measurement,
                                 observation,
                                 patient_visit_concept_output,
                                 concept_occurrence_output):
    """Combine the domain dataframes and count how often each concept occurs.

    The five domain dataframes are unioned in the order condition, drug,
    procedure, measurement, observation, de-duplicated, ordered by person_id
    and visit_occurrence_id, and rows whose standard_concept_id is 0 are
    dropped. The result is written as CSV (with header, overwriting) to
    ``patient_visit_concept_output``. The number of rows per
    standard_concept_id is then written the same way to
    ``concept_occurrence_output``.

    Returns:
        A tuple ``(patient_visit_concept, concept_occurrence_matrix)`` holding
        the combined dataframe and the per-concept row counts.
    """
    #Stack person_id, visit_occurrence_id, and concept_id from all domains
    stacked = condition
    for domain_frame in (drug, procedure, measurement, observation):
        stacked = stacked.union(domain_frame)

    deduplicated = stacked.distinct()
    ordered = deduplicated.orderBy("person_id", "visit_occurrence_id")
    patient_visit_concept = ordered.where(col("standard_concept_id") != 0)

    #Save the patient visit concept data
    visit_writer = patient_visit_concept.write.option("header", "true")
    visit_writer.format("csv").mode("overwrite").save(patient_visit_concept_output)

    #Count the rows of every concept
    concept_occurrence_matrix = patient_visit_concept.groupBy("standard_concept_id").count()

    #Save the concept occurrence counts
    occurrence_writer = concept_occurrence_matrix.write.option("header", "true")
    occurrence_writer.format("csv").mode("overwrite").save(concept_occurrence_output)

    return (patient_visit_concept, concept_occurrence_matrix)

In [6]:
def create_cooccurrence_matrix(patient_visit_concept, concept_occurrence_matrix, cooccurrence_matrix_output, phenotype_concepts=None):
    """Count how often two different concepts occur in the same visit.

    Args:
        patient_visit_concept: dataframe with at least the columns person_id,
            visit_occurrence_id and standard_concept_id.
        concept_occurrence_matrix: dataframe with the columns
            standard_concept_id and count (the per-concept totals).
        cooccurrence_matrix_output: location the result is written to (CSV
            with a header row, existing output is overwritten).
        phenotype_concepts: optional dataframe with a concept_id column. When
            given, only records whose standard_concept_id appears in it take
            part in the pairing.

    Returns:
        A dataframe with the columns standard_concept_id_1,
        standard_concept_id_2, count, standard_concept_id_1_count,
        standard_concept_id_2_count and normalized_count, where
        normalized_count = count / (standard_concept_id_1_count +
        standard_concept_id_2_count). Pairs are only filtered on the two ids
        being different, so both (a, b) and (b, a) are present.
    """
    #If phenotype_concepts is specified, the records that contain phenotype concepts are kept.
    #A semi join only filters, so repeated concept ids cannot multiply rows.
    if phenotype_concepts is not None:
        patient_visit_concept = patient_visit_concept.join(
            phenotype_concepts,
            patient_visit_concept["standard_concept_id"] == phenotype_concepts["concept_id"],
            "left_semi")

    #Alias the dataframe twice for the self join instead of copying it through an RDD
    left = patient_visit_concept.alias("left")
    right = patient_visit_concept.alias("right")

    same_visit = (col("left.person_id") == col("right.person_id")) \
        & (col("left.visit_occurrence_id") == col("right.visit_occurrence_id"))
    different_concept = col("left.standard_concept_id") != col("right.standard_concept_id")

    #Pair up the concepts of every visit and count each ordered pair
    pair_counts = left \
        .join(right, same_visit) \
        .where(different_concept) \
        .select(col("left.standard_concept_id").alias("standard_concept_id_1"),
                col("right.standard_concept_id").alias("standard_concept_id_2")) \
        .groupBy("standard_concept_id_1", "standard_concept_id_2").count()

    #Rename the per-concept totals once per side so that both joins can be
    #expressed by column name without ambiguous references
    occurrences_1 = concept_occurrence_matrix.select(
        col("standard_concept_id").alias("standard_concept_id_1"),
        col("count").alias("standard_concept_id_1_count"))
    occurrences_2 = concept_occurrence_matrix.select(
        col("standard_concept_id").alias("standard_concept_id_2"),
        col("count").alias("standard_concept_id_2_count"))

    cooccurrence_matrix = pair_counts \
        .join(occurrences_1, "standard_concept_id_1") \
        .join(occurrences_2, "standard_concept_id_2") \
        .select("standard_concept_id_1",
                "standard_concept_id_2",
                "count",
                "standard_concept_id_1_count",
                "standard_concept_id_2_count") \
        .withColumn("normalized_count",
                    col("count") / (col("standard_concept_id_1_count") + col("standard_concept_id_2_count")))

    #Save the cooccurrence matrix
    cooccurrence_matrix.write.option("header", "true") \
        .format("csv").mode("overwrite").save(cooccurrence_matrix_output)

    return cooccurrence_matrix

In [7]:
if __name__ == "__main__":

    #Start (or reuse) the Spark session and keep the log output quiet
    spark = SparkSession.builder.appName("Phenotype Cooccurrence").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    #Pair up the concepts of each phenotype definition
    pair_concepts, phenotype_concepts = collect_concept_pairs(
        spark,
        "phenotype_WG_concept_ids_from_eMERGE.tsv",
        "phenotype_paired_concepts")

    #Load the visit table (v) and the domain tables (c, d, p, m, o)
    v, c, d, p, m, o = load_cdm_tables(spark, "omop_database_properties.ini")

    #Keep only the domain records that are attached to a visit
    domains_with_visits = join_domain_to_visit([c, d, p, m, o], v)
    c_filtered, d_filtered, p_filtered, m_filtered, o_filtered = domains_with_visits

    #Combine all domains and count the occurrences of each concept
    patient_visit_concept, concept_occurrence = create_patient_visit_concept(
        c_filtered,
        d_filtered,
        p_filtered,
        m_filtered,
        o_filtered,
        "patient_visit_concept",
        "concept_occurrence")

    #The matrix is built over all concepts. To restrict it to the phenotype
    #concepts, pass phenotype_concepts as the fourth argument.
    cooccurrence_matrix_full = create_cooccurrence_matrix(
        patient_visit_concept,
        concept_occurrence,
        "cooccurrence_matrix")

In [7]:
# Reads the six tables again, this time using the ".ini.bak" properties file,
# and rebinds the module-level names v, c, d, p, m, o.
# NOTE(review): this cell repeats execution count 7 and looks like leftover
# scratch work - confirm whether it is still needed.
v, c, d, p, m, o = load_cdm_tables(spark, "omop_database_properties.ini.bak")