In [1]:
from google.cloud import bigquery
from dxFilterLibraryPreGrading import *

client = bigquery.Client()

## Create the phecode lookup table

In [3]:
# Load the phecodes file
fn_phecodes = "/home/youngjm/private_transfer/ICDexclusion_code/Phecode_map_v1_2_icd10cm_beta.csv"
df_phecodes = pd.read_csv(fn_phecodes, encoding="unicode_escape")
print(list(df_phecodes))

['icd10cm', 'icd10cm_str', 'phecode', 'phecode_str', 'exclude_range', 'exclude_name', 'leaf', 'rollup']


In [32]:
df_phecodes[df_phecodes['icd10cm_str'].str.contains("Neurofibromatosis")]

Unnamed: 0,icd10cm,icd10cm_str,phecode,phecode_str,exclude_range,exclude_name,leaf,rollup
34114,Q85.02,"Neurofibromatosis, type 2",199.4,Neurofibromatosis,195-199.99,neoplasms,1.0,1.0
34115,Q85.0,Neurofibromatosis (nonmalignant),199.4,Neurofibromatosis,195-199.99,neoplasms,1.0,1.0
34116,Q85.01,"Neurofibromatosis, type 1",199.4,Neurofibromatosis,195-199.99,neoplasms,1.0,1.0
34118,Q85.00,"Neurofibromatosis, unspecified",199.4,Neurofibromatosis,195-199.99,neoplasms,1.0,1.0


In [4]:
table_id = "lab.icd10_to_phecode"

job_config = bigquery.LoadJobConfig(
    # Specify a (partial) schema. All columns are always written to the
    # table. The schema is used to assist in data type definitions.
    schema=[
        # Specify the type of columns whose type cannot be auto-detected. For
        # example the "title" column uses pandas dtype "object", so its
        # data type is ambiguous.
        bigquery.SchemaField("icd10cm", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("icd10cm_str", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("phecode_str", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("exclude_range", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("exclude_name", bigquery.enums.SqlTypeNames.STRING),
    ],
    # Optionally, set the write disposition. BigQuery appends loaded rows
    # to an existing table by default, but with WRITE_TRUNCATE write
    # disposition it replaces the table with the loaded data.
    write_disposition="WRITE_TRUNCATE",  # -- we do want to replace this table if it ever is updated
)

# job = client.load_table_from_dataframe(
#     df_phecodes, table_id, job_config=job_config
# )  # Make an API request.
# job.result()  # Wait for the job to complete.

## Create a table containing all patient dx as phecodes

In [5]:
## Create the table for patient phecodes
table_all_patients = "arcus.patient"
df_patient_phecodes = mapProcReqToPheCodes(table_all_patients)
df_patient_phecodes.dtypes

pat_id          object
dx_source       object
icd10cm         object
icd10cm_str     object
phecode        float64
phecode_str     object
dtype: object

In [6]:
table_id = "lab.patient_phecode_dx"

job_config = bigquery.LoadJobConfig(
    # Specify a (partial) schema. All columns are always written to the
    # table. The schema is used to assist in data type definitions.
    schema=[
        # Specify the type of columns whose type cannot be auto-detected. For
        # example the "title" column uses pandas dtype "object", so its
        # data type is ambiguous.
        bigquery.SchemaField("pat_id", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("dx_source", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("icd10cm", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("icd10cm_str", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("phecode_str", bigquery.enums.SqlTypeNames.STRING),
    ]
)

# job = client.load_table_from_dataframe(
#     df_patient_phecodes, table_id, job_config=job_config
# )  # Make an API request.
# job.result()  # Wait for the job to complete.


In [2]:
def convertExcludeDxCsvToSql(fn):
    # Load the dx filter file
    df = pd.read_csv(fn)
    # Check that the filter file has the columns we expect it to have, namely include/exclude (with specific types) and phecode
    assert "exclude_or_include_AAB_TS" in list(df)
    assert "phecode" in list(df)
    # Get only the codes we want to exclude
    dx_exclude = list(set(df[df['exclude_or_include_AAB_TS'] == "exclude"]['phecode']))
    # Start the query
    q = "with exclude_table as (select pat_id, phecode from lab.patient_phecode_dx where "
    # For each exclude row, 
    for dx in dx_exclude:
        q += "phecode = "+str(dx)+" or "
        
    # After iterating through the rows, remove the last "or "
    q = q[:-3] +")"
    
    # Return the filter query
    return q
    
fnExCodes = "/home/youngjm/bgdlab/code/filter-scans-by-dx/dx-filters/phecodes_with_exclusion_TS_and_AAB_19April2024.csv"
q_dx_filter = convertExcludeDxCsvToSql(fnExCodes)

In [4]:
# Need to update an existing query to incorporate the dx filter
def addDxFilterToQuery(fn_query, q_dx_filter):

    with open(fn_query, 'r') as f:
        q_project = f.read()

    # If there is a dx filter, incorporate it into the loaded query
    if q_dx_filter != "":
        q_tmp = q_dx_filter + q_project.split("where")[0] 
        q_tmp += "left join exclude_table on proc_ord.pat_id = exclude_table.pat_id where exclude_table.pat_id is null and"
        q_tmp += q_project.split("where")[1]

    return q_tmp

fn_query = "./queries/start2022_ages12-20years.txt"
q_project = addDxFilterToQuery(fn_query, q_dx_filter)
# print(q_project)

# Now attempting to rewrite `getMoreReportsToGrade()`

In [7]:
numUsersForValidation = 2

def getMoreReportsToGrade(name, project_id="SLIP", numberToAdd=100):
    # Global var declaration
    global numUsersForValidation
    print("It is expected for this function to take several minutes to run. Your patience is appreciated.")
    
    # Initialize the client service
    client = bigquery.Client()  
    
    # Load the config file
    fn = "./queries/config.json" ## write this file
    with open(fn, "r") as f:
        project_lookup = json.load(f)
        
    # Get the info for the specified project
    project_info = project_lookup[project_id]
    queryFn = project_info['query']
    q_dx_filter = ''
    if 'dx_filter' in project_info:
        # Get the name of the dx filter file
        fn_dx_filter = project_info['dx_filter']
        # Convert the contents of the dx filter file to a sql query
        q_dx_filter = convertExcludeDxCsvToSql(fn_dx_filter)
    
    # Open the specified query file
    with open(queryFn, 'r') as f:
        q_project = f.read()
        
    # If there is a dx filter, incorporate it into the loaded query
    if q_dx_filter != "":
        q_tmp = q_dx_filter + q_project.split("where")[0] 
        q_tmp += "left join exclude_table on proc_ord.pat_id = exclude_table.pat_id where exclude_table.pat_id is null and"
        q_tmp += q_project.split("where")[1]
        q_project = q_tmp
        
    # Run the query from the specified file -- should the query itself be passed to a dx filtering option?
    dfProject = client.query(q_project).to_dataframe()
    # Now we have the ids of the reports we want to grade for Project project
    projectProcIds = dfProject['proc_ord_id'].values 
    print("Number of ids for project", project_id, len(projectProcIds))
    
    # Get the proc_ord_ids from the grader table
    qGradeTable = "SELECT proc_ord_id, grader_name from lab.grader_table_with_metadata where grade_category='Unique'; "
    dfGradeTable = client.query(qGradeTable).to_dataframe()
    gradeTableProcIds = dfGradeTable['proc_ord_id'].values
    userProcIds = dfGradeTable[dfGradeTable['grader_name'] == name]['proc_ord_id'].values
    
    # Validation: are there any reports for the project that need to be validated that name hasn't graded?
    toAddValidation = []
    for procId in projectProcIds: # for each proc_id in the project
        if procId in dfGradeTable['proc_ord_id'].values: # if the proc_id report was already graded
            graders = dfGradeTable.loc[dfGradeTable['proc_ord_id'] == procId, "grader_name"].values
            gradersStr = ", ".join(graders)
            # if the report was not graded by Coarse Text Search or the user and has not been graded N times
            if "Coarse Text Search" not in gradersStr and name not in gradersStr and len(graders) < numUsersForValidation:
                toAddValidation.append(procId)  
            
    # projectReportsInTable = [procId for procId in projectProcIds if procId in dfGradeTable['proc_ord_id'].values and not dfGradeTable.loc[dfGradeTable['proc_ord_id'] == procId, "grader_name"].str.contains("Coarse Text Search").any() ]
    # Ignore procIds rated by User name
    print("Number of reports that need to be validated:", len(toAddValidation))
    toAddValidation = [procId for procId in toAddValidation if procId not in userProcIds][:numberToAdd]
    print("Number of validation reports added:", len(toAddValidation))
    print(numberToAdd)
    
    # Add validation reports - procIds already in the table
    if len(toAddValidation) > 0:
        addReportsQuery = 'insert into lab.grader_table_with_metadata (proc_ord_id, grader_name, grade, grade_category, pat_id, age_in_days, proc_ord_year, proc_name, report_origin_table, project, grade_date) VALUES '
        for procId in toAddValidation[:numberToAdd]:
            row = dfProject[dfProject['proc_ord_id'] == procId]
            addReportsQuery += '("'+str(procId)+'", "'+name+'", 999, "Unique", "'
            addReportsQuery += row['pat_id'].values[0]+'", '+str(row['proc_ord_age'].values[0])
            addReportsQuery += ', '+str(row['proc_ord_year'].values[0])+', "'+str(row['proc_ord_desc'].values[0].replace("'", "\'"))
            addReportsQuery += '", "arcus.procedure_order", "'+project_id+'", "0000-00-00"), '
        print(len(toAddValidation[:numberToAdd]))
        addReportsQuery = addReportsQuery[:-2]+";"
        addingReports = client.query(addReportsQuery)
        addingReports.result()

    # New reports
    toAddNew = [procId for procId in projectProcIds if procId not in dfGradeTable['proc_ord_id'].values][:(numberToAdd - len(toAddValidation))]
    
    # Add new reports
    print("Number of new reports to grade:", len(toAddNew))
    if len(toAddNew) > 0:
        addReportsQuery = 'insert into lab.grader_table_with_metadata (proc_ord_id, grader_name, grade, grade_category, pat_id, age_in_days, proc_ord_year, proc_name, report_origin_table, project, grade_date) VALUES '
        for procId in toAddNew:
            row = dfProject[dfProject['proc_ord_id'] == procId]
            addReportsQuery += '("'+str(procId)+'", "'+name+'", 999, "Unique", "'
            addReportsQuery += row['pat_id'].values[0]+'", '+str(row['proc_ord_age'].values[0])
            addReportsQuery += ', '+str(row['proc_ord_year'].values[0])+', "'+str(row['proc_ord_desc'].values[0].replace("'", "\'"))
            addReportsQuery += '", "arcus.procedure_order", "'+project_id+'", "0000-00-00"), '
        addReportsQuery = addReportsQuery[:-2]+";"
        addingReports = client.query(addReportsQuery)
        addingReports.result()
    
    # Check: how many reports were added for the user?
    if (len(toAddValidation) + len(toAddNew)) == 0:
        print("There are no reports returned by the specified query that have yet to be either graded or validated.")
    else:
        getUserUnratedCount = 'SELECT * FROM lab.grader_table_with_metadata WHERE grader_name like "' + name + '" and grade = 999'

        df = client.query(getUserUnratedCount).to_dataframe()

        # Inform the user
        print(len(df), "reports were added for grader", name)

getMoreReportsToGrade("Jenna Schabdach", project_id="SLIP Adolescents", numberToAdd=10)

It is expected for this function to take several minutes to run. Your patience is appreciated.
Number of ids for project SLIP Adolescents 2504
Number of reports that need to be validated: 0
Number of validation reports added: 0
10
Number of new reports to grade: 10
10 reports were added for grader Jenna Schabdach


In [4]:
import json

dict_config = { "SLIP Adolescents": {
                    "query": "./queries/start2022_ages12-20years.txt",
                    "dx_filter": "/home/youngjm/bgdlab/code/filter-scans-by-dx/dx-filters/phecodes_with_exclusion_TS_and_AAB_19April2024.csv"
                },
                "SLIP": {
                    "query": "./queries/slip_base.txt",
                    "dx_filter": "/home/youngjm/bgdlab/code/filter-scans-by-dx/dx-filters/phecodes_with_exclusion_TS_and_AAB_19April2024.csv"
                },
                "SLIP PreK": {
                    "query": "./queries/start2018_ages2-5years.txt",
                    "dx_filter": ""
                },
                "Clinical Imaging Genetics": {
                    "query": "./queries/clinical_imaging_genetics.txt",
                    "dx_filter": "" 
                }
              }

with open("./queries/config.json", 'w') as fp:
    json.dump(dict_config, fp, indent=4)


# Testing

In [28]:
## Test - does that filter part of the query plus other stuff give 
# the same number of patients in the SLIP requests that can be recruited 
# for BBG?
# It should just be "get everything from table not in dx filter"
table = "lab.session_request_2023_09_with_metadata"
q = q_dx_filter + " select distinct req.pat_id from "+table+" req left join exclude_table "
q += " on req.pat_id = exclude_table.pat_id "
q += " where"
q += " exclude_table.pat_id is null "
# Adding in age/scan year filter
q += " and proc_ord_year = 2023 "
q += " and proc_ord_age > 12*365 "
q += " and proc_ord_age < 21*365 "
    
# print(q)
# Do the query
df_test = client.query(q).to_dataframe()
print(df_test.shape)

(226, 1)


In [19]:
df_nov_req = client.query("select * from lab.2023_09_slip_adolescent_prospective_filtereddx_2024_04_updated ;").to_dataframe()
print(df_nov_req.shape)

(226, 17)
