In [1]:
import pandas as pd
from pathlib import Path


# wrape this in try/except to make suing the ReportCollector portable
# probably an abstract base class would be better
try:
    import streamlit as st
    print("Streamlit imported successfully")

except ImportError:
    class DummyStreamlit:
        @staticmethod
        def markdown(self,msg):
            pass
        def error(self,msg):
            pass
        def header(self,msg):
            pass        
        def subheader(self,msg):
            pass    
        def divider(self):
            pass
    st = DummyStreamlit()
    print("Streamlit NOT successfully")




In [2]:
# GOOGLE_SHEET_ID = "1xjxLftAyD0B8mPuOKUp5cKMKjkcsrp_zr9yuVULBLG8"
GOOGLE_SHEET_ID = "1c0z5KvRELdT2AtQAH2Dus8kwAyyLrR0CROhKOjpU4Vc"


team = "Scherzer"
HOME = Path.home()
base_path = HOME / f"Projects/ASAP/asap-cloud-data-processing-resources/asap-ids/teams/"
test_path = base_path / f"{team.lower()}/input"

NULL = "NA"


In [3]:


def read_CDE(metadata_version:str="v3.0-beta", local=False):
    """
    Load CDE from local csv and cache it, return a dataframe and dictionary of dtypes
    """
    # Construct the path to CSD.csv

    if metadata_version == "v1":
        sheet_name = "ASAP_CDE_v1"
    elif metadata_version == "v2":
        sheet_name = "ASAP_CDE_v2"
    elif metadata_version == "v2.1":
        sheet_name = "ASAP_CDE_v2.1"
    elif metadata_version in ["v3.0","v3.0-beta"]:
        sheet_name = "ASAP_CDE_v3.0-beta"
    else:
        sheet_name = "ASAP_CDE_v2.1"


    if metadata_version in ["v1","v2","v2.1","v3.0-beta"]:
        print(f"metadata_version: {sheet_name}")
    else:
        print(f"Unsupported metadata_version: {sheet_name}")
        return 0,0
    
    cde_url = f"https://docs.google.com/spreadsheets/d/{GOOGLE_SHEET_ID}/gviz/tq?tqx=out:csv&sheet={sheet_name}"
    if local:
        cde_url = f"{sheet_name}.csv"
    
    try:
        CDE_df = pd.read_csv(cde_url)
        read_source = "url" if not local else "local file"
        print(f"read {read_source}")
    except:
        CDE_df = pd.read_csv(f"{sheet_name}.csv")
        print("read local file")

    return CDE_df



In [4]:
CDE_df = read_CDE(metadata_version="v3.0-beta", local=True)

metadata_version: ASAP_CDE_v3.0-beta
read local file


In [5]:
CDE_df.columns

Index(['Table', 'Field', 'Description', 'DataType', 'Required', 'Validation',
       'V0', 'comment', 'denormalized', 'dataset relavent'],
      dtype='object')

In [6]:
CDE_df["Table"].unique()

array(['STUDY', 'PROTOCOL', 'SUBJECT', 'CLINPATH', 'SAMPLE',
       'ASSAY_scRNAseq', 'DATA', 'PMDBS'], dtype=object)

In [7]:
def read_meta_table(table_path):
    # read the whole table
    try:
        table_df = pd.read_csv(table_path,dtype=str)
    except UnicodeDecodeError:
        table_df = pd.read_csv(table_path, encoding='latin1',dtype=str)

    # drop the first column if it is just the index
    if table_df.columns[0] == "Unnamed: 0":
        table_df = table_df.drop(columns=["Unnamed: 0"])

    
    return table_df


In [8]:


tables = {}
for table in CDE_df["Table"].unique():
    # load table
    table_path = test_path / f"{table}.csv"
    if not table_path.exists():
        print(f"Table {table} does not exist")
    else:
        df = pd.read_csv(table_path,dtype="str")
        tables[table] = df
        print(f"Loading {table} table")




Loading STUDY table
Loading PROTOCOL table
Loading SUBJECT table
Loading CLINPATH table
Loading SAMPLE table
Table ASSAY_scRNAseq does not exist
Loading DATA table
Table PMDBS does not exist


In [None]:
tables = {}
for table in CDE_df["Table"].unique():
    # load table
    table_path = test_path / f"{table}.csv"
    df = pd.read_csv(table_path,dtype="str")
    df.replace({"":NULL, pd.NA:NULL}, inplace=True)
    tables[table] = df
    specific_cde_df = CDE_df[CDE_df['Table'] == table]


    print(f"___________{table}__________")
    for field in specific_cde_df["Field"]:
        entry_idx = specific_cde_df["Field"]==field

        opt_req = "REQUIRED" if specific_cde_df.loc[entry_idx, "Required"].item()=="Required" else "OPTIONAL"

        if field not in df.columns:
            print(f"missing {opt_req} column {field}")

        else:
            datatype = specific_cde_df.loc[entry_idx,"DataType"]
            if datatype.item() == "Integer":
                # recode "Unknown" as NULL
                df.replace({"Unknown":NULL, "unknown":NULL}, inplace=True)
                df[field].apply(lambda x: int(x) if x!=NULL else x )
                # test that all are integer or NULL, flag NULL entries
            elif datatype.item() == "Float":
                # recode "Unknown" as NULL
                df.replace({"Unknown":NULL, "unknown":NULL}, inplace=True)
                df[field].apply(lambda x: float(x) if x!=NULL else x )
                # test that all are float or NULL, flag NULL entries
            elif datatype.item() == "Enum":

                valid_values = eval(specific_cde_df.loc[entry_idx,"Validation"].item())
                entries = df[field]
                valid_entries = entries.apply(lambda x: x in valid_values)
                invalid_values = entries[~valid_entries].unique()
                n_invalid = invalid_values.shape[0]
                if n_invalid > 0:
                    print(f">> {field} has {n_invalid} invalid entries. ")
                    valstr = ", ".join(valid_values)
                    print(f"              recode from {valstr}")
            else: #dtype == String
                pass
            
            n_null = (df[field]==NULL).sum()
            if n_null > 0:            
                print(f"{opt_req} {field} has {n_null}/{df.shape[0]} NULL entries ")

    # validate_table(df, specific_cde_df)

In [9]:
table = "CLINPATH"

# load table
table_path = test_path / f"{table}.csv"
df = pd.read_csv(table_path,dtype="str")
df.replace({"":NULL, pd.NA:NULL}, inplace=True)
tables[table] = df
specific_cde_df = CDE_df[CDE_df['Table'] == table]



In [11]:


def get_log(log_file):
    """ grab logged information from the log file."""
    with open(log_file, 'r') as f:
        report_content = f.read()
    return report_content

def columnize( itemlist ):
    NEWLINE_DASH = ' \n- '
    if len(itemlist) > 1:
        return f"- {itemlist[0]}{NEWLINE_DASH.join(itemlist[1:])}"
    else:
        return f"- {itemlist[0]}"





class ReportCollector:
    """
    Class to collect and log messages, errors, and markdown to a log file and/or streamlit
    """

    def __init__(self, destination="both"):
        self.entries = []
        self.filename = None

        if destination in ["both", "streamlit"]:
            self.publish_to_streamlit = True
        else:
            self.publish_to_streamlit = False


    def add_markdown(self, msg):
        self.entries.append(("markdown", msg))
        if self.publish_to_streamlit:
            st.markdown(msg)


    def add_error(self, msg):
        self.entries.append(("error", msg))
        if self.publish_to_streamlit:
            st.error(msg)

    def add_header(self, msg):
        self.entries.append(("header", msg))
        if self.publish_to_streamlit:    
            st.header(msg)

    def add_subheader(self, msg):
        self.entries.append(("subheader", msg))
        if self.publish_to_streamlit:    
            st.subheader(msg)

    def add_divider(self):
        self.entries.append(("divider", None))
        if self.publish_to_streamlit:    
            st.divider()

    
    def write_to_file(self, filename):
        self.filename = filename
        with open(filename, 'w') as f:
            report_content = self.get_log()
            f.write(report_content)
    

    def get_log(self):
        """ grab logged information from the log file."""
        report_content = []
        for msg_type, msg in self.entries:
            if msg_type == "markdown":
                report_content += msg + '\n'
            elif msg_type == "error":
                report_content += f"🚨⚠️❗ **{msg}**\n"
            elif msg_type == "header":
                report_content += f"# {msg}\n"
            elif msg_type == "subheader":
                report_content += f"## {msg}\n"
            elif msg_type == "divider":
                report_content += 60*'-' + '\n'
        
        return "".join(report_content)

    def reset(self):
        self.entries = []
        self.filename = None

    def print_log(self):
        print(self.get_log())



In [22]:

def validate_table(df: pd.DataFrame, table_name: str, specific_cde_df: pd.DataFrame, out: ReportCollector ):
    """
    Validate the table against the specific table entries from the CDE
    """
    def my_str(x):
        return f"'{str(x)}'"
        
    missing_required = []
    missing_optional = []
    null_fields = []
    invalid_entries = []
    total_rows = df.shape[0]
    for field in specific_cde_df["Field"]:
        entry_idx = specific_cde_df["Field"]==field

        opt_req = "REQUIRED" if specific_cde_df.loc[entry_idx, "Required"].item()=="Required" else "OPTIONAL"

        if field not in df.columns:
            if opt_req == "REQUIRED":
                missing_required.append(field)
            else:
                missing_optional.append(field)

            # print(f"missing {opt_req} column {field}")

        else:
            datatype = specific_cde_df.loc[entry_idx,"DataType"]
            if datatype.item() == "Integer":
                # recode "Unknown" as NULL
                df.replace({"Unknown":NULL, "unknown":NULL}, inplace=True)
                df[field].apply(lambda x: int(x) if x!=NULL else x )
                # test that all are integer or NULL, flag NULL entries
            elif datatype.item() == "Float":
                # recode "Unknown" as NULL
                df.replace({"Unknown":NULL, "unknown":NULL}, inplace=True)
                df[field].apply(lambda x: float(x) if x!=NULL else x )
                # test that all are float or NULL, flag NULL entries
            elif datatype.item() == "Enum":

                valid_values = eval(specific_cde_df.loc[entry_idx,"Validation"].item())
                entries = df[field]
                valid_entries = entries.apply(lambda x: x in valid_values)
                invalid_values = entries[~valid_entries].unique()
                n_invalid = invalid_values.shape[0]
                if n_invalid > 0:
                    valstr = ', '.join(map(my_str, valid_values))
                    invalstr = ', '.join(map(my_str,invalid_values))
            else: #dtype == String
                pass
            
            n_null = (df[field]==NULL).sum()
            if n_null > 0:            
                null_fields.append((opt_req, field, n_null))


    # now compose report...
    if len(missing_required) > 0:
        out.add_error(f"Missing Required Fields in {table_name}: {', '.join(missing_required)}")
    else:
        out.add_markdown(f"All required fields are present in *{table_name}* table.")

    if len(missing_optional) > 0:
        out.add_error(f"Missing Optional Fields in {table_name}: {', '.join(missing_optional)}")
    

    if len(null_fields) > 0:
        # print(f"{opt_req} {field} has {n_null}/{df.shape[0]} NULL entries ")
        out.add_error(f"{len(null_fields)} Fields with empty (NULL) values:")
        for opt_req, field, count in null_fields:
            out.add_markdown(f"\n\t- {field}: {count}/{total_rows} empty rows ({opt_req})")
    else:
        out.add_markdown(f"No empty entries (NULL) found .")


    if len(invalid_entries) > 0:
        out.add_error(f"{len(invalid_entries)} Fields with invalid entries:")
        for opt_req, field, count, valstr, invalstr in invalid_entries:
            str_out = f"- _*{field}*_:  invalid values 💩{invalstr}\n"
            str_out += f"    - valid ➡️ {valstr}"
            out.add_markdown(str_out)
    else:
        out.add_markdown(f"No invalid entries found in Enum fields.")


    return df, out


In [23]:
report = ReportCollector("log")

df_out, report = validate_table(df,table, specific_cde_df, report)



In [24]:
report.print_log()

All required fields are present in *CLINPATH* table.
🚨⚠️❗ **Missing Optional Fields in CLINPATH: ASAP_dataset_id, ASAP_team_id, ASAP_subject_id**
🚨⚠️❗ **29 Fields with empty (NULL) values:**

	- path_autopsy_dx_main: 1/94 empty rows (OPTIONAL)

	- path_autopsy_second_dx: 3/94 empty rows (OPTIONAL)

	- path_autopsy_third_dx: 11/94 empty rows (OPTIONAL)

	- path_autopsy_fourth_dx: 36/94 empty rows (OPTIONAL)

	- path_autopsy_fifth_dx: 59/94 empty rows (OPTIONAL)

	- path_autopsy_sixth_dx: 69/94 empty rows (OPTIONAL)

	- path_autopsy_seventh_dx: 81/94 empty rows (OPTIONAL)

	- path_autopsy_eight_dx: 91/94 empty rows (OPTIONAL)

	- path_year_death: 2/94 empty rows (OPTIONAL)

	- other_cause_death_1: 94/94 empty rows (OPTIONAL)

	- other_cause_death_2: 94/94 empty rows (OPTIONAL)

	- path_braak_asyn: 94/94 empty rows (OPTIONAL)

	- path_cerad: 44/94 empty rows (OPTIONAL)

	- path_thal: 21/94 empty rows (OPTIONAL)

	- known_pathogenic_mutation: 68/94 empty rows (OPTIONAL)

	- PD_pathogenic_m

In [26]:


tables = {}
for table in CDE_df["Table"].unique():
    # load table
    table_path = test_path / f"{table}.csv"

    if not table_path.exists():
        print(f"Table {table} does not exist")
    else:
        df = pd.read_csv(table_path,dtype="str")
        df.replace({"":NULL, pd.NA:NULL}, inplace=True)

        tables[table] = df
        print(f"Loading {table} table")


        specific_cde_df = CDE_df[CDE_df['Table'] == table]
        report = ReportCollector("log")

        df_out, report = validate_table(df,table, specific_cde_df, report)
        report.print_log()
      




Loading STUDY table
🚨⚠️❗ **Missing Required Fields in STUDY: submitter_email, numbe_samples, sample_types**
🚨⚠️❗ **Missing Optional Fields in STUDY: ASAP_dataset_id, ASAP_team_id, PI_ORCID, alternate_dataset_id**
🚨⚠️❗ **5 Fields with empty (NULL) values:**

	- other_funding_source: 1/1 empty rows (REQUIRED)

	- publication_DOI: 1/1 empty rows (REQUIRED)

	- publication_PMID: 1/1 empty rows (REQUIRED)

	- PI_google_scholar_id: 1/1 empty rows (OPTIONAL)

	- preprocessing_references: 1/1 empty rows (OPTIONAL)
No invalid entries found in Enum fields.

Loading PROTOCOL table
All required fields are present in *PROTOCOL* table.
🚨⚠️❗ **Missing Optional Fields in PROTOCOL: ASAP_dataset_id, ASAP_team_id**
No empty entries (NULL) found .
No invalid entries found in Enum fields.

Loading SUBJECT table
All required fields are present in *SUBJECT* table.
🚨⚠️❗ **Missing Optional Fields in SUBJECT: ASAP_dataset_id, ASAP_team_id, ASAP_subject_id**
🚨⚠️❗ **15 Fields with empty (NULL) values:**

	- prima

In [28]:
tables.keys()

dict_keys(['STUDY', 'PROTOCOL', 'SUBJECT', 'CLINPATH', 'SAMPLE', 'DATA'])

In [29]:
tables["CLINPATH"]

Unnamed: 0.1,Unnamed: 0,subject_id,source_subject_id,duration_pmi,path_autopsy_dx_main,path_autopsy_second_dx,path_autopsy_third_dx,path_autopsy_fourth_dx,path_autopsy_fifth_dx,path_autopsy_sixth_dx,...,path_nia_ri,path_nia_aa_a,path_nia_aa_b,path_nia_aa_c,TDP43,arteriolosclerosis_severity_scale,amyloid_angiopathy_severity_scale,path_ad_level,dig_slide_avail,quant_path_avail
0,0,BN0009,00-09,4.0,PD/Dem,Charcot-Marie-Tooth disease (history),"GBA L444P/WT, L444P mutation",,,,...,,,,,,,,,,
1,1,BN0329,03-29,4.5,PD/Dem,Seizure disorder (history),,,,,...,,,,,,,,,,
2,2,BN0339,03-39,2.75,Control,Non-diagnostic Alzheimer's changes,CAA,,,,...,,,,,,Mild,Moderate,,,
3,3,BN0341,03-41,2.5,Control,Non-diagnostic Alzheimer's changes,CWMR,,,,...,,,,,No,Moderate,Mild,,,
4,4,BN0347,03-47,3.5,Control (MCI),Non-diagnostic Alzheimer's changes,"Argyrophilic grains, mesial temporal lobe",Infarct(s),CWMR,Several microscopic foci of cerebellar cortica...,...,,,,,,Mild,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
89,89,BN2003,20-03,2.65,PD/Dem,"Microscopic changes of Alzheimer's disease, in...","Focal non-specific glial tauopathy, cortex of ...",,,,...,,,,,No,,,,,
90,90,BN2015,20-15,5.4,Control (history),"Microscopic changes of Alzheimer's disease, in...",Incidental Lewy body disease,,,,...,,,,,No,,,,,
91,91,BN9944,99-44,2.16,Control,Non-diagnostic Alzheimer's changes,Alzheimer Type II astrocytosis consistent with...,,,,...,,,,,,,,,,
92,92,BN9947,99-47,2.5,Control,Non-diagnostic Alzheimer's changes,Alzheimer type II astrocytosis,Inc LBs,,,...,,,,,,,,,,
