_Author_ = "Sevda Molani"

_copyright_ = "2022 Sevda Molani"

_License_ = "Institute for Systems Biology"

_Version_ = "1.0"

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import numpy as np

In [0]:
# Base cohort: one row per record in sm_positive_delta_results_feb, keeping the
# identifying keys plus a composite patient_id built as instance followed by pat_id.
df1 = spark.sql("""SELECT 
CONCAT(instance, pat_id) as patient_id,
pat_id,
instance,
pat_enc_csn_id
FROM 
rdp_phi_sandbox.sm_positive_delta_results_feb""")

In [0]:
# Function to create temporary views for max WHO scores
def create_max_who_view(interval):
    """Register the temp view ``tmp_<interval>`` holding the maximum WHO score.

    For every (pat_id, instance, pat_enc_csn_id, admissiondatetime,
    onehourbase) row of sm_positive_delta_results_feb, the view carries
    ``max_who_<interval>``: the largest who_score among that patient's records
    dated between onehourbase and onehourbase + ``interval`` days (inclusive).

    ``interval`` is interpolated directly into the SQL text and into the
    column/view names. Nothing is returned; the view is the only output.
    """
    view_name = f'tmp_{interval}'
    sql_text = f"""
    SELECT pat_id, instance, pat_enc_csn_id, admissiondatetime, onehourbase, max_who_{interval}
    FROM (
        SELECT sm_positive_delta_results_feb.pat_id, sm_positive_delta_results_feb.instance,
               sm_positive_delta_results_feb.pat_enc_csn_id, admissiondatetime, onehourbase,
               MAX(rdp_phi_sandbox.sm_who_scores_feb.who_score) as max_who_{interval}
        FROM rdp_phi_sandbox.sm_who_scores_feb
        INNER JOIN rdp_phi_sandbox.sm_positive_delta_results_feb 
            ON rdp_phi_sandbox.sm_who_scores_feb.patient_id = sm_positive_delta_results_feb.patient_id
        WHERE TIMESTAMP(rdp_phi_sandbox.sm_who_scores_feb.record_dt) <= onehourbase + INTERVAL '{interval}' day
          AND TIMESTAMP(rdp_phi_sandbox.sm_who_scores_feb.record_dt) >= onehourbase
        GROUP BY sm_positive_delta_results_feb.pat_id, sm_positive_delta_results_feb.instance, 
                 sm_positive_delta_results_feb.pat_enc_csn_id, admissiondatetime, onehourbase
    )
    """
    # De-duplicate before publishing the result under its per-interval name.
    spark.sql(sql_text).dropDuplicates().createOrReplaceTempView(view_name)

# Build one temp view (tmp_1 ... tmp_56) per follow-up window, in days.
for interval in (1, 7, 14, 28, 56):
    create_max_who_view(interval)

# Key columns shared by every tmp_<interval> view.
join_expr = ['pat_id', 'pat_enc_csn_id', 'instance', 'admissiondatetime', 'onehourbase']

# Start from the widest window and fold in the narrower ones. The joins are
# inner, so a row survives only if it is present in every window's view.
d4 = spark.table('tmp_56')
for interval in (28, 14, 7, 1):
    d4 = d4.join(spark.table(f'tmp_{interval}'), on=join_expr, how="inner")

# Rebuild the composite patient_id (instance followed by pat_id), de-duplicate,
# and expose the result to SQL as the view 'd4'.
d4 = d4.withColumn('patient_id', concat(col('instance'), col('pat_id')))
d4 = d4.dropDuplicates()
d4.createOrReplaceTempView('d4')

# Create the first WHO date view.
# For each d4 row, first_who_date is the earliest record_dt of a WHO record with
# a non-null who_score dated between 24 hours before and 1 hour after onehourbase.
# The join is inner, so d4 rows without such a record are not carried into tmp2.
tmp2 = spark.sql("""
    SELECT patient_id, pat_id, instance, pat_enc_csn_id, admissiondatetime, onehourbase, 
           max_who_1, max_who_7, max_who_14, max_who_28, max_who_56, first_who_date
    FROM (
        SELECT d4.patient_id, d4.pat_id, d4.pat_enc_csn_id, d4.instance, admissiondatetime, onehourbase,
               max_who_1, max_who_7, max_who_14, max_who_28, max_who_56,
               MIN(TIMESTAMP(rdp_phi_sandbox.sm_who_scores_feb.record_dt)) as first_who_date
        FROM rdp_phi_sandbox.sm_who_scores_feb
        INNER JOIN d4 ON rdp_phi_sandbox.sm_who_scores_feb.patient_id = d4.patient_id
        WHERE TIMESTAMP(rdp_phi_sandbox.sm_who_scores_feb.record_dt) >= onehourbase - INTERVAL '24' hour
          AND TIMESTAMP(rdp_phi_sandbox.sm_who_scores_feb.record_dt) <= onehourbase + INTERVAL '1' hour
          AND rdp_phi_sandbox.sm_who_scores_feb.who_score IS NOT NULL
        GROUP BY d4.patient_id, d4.pat_id, d4.pat_enc_csn_id, d4.instance, admissiondatetime, onehourbase, 
                 max_who_1, max_who_7, max_who_14, max_who_28, max_who_56
    )
""").dropDuplicates()
# Registered as a view because the next query joins against it by name.
tmp2.createOrReplaceTempView('tmp2')

# Create the first WHO view.
# first_who is the smallest who_score among the patient's WHO records whose
# record_dt equals first_who_date (MIN resolves several records at that timestamp).
# patient_id is used for the join only and is not part of the output columns.
tmp3 = spark.sql("""
    SELECT pat_id, instance, pat_enc_csn_id, admissiondatetime, onehourbase, 
           max_who_1, max_who_7, max_who_14, max_who_28, max_who_56, first_who_date, first_who
    FROM (
        SELECT tmp2.pat_id, tmp2.instance, tmp2.pat_enc_csn_id, admissiondatetime, onehourbase,
               max_who_1, max_who_7, max_who_14, max_who_28, max_who_56, first_who_date,
               MIN(rdp_phi_sandbox.sm_who_scores_feb.who_score) as first_who
        FROM rdp_phi_sandbox.sm_who_scores_feb
        INNER JOIN tmp2 ON rdp_phi_sandbox.sm_who_scores_feb.patient_id = tmp2.patient_id
        WHERE TIMESTAMP(rdp_phi_sandbox.sm_who_scores_feb.record_dt) = first_who_date
        GROUP BY tmp2.pat_id, tmp2.instance, tmp2.pat_enc_csn_id, admissiondatetime, onehourbase,
                 max_who_1, max_who_7, max_who_14, max_who_28, max_who_56, first_who_date
    )
""").dropDuplicates()

# Attach the WHO-score columns to the base cohort. The join is a left join, so
# cohort rows without a match in tmp3 are kept with NULL score columns.
df = df1.join(tmp3, on=['pat_id', 'pat_enc_csn_id', 'instance'], how='left')
df = df.dropDuplicates()

In [0]:
def get_all_descendant_snomed_codes(code):
  """Look up the descendants of a SNOMED concept in the OMOP concept tables.

  Args:
    code: SNOMED concept code, as a digit string or an int.

  Returns:
    A Spark DataFrame with the columns ancestor_snomed_code,
    ancestor_concept_name, descendant_snomed_code and descendant_concept_name,
    ordered by min_levels_of_separation and then descendant concept name.

  Raises:
    ValueError: if `code` is not made up of digits only.
  """
  # The value is spliced into the SQL text, so accept digits only. This keeps
  # arbitrary SQL out of the query and allows the value to be quoted below.
  snomed_code = str(code).strip()
  if not snomed_code.isdigit():
    raise ValueError("SNOMED code must contain digits only, got {!r}".format(code))

  # The code is quoted so concept_code is compared as a string, not through an
  # implicit numeric cast of the whole column.
  descendant_snomed_codes_df = spark.sql(
  """
  SELECT
    oc1.concept_code as ancestor_snomed_code,
    oc1.concept_name as ancestor_concept_name,
    oc2.concept_code as descendant_snomed_code,
    oc2.concept_name as descendant_concept_name
  FROM (
    SELECT * FROM rdp_phi_sandbox.omop_concept_2020_08_05
    WHERE
      concept_code = '{snomed_code}' AND
      vocabulary_id = 'SNOMED') as oc1
  JOIN rdp_phi_sandbox.omop_concept_ancestor_2020_08_05 as oca
  ON oc1.concept_id = oca.ancestor_concept_id
  JOIN rdp_phi_sandbox.omop_concept_2020_08_05 as oc2
  ON oca.descendant_concept_id = oc2.concept_id
  ORDER BY min_levels_of_separation, oc2.concept_name
  """.format(snomed_code=snomed_code))

  return descendant_snomed_codes_df


def get_dsi_from_snomed_descendant(code):
  """Return the jl_diagnosis_snomed_icd rows that map to descendants of `code`.

  The diagnosis table is inner-joined to the descendant SNOMED codes of `code`
  on its SNOMED column. The helper join column and the table's `name` column
  are dropped from the result.
  """
  descendants = get_all_descendant_snomed_codes(code).select('descendant_snomed_code')
  diagnosis_map = spark.sql("SELECT * FROM rdp_phi_sandbox.jl_diagnosis_snomed_icd")

  matched = diagnosis_map.join(
    descendants,
    diagnosis_map.SNOMED == descendants.descendant_snomed_code,
    how='inner',
  )
  matched = matched.drop(descendants.descendant_snomed_code)
  return matched.drop(diagnosis_map.name)

In [0]:
### Reviewed codes - these are reviewed codes, including the additional GI factors

# Maps each risk-factor name to the SNOMED codes that define it.
rf2snomed = {
    'htn': [
        '31992008',  # Secondary
        '59621000',  # Essential
    ],
    'asthma': ['195967001'],
    'COPD': [
        '13645005',   # Chronic obstructive lung disease (disorder)
        '87433001',   # Emphysema (disorder)
        '185086009',  # Chronic bronchitis
    ],
    'stroke': ['230690007'],
    'liver_disease': ['235856003'],
    'ckd': ['709044004'],
    'hld': ['55822004', '370992007'],  # hyperlipidemia and dyslipidemia
    'osa': ['78275009'],
    'diabetes': [
        '44054006',  # type 2
        '46635009',  # type 1
    ],
    # Solid organ transplantation
    'sot': [
        '739024006',  # heart
        '739025007',  # child of heart; this is heart-lung
        '737295003',  # kidney
        '737297006',  # liver
        '737296002',  # lung
    ],
    # Immunocompromised state
    'ics': [
        '737300001',  # bone marrow
        '370388006',  # immunocompromised
        '370391006',  # immunosuppressed
        '234532001',  # Immunodeficiency disorder
        '86406008',   # HIV
        '62479008',   # Acquired immune deficiency syndrome
    ],
    'dementia': ['52448006'],  # active
    # Heart conditions are tracked as three separate factors rather than one
    # combined 'heart_conditions' entry.
    'Coronary_arteriosclerosis': ['53741008', '443502000'],  # active
    'Heart_failure': ['84114007'],  # active
    'Cardiomyopathy': ['85898001'],  # active
}

print(len(rf2snomed))

In [0]:
def add_risk_factors(df, risk_factors_dict, lookback_days=730):
  """Add one 'yes'/'no' column per risk factor to `df`.

  For each risk factor, the SNOMED codes are expanded to their descendant
  diagnosis ids. The problem list is then searched for entries that
    * were noted more than one day before the encounter's onehourbase,
    * carry one of those diagnosis ids, and
    * are either "Active" or were noted no more than `lookback_days` days
      before onehourbase.
  Encounters with at least one such entry get 'yes'; all others get 'no'.

  Args:
    df: DataFrame with pat_id, pat_enc_csn_id and instance columns.
    risk_factors_dict: maps a risk-factor name (used as the new column name)
      to a list of SNOMED codes.
    lookback_days: size of the look-back window, in days, for problem-list
      entries that are not "Active". Defaults to 730.

  Returns:
    `df` with one extra column per key of `risk_factors_dict`.

  Raises:
    ValueError: if `lookback_days` is negative or not an integer value.
  """
  lookback_days = int(lookback_days)
  if lookback_days < 0:
    raise ValueError(f"lookback_days must be non-negative, got {lookback_days}")

  for risk_factor, codes in risk_factors_dict.items():
    # Gather the diagnosis ids of every descendant of every SNOMED code.
    diagnosis_id_list = []
    for code in codes:
      df_dsi = get_dsi_from_snomed_descendant(code)
      diagnosis_id_list.extend(
        str(int(row.id))
        for row in df_dsi.select('id').distinct().collect()
        if row.id is not None  # a NULL id cannot be matched and would break int()
      )
    # Joined so the ids drop into the quoted IN (...) list of the query below.
    diagnosis_ids = "','".join(diagnosis_id_list)

    tmp = spark.sql(
    f"""
    SELECT DISTINCT sm_positive_delta_results_feb.pat_id,sm_positive_delta_results_feb.instance, sm_positive_delta_results_feb.pat_enc_csn_id, IF(COUNT(diagnosis.dx_id) > 0, 'yes', 'no') AS {risk_factor}
    FROM (((rdp_phi.problemlist
      INNER JOIN rdp_phi.diagnosismapping ON rdp_phi.problemlist.dx_id = rdp_phi.diagnosismapping.dx_id)
      INNER JOIN rdp_phi.diagnosis ON rdp_phi.diagnosismapping.dx_id = rdp_phi.diagnosis.dx_id)
      INNER JOIN rdp_phi_sandbox.sm_positive_delta_results_feb ON problemlist.pat_id = sm_positive_delta_results_feb.pat_id AND problemlist.instance = sm_positive_delta_results_feb.instance)
    WHERE problemlist.NOTED_DATE + INTERVAL 1 day < sm_positive_delta_results_feb.onehourbase
      AND diagnosismapping.dx_id in ('{diagnosis_ids}')
      AND (problemlist.problemstatus = "Active" OR problemlist.NOTED_DATE >= sm_positive_delta_results_feb.onehourbase - INTERVAL {lookback_days} day)
    GROUP BY sm_positive_delta_results_feb.pat_id,sm_positive_delta_results_feb.instance, sm_positive_delta_results_feb.pat_enc_csn_id
    """
    )

    print(risk_factor)
    # Left join keeps encounters with no qualifying entry; they become 'no'.
    df = df.join(tmp, ['pat_id', 'pat_enc_csn_id','instance'], how='left').fillna({risk_factor: 'no'})
  return df

In [0]:
# Append one 'yes'/'no' column to df for every risk factor listed in rf2snomed.
df = add_risk_factors(df, rf2snomed)

In [0]:
# Reset the risk-factor -> SNOMED codes mapping so that it holds only cancer.
rf2snomed = {
    'cancer': ['363346000'],  # any kind of malignancy (active)
}

print(len(rf2snomed))

In [0]:
def add_risk_factors(df, risk_factors_dict, lookback_days=1825):
  """Add one 'yes'/'no' column per risk factor to `df`.

  For each risk factor, the SNOMED codes are expanded to their descendant
  diagnosis ids. The problem list is then searched for entries that
    * were noted more than one day before the encounter's onehourbase,
    * carry one of those diagnosis ids, and
    * are either "Active" or were noted no more than `lookback_days` days
      before onehourbase.
  Encounters with at least one such entry get 'yes'; all others get 'no'.

  Args:
    df: DataFrame with pat_id, pat_enc_csn_id and instance columns.
    risk_factors_dict: maps a risk-factor name (used as the new column name)
      to a list of SNOMED codes.
    lookback_days: size of the look-back window, in days, for problem-list
      entries that are not "Active". Defaults to 1825.

  Returns:
    `df` with one extra column per key of `risk_factors_dict`.

  Raises:
    ValueError: if `lookback_days` is negative or not an integer value.
  """
  lookback_days = int(lookback_days)
  if lookback_days < 0:
    raise ValueError(f"lookback_days must be non-negative, got {lookback_days}")

  for risk_factor, codes in risk_factors_dict.items():
    # Gather the diagnosis ids of every descendant of every SNOMED code.
    diagnosis_id_list = []
    for code in codes:
      df_dsi = get_dsi_from_snomed_descendant(code)
      diagnosis_id_list.extend(
        str(int(row.id))
        for row in df_dsi.select('id').distinct().collect()
        if row.id is not None  # a NULL id cannot be matched and would break int()
      )
    # Joined so the ids drop into the quoted IN (...) list of the query below.
    diagnosis_ids = "','".join(diagnosis_id_list)

    tmp = spark.sql(
    f"""
    SELECT DISTINCT sm_positive_delta_results_feb.pat_id,sm_positive_delta_results_feb.instance, sm_positive_delta_results_feb.pat_enc_csn_id, IF(COUNT(diagnosis.dx_id) > 0, 'yes', 'no') AS {risk_factor}
    FROM (((rdp_phi.problemlist
      INNER JOIN rdp_phi.diagnosismapping ON rdp_phi.problemlist.dx_id = rdp_phi.diagnosismapping.dx_id)
      INNER JOIN rdp_phi.diagnosis ON rdp_phi.diagnosismapping.dx_id = rdp_phi.diagnosis.dx_id)
      INNER JOIN rdp_phi_sandbox.sm_positive_delta_results_feb ON problemlist.pat_id = sm_positive_delta_results_feb.pat_id AND problemlist.instance = sm_positive_delta_results_feb.instance)
    WHERE problemlist.NOTED_DATE + INTERVAL 1 day < sm_positive_delta_results_feb.onehourbase
      AND diagnosismapping.dx_id in ('{diagnosis_ids}')
      AND (problemlist.problemstatus = "Active" OR problemlist.NOTED_DATE >= sm_positive_delta_results_feb.onehourbase - INTERVAL {lookback_days} day)
    GROUP BY sm_positive_delta_results_feb.pat_id,sm_positive_delta_results_feb.instance, sm_positive_delta_results_feb.pat_enc_csn_id
    """
    )

    print(risk_factor)
    # Left join keeps encounters with no qualifying entry; they become 'no'.
    df = df.join(tmp, ['pat_id', 'pat_enc_csn_id','instance'], how='left').fillna({risk_factor: 'no'})
  return df

In [0]:
# Second pass: rf2snomed now holds only 'cancer', so this adds the cancer column.
df = add_risk_factors(df, rf2snomed)

In [0]:
# Expose the current dataframe to SQL under the temp view name 'df_baseline'.
df.createOrReplaceTempView('df_baseline')

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, lower

# Obtain the Spark session, reusing the existing one when there is one.
spark = SparkSession.builder.getOrCreate()

# Label every problem-list row with a comorbidity category based on its
# lower-cased diagnosis codes; rows matching no pattern are labelled 'Other'.
# The label column is named `Comorbidities` because the cells that follow read
# it under that name.
#
# CASE is first-match-wins, so the order of the branches matters.
# NOTE(review): in each pattern `^` anchors only the first alternative; the
#   remaining alternatives match anywhere in `codes`. Confirm this is intended.
# NOTE(review): the 'Diabetes' branch precedes 'Diabetes_W' and shares most of
#   its sub-codes, so 'Diabetes_W' is reached for few codes. Verify against the
#   intended Charlson code lists.
# NOTE(review): the tables are unqualified and joined on `code`, while other
#   cells use rdp_phi.* joined on dx_id. Confirm the intended tables/columns.
COMOR = spark.sql("""
SELECT 
    problemlist.pat_id, 
    problemlist.instance,
    CASE
        WHEN lower(diagnosismapping.codes) RLIKE '^b2[0-2]|b24' THEN 'AIDS'
        WHEN lower(diagnosismapping.codes) RLIKE '^c(0[0-9]|1[0-9]|21|22|23|24|25|26|30|31|32|33|34|37|38|39|40|41|43|45|46|47|48|49|50|51|52|53|54|56|57|58|6[0-9]|70|71|72|73|74|75|76|81|82|83|84|85|88|90|91|92|93|94|95|96|97)' THEN 'Cancer'
        WHEN lower(diagnosismapping.codes) RLIKE '^c7[7-9]|c80' THEN 'metastatic'
        WHEN lower(diagnosismapping.codes) RLIKE '^g45|g46|h34|i6' THEN 'CD'
        WHEN lower(diagnosismapping.codes) RLIKE '^i27[.]8|i27[.]9|j4[0-7]|j60|j61|j62|j63|j64|j65|j66|j67|j68[.]4|j70[.]1|j70[.]3' THEN 'CPD'
        WHEN lower(diagnosismapping.codes) RLIKE '^i09[.]9|i11[.]0|i13[.]0|i13[.]2|i25[.]5|i42[.]0|i42[.]5|i42[.]6|i42[.]7|i42[.]8|i42[.]9|i43|i50|p29[.]0' THEN 'CHF'
        WHEN lower(diagnosismapping.codes) RLIKE '^f0[0-3]|f05[.]1|g30|g31[.]1' THEN 'Dementia'
        WHEN lower(diagnosismapping.codes) RLIKE '^e1[0-4][.]([01235679]|1)' THEN 'Diabetes'
        WHEN lower(diagnosismapping.codes) RLIKE '^e1[0-4][.]([0345]|7)' THEN 'Diabetes_W'
        WHEN lower(diagnosismapping.codes) RLIKE '^b18|k70[.]0|k70[.]1|k70[.]2|k70[.]3|k70[.]9|k71[.]3|k71[.]4|k71[.]5|k71[.]7|k73|k74|k76[.]0|k76[.]2|k76[.]3|k76[.]4|k76[.]8|k76[.]9|z94[.]4' THEN 'LD_M'
        WHEN lower(diagnosismapping.codes) RLIKE '^i85[.]0|i85[.]9|i86[.]4|i98[.]2|k70[.]4|k71[.]1|k72[.]1|k72[.]9|k76[.]5|k76[.]6|k76[.]7' THEN 'LD_S'
        WHEN lower(diagnosismapping.codes) RLIKE '^i21|i22|i25[.]2|i09[.]9|i11[.]0|i13[.]0|i13[.]2|i25[.]5|i42[.]0|i42[.]5|i42[.]6|i42[.]7|i42[.]8|i42[.]9|i43|i50|p29[.]0' THEN 'MI'
        WHEN lower(diagnosismapping.codes) RLIKE '^k2[5-8]' THEN 'PUD'
        WHEN lower(diagnosismapping.codes) RLIKE '^i70|i71|i73[.]1|i73[.]8|i73[.]9|i77[.]1|i79[.]0|i79[.]2|k55[.]1|k55[.]8|k55[.]9|z95[.]8|z95[.]9' THEN 'PVD'
        WHEN lower(diagnosismapping.codes) RLIKE '^g04[.]1|g11[.]4|g80[.]1|g80[.]2|g81|g82|g83[.]0|g83[.]1|g83[.]2|g83[.]3|g83[.]4|g83[.]9' THEN 'Plegia'
        WHEN lower(diagnosismapping.codes) RLIKE '^i12[.]0|i13[.]1|n03[.]2|n03[.]3|n03[.]4|n03[.]5|n03[.]6|n03[.]7|n05[.]2|n05[.]3|n05[.]4|n05[.]5|n05[.]6|n05[.]7|n18|n19|n25[.]0|z49[.]0|z49[.]1|z49[.]2' THEN 'RenalD'
        ELSE 'Other'
    END AS Comorbidities
FROM 
    problemlist
JOIN 
    diagnosismapping 
ON 
    problemlist.code = diagnosismapping.code
""")


In [0]:
from pyspark.sql.window import Window
### This cell keeps only the highest-ranked label among related comorbidities,
### e.g. 'Cancer' is dropped for a patient who also has 'metastatic'.
w = Window.partitionBy(['pat_id','instance']).rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)

# (lower-ranked label, higher-ranked label that supersedes it)
superseded_pairs = [
    ("Diabetes", "Diabetes_W"),
    ("Cancer", "metastatic"),
    ("LD_M", "LD_S"),
]

# A row is superseded when it carries the lower-ranked label of a pair and the
# same patient also has the higher-ranked one. The pairs are OR-ed together so
# that every pair is checked independently; a chained when() would stop at the
# first pair that matched and leave the other lower-ranked labels in place.
superseded = lit(False)
for lower_label, higher_label in superseded_pairs:
    superseded = superseded | (
        (col("Comorbidities") == lower_label)
        & array_contains(col("All_COMO"), higher_label)
    )

# COMOR1 lists the (patient, label) combinations to remove from COMOR.
COMOR1 = (
    COMOR.withColumn("All_COMO", collect_set(col("Comorbidities")).over(w))
    .filter(superseded)
    .select("pat_id", "instance", "Comorbidities")
    .dropDuplicates()
)

# leftanti on purpose: keep only the COMOR rows that are NOT listed in COMOR1.
COMOR = COMOR.join(COMOR1,["pat_id","instance","Comorbidities"],'leftanti')

COMOR = COMOR.dropDuplicates()

In [0]:
# Weight of each comorbidity label. Labels that are not listed (such as
# 'Other') fall through the when-chain and get a NULL score.
charlson_weights = [
    ("AIDS", 6),
    ("Cancer", 2),
    ("metastatic", 6),
    ("CD", 1),
    ("CPD", 1),
    ("CHF", 1),
    ("Dementia", 1),
    ("Diabetes", 1),
    ("Diabetes_W", 2),
    ("LD_M", 1),
    ("LD_S", 3),
    ("MI", 1),
    ("PUD", 1),
    ("PVD", 1),
    ("Plegia", 2),
    ("RenalD", 2),
    ("Rheumatic", 1),
]

# Assemble the same when(...).when(...) chain from the table above.
score_col = None
for label, weight in charlson_weights:
    is_label = col("Comorbidities") == label
    if score_col is None:
        score_col = when(is_label, weight)
    else:
        score_col = score_col.when(is_label, weight)

COMOR = COMOR.withColumn("scores", score_col)

# Whole-partition frame: every row of a patient receives the patient's total.
w = Window.partitionBy(['pat_id','instance']).rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)

# `sum` here is the Spark column aggregate brought in by the wildcard import.
COMOR = COMOR.withColumn("CCI", sum(col('scores')).over(w))

# Drop the per-diagnosis columns and collapse the now-identical rows.
COMOR = COMOR.drop('Comorbidities', 'scores')
COMOR = COMOR.dropDuplicates()

In [0]:
# Attach the CCI to each encounter; rows without a match in COMOR get 0.
df = df.join(COMOR, on=["pat_id","instance"], how='left')
df = df.withColumn('CCI', coalesce(col('CCI'), lit(0)))

### 3. Saving the updated dataframe

In [0]:
%sql
-- Remove the previous output table before it is written again.
-- IF EXISTS keeps the cell from failing when the table is not there yet.
DROP TABLE IF EXISTS rdp_phi_sandbox.sm_patt_delta_baseline_feb;

In [0]:
# Persist the assembled dataframe as a table in the sandbox schema.
table_name = 'rdp_phi_sandbox.sm_patt_delta_baseline_feb'
df.write.saveAsTable(name=table_name)

In [0]:
%sql
-- The Python variable `table_name` is not visible to SQL, so the table is named explicitly.
REFRESH TABLE rdp_phi_sandbox.sm_patt_delta_baseline_feb;
SELECT * FROM rdp_phi_sandbox.sm_patt_delta_baseline_feb;