#Create initial input

#Setup

##Import Packages

In [0]:
import pandas as pd
import trinetx as tnx # automatically installed on the LUCID platform
import matplotlib.pyplot as plt
from pyspark.sql.functions import concat, lit, to_date


##Define Database

Replace the below command with your dataset database and your time window for analysis

In [0]:
db = 'das_fraunhofer_covid19ndd_dwusa_20250515'
output_table = 'initial_input'
censoring_global = '2025-04-30'

## Find all relevant dates

Query patients table for birth and death date

In [0]:
dems = spark.sql(
    f"select patient_id, year_of_birth, month_year_death from {db}.patient"
)
dems = dems.withColumn("birth_date", to_date(concat(dems.year_of_birth.cast("string"), lit("-01-01"))))
dems = dems.withColumn("death_date", to_date(concat(dems.month_year_death.substr(0, 4), lit("-"), dems.month_year_death.substr(5,2), lit("-01"))))
dems = dems.drop("year_of_birth", "month_year_death")
# drop patients without birth year (patients over 90 who would be easily identifiable)
dems = dems.dropna(subset=["birth_date"])

Find dates on which patients were first tested positive

In [0]:
index_parent_tested_positive = pd.DataFrame({
    'code': ['9088','9089'],
    'code_system': ['TNX','TNX'],
    'feature': ['date_first_tested_positive','date_first_tested_positive'],
    'qualifier_text':['Positive','Positive']
})

index_children_tested_positive = tnx.find_children(
                    database=db, 
                    code_list=index_parent_tested_positive
                )

date_first_tested_positive = tnx.find_date(
                database=db, 
                code_list=index_children_tested_positive, 
                tables=['lab_result'], 
                function='first'
            )

Find date on which patients were first tested in general

In [0]:
index_parent_tested = pd.DataFrame({
    'code': ['9088','9089'],
    'code_system': ['TNX','TNX'],
    'feature': ['date_first_tested','date_first_tested'],
})

index_children_tested = tnx.find_children(
                    database=db, 
                    code_list=index_parent_tested
                )

date_first_tested = tnx.find_date(
                database=db, 
                code_list=index_children_tested, 
                tables=['lab_result'], 
                function='first'
            )

Find date of first COVID diagnosis

In [0]:
index_parent_covid_diagnosis = pd.DataFrame({
    'code': ['U07.1','U07.2','J12.82','B34.2'],
    'code_system': ['ICD-10-CM','ICD-10-CM','ICD-10-CM','ICD-10-CM'],
    'feature': ['date_first_covid_diagnosis','date_first_covid_diagnosis','date_first_covid_diagnosis','date_first_covid_diagnosis',],
})

index_children_covid_diagnosis = tnx.find_children(
                    database=db, 
                    code_list=index_parent_covid_diagnosis
                )

date_first_covid_diagnosis = tnx.find_date(
                database=db, 
                
                code_list=index_children_covid_diagnosis, 
                tables=['diagnosis'], 
                function='first'
            )
                        

Find date of first AD diagnosis

In [0]:
index_parent_ad_diagnosis = pd.DataFrame({
    'code': ['G30'],
    'code_system': ['ICD-10-CM'],
    'feature': ['date_first_ad_diagnosis'],
})

index_children_ad_diagnosis = tnx.find_children(
                    database=db, 
                    code_list=index_parent_ad_diagnosis
                )

date_first_ad_diagnosis = tnx.find_date(
                database=db, 
                code_list=index_children_ad_diagnosis, 
                tables=['diagnosis'], 
                function='first'
            )
     

Find date of first PD diagnosis

In [0]:
index_parent_pd_diagnosis = pd.DataFrame({
    'code': ['G20'],
    'code_system': ['ICD-10-CM'],
    'feature': ['date_first_pd_diagnosis'],
})

index_children_pd_diagnosis = tnx.find_children(
                    database=db, 
                    code_list=index_parent_pd_diagnosis
                )
date_first_pd_diagnosis = tnx.find_date(
                database=db, 
                code_list=index_children_pd_diagnosis, 
                tables=['diagnosis'], 
                function='first'
            )       

Find date of first unspecified dementia diagnosis

In [0]:
index_parent_unspecified_dementia_diagnosis = pd.DataFrame({
    'code': ['F03'],
    'code_system': ['ICD-10-CM'],
    'feature': ['date_first_unspecified_dementia_diagnosis'],
})

index_children_unspecified_dementia_diagnosis = tnx.find_children(
                    database=db, 
                    code_list=index_parent_unspecified_dementia_diagnosis
                )
date_first_unspecified_dementia_diagnosis = tnx.find_date(
                database=db, 
                code_list=index_children_unspecified_dementia_diagnosis, 
                tables=['diagnosis'], 
                function='first'
            )       

## Merge all the dataframes

In [0]:
df = (
    dems.join(date_first_tested_positive, on=["patient_id"], how="left")
    .join(date_first_tested, on=["patient_id"], how="left")
    .join(date_first_covid_diagnosis, on=["patient_id"], how="left")
    .join(date_first_ad_diagnosis, on=["patient_id"], how="left")
    .join(date_first_pd_diagnosis, on=["patient_id"], how="left")
    .join(date_first_unspecified_dementia_diagnosis, on=["patient_id"], how="left")
    .withColumn("censoring_global", lit(censoring_global).cast("date"))
    # hospitalized_due_to_covid is all null because admitting_diagnosis is not populated in the diagnosis table for COVID
    .withColumn("hospitalized_due_to_covid", lit(None))
    .toPandas()
    )

In [0]:
df["birth_date"] = pd.to_datetime(df["birth_date"])
df["date_first_tested_positive"] = pd.to_datetime(df["date_first_tested_positive"])
df["date_first_covid_diagnosis"] = pd.to_datetime(df["date_first_covid_diagnosis"])
df["date_first_tested"] = pd.to_datetime(df["date_first_tested"])
df["date_first_ad_diagnosis"] = pd.to_datetime(df["date_first_ad_diagnosis"])
df["date_first_pd_diagnosis"] = pd.to_datetime(df["date_first_pd_diagnosis"])
df["date_first_unspecified_dementia_diagnosis"] = pd.to_datetime(df["date_first_unspecified_dementia_diagnosis"])
df["censoring_global"] = pd.to_datetime(df["censoring_global"])

## Inclusion / exclusion criteria

In [0]:
# control pool: individuals who never had a reported COVID infection and were 65+ years old on January 1, 2015
inclusion_control_pool = (
    (df["date_first_tested_positive"].isnull())
    & (df["date_first_covid_diagnosis"].isnull())
    & (((pd.to_datetime("2015-01-01") - df["birth_date"])).dt.days / 365 >= 65)
)
# covid group: individuals who had a reported COVID infection at some point and were at least 70 years old then
inclusion_covid_group = (
    ~df["date_first_tested_positive"].isnull()
    | ~df["date_first_covid_diagnosis"].isnull()
) & (
    (
        df[["date_first_tested_positive", "date_first_covid_diagnosis"]].min(axis=1)
        - df["birth_date"]
    ).dt.days
    / 365
    >= 70
)
# exclude diagnoses before March 2020 (probably artifacts?)
exclusion_early_infection = (df["date_first_tested_positive"] < pd.to_datetime("2020-03-01")) | (
    df["date_first_covid_diagnosis"] < pd.to_datetime("2020-03-01"))

df = df[inclusion_control_pool | (inclusion_covid_group & ~exclusion_early_infection)]

Save the dataframe

In [0]:
df.to_csv("/Workspace/Users/jannis.guski@scai.fraunhofer.de/commute-tmle/data/a_inputs/trinetx.csv", index=False)

Write initial input to database

In [0]:
#matrix.write.mode('overwrite').option('overwriteSchema', 'True').saveAsTable(f'{db}.{output_table}')