## Common imports and AWS setup

In [None]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from dotenv import dotenv_values

# Read the AWS credentials from the local .env file and shape them into the
# storage options handed to the S3 parquet reads and writes in this notebook.
env_v = dotenv_values(".env")
storage_opts = dict(
    key=env_v["aws_access_key_id"],
    secret=env_v["aws_secret_access_key"],
    token=env_v["aws_session_token"],
)

# Source location of each raw table: one folder per table under the shared
# parquet prefix of the bucket.
S3_URLS = {
    table: f"s3://semdb-data/parquet/{table}/"
    for table in ("predication", "sentence", "entity")
}

# Expected Python type of every column that is kept, keyed first by table name
# and then by column label. The cleaning loop passes the per-table mapping to
# fill_empty_fields (to choose the placeholder for empty fields) and to
# correct_types (as the target type of each column).
# Column labels are taken from the schema description
# (https://lhncbc.nlm.nih.gov/ii/tools/SemRep_SemMedDB_SKR/dbinfo.html);
# the types were inferred from the data.
CORRECTED_TYPES = {
    "predication": {
        "PREDICATION_ID": int,  # Auto-generated primary key for each unique predication
        "SENTENCE_ID": int,     # Foreign key to the SENTENCE table
        "PMID": int,            # The PubMed identifier of the citation to which the predication belongs
        "PREDICATE": str,       # The string representation of each predicate (for example TREATS, PROCESS_OF)
        "SUBJECT_CUI": str,     # The CUI of the subject of the predication
        "SUBJECT_NAME": str,    # The preferred name of the subject of the predication
        "SUBJECT_SEMTYPE": str, # The semantic type of the subject of the predication
        "SUBJECT_NOVELTY": int, # The novelty of the subject of the predication
        "OBJECT_CUI": str,      # The CUI of the object of the predication
        "OBJECT_NAME": str,     # The preferred name of the object of the predication
        "OBJECT_SEMTYPE": str,  # The semantic type of the object of the predication
        "OBJECT_NOVELTY": int,  # The novelty of the object of the predication
    },
    "sentence": {
        "SENTENCE_ID": int,               # Auto-generated primary key for each sentence
        "PMID": int,                      # The PubMed identifier of the citation to which the sentence belongs
        "TYPE": str,                      # 'ti' for the title of the citation, 'ab' for the abstract
        "NUMBER": int,                    # The location of the sentence within the title or abstract
        "SENT_START_INDEX": int,          # The character position within the text of the MEDLINE citation of the first character of the sentence (NEW)
        "SENT_END_INDEX": int,            # The character position within the text of the MEDLINE citation of the last character of the sentence (NEW)
        "SENTENCE": str,                  # The actual string or text of the sentence
    },
    "entity": {
        "ENTITY_ID": int,    # Auto-generated primary key for each unique entity
        "SENTENCE_ID": int,  # The foreign key to the SENTENCE table
        "CUI": str,          # The CUI of the entity
        "NAME": str,         # The preferred name of the entity
        "TYPE": str,         # The semantic type of the entity
        "TEXT": str,         # The text in the utterance that maps to the entity
        "START_INDEX": int,  # The first character position (in document) of the text denoting the entity
        "END_INDEX": int,    # The last character position (in document) of the text denoting the entity
        "SCORE": int,        # The confidence score
    }
}

# Per-table column fixes applied by drop_relabel_data before cleaning: the
# labels under "drop" are removed first, then the "rename" mapping is applied.
# Some of these labels do not match the data they are described as holding.
# This is due to a discrepancy between the schema version listed on the
# website (v30) and the current data version (v43).
CORRECTIONS = {
    "sentence": {
        "drop": [
            "SENTENCE", # The sentence text is actually in the current "SENT_END_INDEX" column; the label is added back
                        # in the correct position by the rename below. The data in this column is mostly empty
                        # (at least 189776747 rows), so there is no use in keeping it.
            "NORMALIZED_SECTION_HEADER", # Again, mostly empty (at least 181859346 rows)
        ],
        "rename": {
            "SENT_END_INDEX" : "SENTENCE", # see the comment on the dropped "SENTENCE" column
            "SECTION_HEADER" : "SENT_END_INDEX", # inferred from data.
        }
    },
    "entity": {
        "drop": [
            "GENE_ID", # Mostly empty
            "GENE_NAME", # Again, mostly empty
        ],
    },
}

# One slot per table; each starts out as None and is replaced by the table's
# dataframe once it has been read.
DATA = dict.fromkeys(("predication", "sentence", "entity"))



## Clean and Upload Data As Necessary

In [None]:
def find_empty(data: dd) -> list[str]:
    """Report which columns of the dataframe contain null values.

    The per-column null counts are computed under a progress bar, since this
    requires evaluating the whole dataframe.

    Args:
        data: a dask dataframe containing data.

    Returns:
        The labels of all columns that hold at least one null value.
    """
    # Describe the per-column null count, then evaluate it.
    pending_counts = data.isnull().sum()
    with ProgressBar():
        null_counts = pending_counts.compute()

    # Keep only the labels whose count is positive.
    return [label for label, count in null_counts.items() if count > 0]

def fill_empty_fields(data:dd, corrected_cols: dict[str,type], cols_to_fill: list[str] ) -> dd:
    """Replace the empty fields of the listed columns with a placeholder.

    Columns whose expected type is int receive the string '-1'; every other
    column receives "Not available".

    Args:
        data: a dask dataframe containing data.
        corrected_cols: a dictionary containing the expected type of each column.
        cols_to_fill: the labels of the columns which contain empty fields.

    Returns:
        A dataframe in which the listed columns have no empty fields. When
        there is nothing to fill, the input dataframe itself is returned.
    """
    if not cols_to_fill:
        return data

    # One placeholder per column, chosen from the column's expected type.
    placeholders = {
        col: '-1' if corrected_cols[col] == int else "Not available"
        for col in cols_to_fill
    }
    return data.fillna(placeholders)

def correct_types(data: dd.DataFrame, corrected_cols: dict[str, type]) -> dd.DataFrame:
    """Cast the listed columns of the dataframe to their expected types.

    The conversion is expressed as a single astype() call over the whole
    mapping, so the dataframe passed in is left untouched and a new one is
    returned (the previous per-column assignment modified the caller's
    dataframe in place).

    Args:
        data: a dask dataframe without empty fields.
        corrected_cols: a dictionary mapping column label to expected type.
            Columns that are not listed keep their current type.

    Returns:
        A new dataframe with the listed columns converted.

    Raises:
        KeyError: presumably, when a label in corrected_cols is not a column
            of data (this is the pandas behaviour; confirm for dask).
    """
    # Copy the mapping so the caller's dictionary is never shared with the
    # dataframe machinery.
    return data.astype(dict(corrected_cols))

def drop_relabel_data(data:dd, corrections: dict) -> dd:
    """Remove and then relabel dataframe columns as specified.

    Args:
        data: a dask dataframe containing data.
        corrections: a dictionary that may hold a "drop" list of column labels
        to remove and a "rename" dictionary of old label -> new label. The
        renaming is applied after the dropping. A missing or empty entry is
        skipped.

    Returns:
        A new dataframe with the corrections applied.
    """
    # Dropping comes first so that a renamed column can take over the label
    # of a column that was just removed.
    if unwanted := corrections.get("drop"):
        data = data.drop(unwanted, axis=1)

    if new_labels := corrections.get("rename"):
        data = data.rename(columns=new_labels)

    return data

In [None]:
# Clean every table in turn: read it, apply the column fixes, fill the empty
# fields and cast the columns. A failure in any step before the final cast is
# reported and the table is skipped; a failure in the final cast is reported
# and re-raised.
for key, url in S3_URLS.items():
    rule = "-" * 20
    print(f"{rule}{key.upper()}{rule}")

    # Step 1: read the raw table into a dataframe.
    print("Action: Reading in data.")
    try:
        DATA[key] = dd.read_parquet(url, storage_options=storage_opts)
    except Exception as err:
        print(f"Result: Failed to read in data.\n{err}")
        continue

    # Step 2: drop/rename columns, only for tables that have fixes listed.
    table_fixes = CORRECTIONS.get(key)
    if table_fixes:
        print("Action: Dropping/Renaming Labels.")
        try:
            DATA[key] = drop_relabel_data(DATA[key], table_fixes)
        except Exception as err:
            print(f"Result: Failed to drop or rename labels.\n{err}")
            continue

    # Step 3: work out which columns hold empty fields.
    print("Action: Finding empty fields in data.")
    try:
        cols_to_fill = find_empty(DATA[key])
    except Exception as err:
        print(f"Result: Failed to find empty fields in data.\n{err}")
        continue

    print(f"\nInfo: Empty fields in data: \n {cols_to_fill}\n")

    # Step 4: replace the empty fields with placeholders.
    print("Action: Filling empty fields in data.")
    try:
        DATA[key] = fill_empty_fields(DATA[key], CORRECTED_TYPES[key], cols_to_fill)
    except Exception as err:
        print(f"Result: Failed to fill empty fields in data.\n{err}")
        continue

    # Step 5: cast the columns to their expected types. Unlike the earlier
    # steps, an error here stops the whole loop.
    print("Action: Converting data to expected datatypes.")
    try:
        DATA[key] = correct_types(DATA[key], CORRECTED_TYPES[key])
    except Exception as err:
        print(f"Result: Failed to clean and convert data.\n{err}")
        raise

    print("Result: Success\n")


--------------------PREDICATION--------------------
Action: Reading in data.
Action: Finding empty fields in data.
[########################################] | 100% Completed | 67.30 s

Info: Empty fields in data: 
 []

Action: Filling empty fields in data.
Action: Converting data to expected datatypes.
Result: Success

--------------------SENTENCE--------------------
Action: Reading in data.
Action: Dropping/Renaming Labels.
Action: Finding empty fields in data.
[########################################] | 100% Completed | 372.34 s

Info: Empty fields in data: 
 ['SENTENCE']

Action: Filling empty fields in data.
Action: Converting data to expected datatypes.
Result: Success

--------------------ENTITY--------------------
Action: Reading in data.
Action: Dropping/Renaming Labels.
Action: Finding empty fields in data.
[########################################] | 100% Completed | 13m 14s

Info: Empty fields in data: 
 ['ENTITY_ID', 'SENTENCE_ID', 'CUI', 'NAME', 'TYPE', 'TEXT', 'START_IN

## Save Dataframes to S3 bucket as Parquet files

In [None]:
s3_url = "s3://semdb-data/parquet_cleaned/"


def save_data(name: str):
    """Write one dataframe from DATA to S3 as gzip-compressed parquet.

    The destination is s3_url followed by the name. Progress is shown while
    writing. Any error is reported on stdout and is not re-raised.

    Args:
        name: the name of the dataframe, i.e. its key in DATA.
    """
    url = s3_url + name
    rule = "-" * 20
    print(f"{rule}{name.upper()}{rule}")
    print("Action: Saving data.")
    try:
        with ProgressBar():
            DATA[name].to_parquet(url, storage_options=storage_opts, compression='gzip')
    except Exception as err:
        print(f"Result: Failed to save data.\n{err}")
    else:
        print(f"Result: Successfully saved data to {url}.\n")

In [None]:
# Upload DATA["predication"] to the parquet_cleaned/ prefix of the bucket.
save_data("predication")

--------------------PREDICATION--------------------
Action: Saving data.
[########################################] | 100% Completed | 49m 32s
Result: Successfully saved data to s3://semdb-data/parquet_cleaned/predication.



In [None]:
# Upload DATA["sentence"] to the parquet_cleaned/ prefix of the bucket.
save_data("sentence")

--------------------SENTENCE--------------------
Action: Saving data.
[########################################] | 100% Completed | 3hr 8ms
Result: Successfully saved data to s3://semdb-data/parquet_cleaned/sentence.



In [None]:
from pathlib import Path
import os
# Load the entity table from the local csv/entity directory under the current
# working directory. A failure is only reported, so DATA["entity"] then keeps
# whatever it held before.
root = str(Path.cwd() / "csv")
ENTITY_FILE = str(Path(root) / "entity")

try:
    DATA["entity"] = dd.read_parquet(ENTITY_FILE)
except Exception as err:
    print(f"Something seriously messed up happened.\n{err}")

In [None]:
# Upload DATA["entity"] (loaded from the local csv/entity directory in the
# previous cell) to the parquet_cleaned/ prefix of the bucket.
save_data("entity")

--------------------ENTITY--------------------
Action: Saving data.
[########################################] | 100% Completed | 5hr 49m
Result: Successfully saved data to s3://semdb-data/parquet_cleaned/entity.

