# Upload

Prepare the datasets and save them to the upload database.

# Setup

Setup config, database, and libraries.

## Libraries

In [1]:
import os
import pandas as pd
import re
import yaml

from datetime import datetime
from edotenv import load_edotenv
from pandas.api.types import is_numeric_dtype
from pathlib import Path
from sqlalchemy import create_engine, text, inspect
from textwrap import dedent

## Config

Load settings from the `config.yml` file.

In [2]:
# Read the notebook settings. The encoding is pinned so the file is decoded the
# same way on every platform instead of using the locale's default encoding.
with open('config.yml', 'r', encoding='utf-8') as file:
    config = yaml.safe_load(file)

# Default dataset version and the abbreviation used to prefix output file names
ver = config['Version']
abbrv = config['Abbreviation']

## Database Connection

To connect to the database, you need to ensure that you have saved your login successfully with `bin/login.bat` or `bin/login.sh`:

In Windows, run:

```
bin\login
```

In Linux/Mac OS, run:

```
source bin/login.sh
```

If these commands run successfully, a `.env` file will be created.

In [3]:
# Load the variables stored in the .env file one folder up
load_edotenv('../.env')

# Engine for the upload database, built from the UPLOAD_DB_URL variable
uengine = create_engine(os.environ['UPLOAD_DB_URL'])

# True only when UPLOAD_DB_IS_OM is exactly the string 'True'; this flag gates
# the grant and materialized-view refresh cells further down
is_om = 'True' == os.environ['UPLOAD_DB_IS_OM']

**Note**: If this fails, try `bin/login.bat` or `bin/login.sh` again.

## Initialize Data Descriptions

Create initial data descriptions from the `src/config.yml` file using the `Description` keys under the `Data` list. 

In [4]:
# Build one description record per dataset listed under `Data` in config.yml,
# then order the table by dataset name
ddescribe = pd.DataFrame([
    {'dataset': entry['Name'], 'description': entry['Description']}
    for entry in config['Data']
]).sort_values(by=['dataset'])

Preview data descriptions.

In [5]:
# Show the data descriptions table (dataset name + description) as the cell output
ddescribe

Unnamed: 0,dataset,description
12,cghr10,Centre for Global Health Research for ICD-10 (...
6,cmea10,Central Medical Evaluation Agreement for ICD-1...
0,icd10,International Classification of Diseases Revis...
13,icd10_cghr10,Mappings to convert from ICD-10 to CGHR-10.\n\...
7,icd10_cmea10,Mappings to convert from ICD-10 to CMEA-10.\n\...
2,icd10_icd11,Mappings to convert from ICD-10 to ICD-11.\n\n...
5,icd10_wbd10,Mappings to convert from ICD-10 to WBD-10.\n\n...
9,icd10_wva2016,Mappings to convert from ICD-10 to WVA-2016.\n...
11,icd10_wva2022,Mappings to convert from ICD-10 to WVA-2022.\n...
1,icd11,International Classification of Diseases Revis...


## Initialize Data Dictionaries

Create an initial data dictionary from the `src/config.yml` file using the `Description` keys under the `Columns` sub-list in the `Data` list. 

In [6]:
# Create the data dict from config.yml: one record per configured column.
# `Columns` and `Type` are read as optional keys because the preprocessing
# cell also treats them as optional ('Columns' in data / 'Type' in c).
ddict_cols = ['dataset', 'column', 'type', 'description']
ddict = pd.DataFrame(
    [
        {
            'dataset': data['Name'],
            # The renamed column is what ends up in the processed data
            'column': col.get('Rename', col['Name']),
            'type': col.get('Type'),
            'description': col['Description']
        }
        for data in config['Data']
        for col in data.get('Columns', [])
    ],
    # Explicit columns keep the frame sortable even when no records exist
    columns=ddict_cols
)

# Sort by dataset name
ddict = ddict.sort_values(by=['dataset'])

Preview data dictionary.

In [7]:
# Show the initial data dictionary (dataset, column, type, description) as the cell output
ddict

Unnamed: 0,dataset,column,type,description
93,cghr10,icd10_range,str,ICD-10 range for the CGHR-10 code.
92,cghr10,title,str,CGHR-10 title for the code.
91,cghr10,age,str,CGHR-10 code age group. Age groups are divided...
64,cmea10,icd10_range_short,str,Range of ICD-10 codes in the CMEA-10 block - s...
63,cmea10,icd10_range,str,Range of ICD-10 codes in the CMEA-10 block - e...
...,...,...,...,...
77,wva2016,icd10_code,str,ICD-10 codes that the WVA-2016 codes can be co...
70,wva2016,group,Int64,WVA-2016 group containing a range of codes.
86,wva2022,icd10_range,str,Range of ICD-10 codes for the WVA-2022 codes.
85,wva2022,title,str,Title for the WVA-2022 code.


# Preprocessing

Preprocess datasets, data dictionaries, and data descriptions.

## Preprocess Data

Preprocess the raw datasets by:

1. Reading raw datasets from files
2. Removing rows with all empty values
3. Removing and renaming columns, while converting column data types
4. Converting whole numbers to integers
5. Updating data description and dictionary statistics
6. Saving the data description and dictionary with updated statistics

In [8]:
# Store processed data (keyed by dataset name) and per-dataset stats for ddict
processed = {}
dstats = []

# Initialize row and col count.
# Drop the count columns left over from a previous run of this cell first:
# DataFrame.insert raises ValueError if the column already exists, which
# would make the cell fail when it is executed a second time.
ddescribe = ddescribe.drop(columns=['rows', 'columns'], errors='ignore')
ddescribe.insert(1, 'rows', pd.Series(dtype='int'))
ddescribe.insert(1, 'columns', pd.Series(dtype='int'))

# Supported raw file extensions and the pandas reader used for each
readers = {'.xls': pd.read_excel, '.xlsx': pd.read_excel, '.csv': pd.read_csv}

# Process each dataset listed in the config
for data in config['Data']:

    # Dataset name and location of its raw file
    dataset = data['Name']
    file = Path(data['File'])
    print(f'Processing {dataset}...')

    # 1. Read raw data with the reader matching the (lower-cased) extension
    ext = file.suffix.lower()
    if ext not in readers:
        raise ValueError(f'Extension {ext} not supported.')
    df = readers[ext](file)

    # 2. Drop rows in which every value is missing
    print('Removing na rows...')
    df = df.dropna(axis=0, how='all')

    # 3. Column selection, renaming, and type conversion (only when configured)
    if 'Columns' in data:
        specs = data['Columns']

        # 3a. Keep only the configured columns, in config order
        print(f'Extracting columns {dataset}...')
        df = df[[spec['Name'] for spec in specs]]

        # 3b. Apply the optional `Rename` of each column
        print(f'Renaming columns {dataset}...')
        df = df.rename(columns={
            spec['Name']: spec.get('Rename', spec['Name']) for spec in specs
        })

        # 3c. Per-column clean-up, addressed by the post-rename name
        for spec in specs:
            cname = spec.get('Rename', spec['Name'])

            # 3c1. Strip leading spaces and dashes when requested
            if spec.get('Remove Leading Dashes', False):
                print(f'Removing leading dashes ({cname})...')
                df[cname] = df[cname].str.lstrip(' -')

            # 3c2. Cast to the configured dtype
            if 'Type' in spec:
                print(f'Converting data type ({cname})...')
                df[cname] = df[cname].astype(spec['Type'])

    # 4. Numeric columns whose values are all either missing or whole-valued
    #    floats are stored as nullable integers
    for col in df.columns:
        if not is_numeric_dtype(df[col]):
            continue
        print(f'Converting whole numbers ({col})...')
        missing = df[col].isnull()
        whole = df[col].apply(lambda v: isinstance(v, float) and float.is_integer(v))
        if all(missing | whole):
            df[col] = df[col].astype('Int64').round(0)

    # 5a. Record the processed shape (rows, columns) in the data descriptions
    print('Updating statistics...')
    ddescribe.loc[ddescribe['dataset'] == dataset, ['rows', 'columns']] = df.shape

    # 5b. Summary statistics, one row per column, tagged with the dataset name
    stats = (
        df.describe(include='all')
        .transpose()
        .reset_index()
        .rename(columns={'index': 'column'})
    )
    stats.insert(0, 'dataset', dataset)
    dstats.append(stats)

    # Keep the processed frame for the upload and save steps
    processed[dataset] = df
    
# 6a. Store the row/column counts as plain integers and order by dataset name
ddescribe[['rows', 'columns']] = ddescribe[['rows', 'columns']].astype(int)
ddescribe = ddescribe.sort_values(by=['dataset'])

# 6b1. Stack the per-dataset stats into a single frame
dstats = pd.concat(dstats).reset_index(drop=True)

# Everything except the join keys is a statistics column; remove any such
# columns already present in ddict (from an earlier run) before merging
key_cols = ['dataset', 'column']
stale_cols = [name for name in dstats.columns if name not in key_cols]
ddict = ddict.drop(columns=stale_cols, errors='ignore')

# 6b2. Attach the stats to each (dataset, column) entry of the data dictionary
ddict = ddict.merge(dstats, on=key_cols, how='left')
ddict['count'] = ddict['count'].astype(int)
ddict = ddict.reset_index(drop=True).sort_values(by=['dataset'])
print('Complete!')

Processing icd10...
Removing na rows...
Extracting columns icd10...
Renaming columns icd10...
Converting data type (kind)...
Converting data type (kind_depth)...
Converting data type (chapter)...
Converting data type (code)...
Converting data type (title)...
Converting whole numbers (kind_depth)...
Updating statistics...
Processing icd11...
Removing na rows...
Extracting columns icd11...
Renaming columns icd11...
Converting data type (kind)...
Converting data type (kind_depth)...
Converting data type (chapter)...
Converting data type (code)...
Removing leading dashes (title)...
Converting data type (title)...
Converting data type (block_id)...
Converting data type (is_residual)...
Converting data type (is_leaf)...
Converting data type (is_primary_tabulation)...
Converting data type (group1)...
Converting data type (group2)...
Converting data type (group3)...
Converting data type (group4)...
Converting data type (group5)...
Converting data type (browser_url)...
Converting data type (fou

## Save Data Descriptions

Save the data descriptions.

In [9]:
# Make sure the output folder for the data descriptions exists
os.makedirs('data', exist_ok=True)

# Write the data descriptions (with row/column counts) as csv, without the index
ddescribe.to_csv(f'data/{abbrv}_data.csv', index=False)

Preview data descriptions with updated statistics.

In [10]:
# Show the data descriptions again, now including the `columns` and `rows` counts
ddescribe

Unnamed: 0,dataset,columns,rows,description
12,cghr10,3,36,Centre for Global Health Research for ICD-10 (...
6,cmea10,3,143,Central Medical Evaluation Agreement for ICD-1...
0,icd10,5,12597,International Classification of Diseases Revis...
13,icd10_cghr10,4,6411,Mappings to convert from ICD-10 to CGHR-10.\n\...
7,icd10_cmea10,3,1653,Mappings to convert from ICD-10 to CMEA-10.\n\...
2,icd10_icd11,12,12597,Mappings to convert from ICD-10 to ICD-11.\n\n...
5,icd10_wbd10,11,6502,Mappings to convert from ICD-10 to WBD-10.\n\n...
9,icd10_wva2016,6,2384,Mappings to convert from ICD-10 to WVA-2016.\n...
11,icd10_wva2022,4,4442,Mappings to convert from ICD-10 to WVA-2022.\n...
1,icd11,17,35459,International Classification of Diseases Revis...


## Save Data Dictionary

Save the data dictionary.

In [11]:
# Ensure the folder that holds the data dictionary exists
ddict_dir = Path('data')
ddict_dir.mkdir(exist_ok=True)

# Write the data dictionary (with merged statistics) as csv, without the index
ddict.to_csv(f'data/{abbrv}_ddict.csv', index=False)

Preview data dictionary with updated statistics.

In [12]:
# Show the data dictionary again, now including the merged summary statistics
ddict

Unnamed: 0,dataset,column,type,description,count,unique,top,freq,mean,std,min,25%,50%,75%,max
0,cghr10,icd10_range,str,ICD-10 range for the CGHR-10 code.,36,33,A00-A09,2,,,,,,,
1,cghr10,title,str,CGHR-10 title for the code.,36,29,Ill-defined,3,,,,,,,
2,cghr10,age,str,CGHR-10 code age group. Age groups are divided...,36,3,adult,19,,,,,,,
3,cmea10,icd10_range_short,str,Range of ICD-10 codes in the CMEA-10 block - s...,143,143,"A15-19,B90,J65",1,,,,,,,
4,cmea10,icd10_range,str,Range of ICD-10 codes in the CMEA-10 block - e...,143,143,"A15,A16,A17,A18,A19,B90,J65",1,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
85,wva2016,kind,str,WVA-2016 entity kind. One of group or code.,85,2,code,71,,,,,,,
89,wva2016,group_orig,str,Original WVA-2016 group with leading text VAs-.,85,15,VAs-01,17,,,,,,,
96,wva2022,title,str,Title for the WVA-2022 code.,65,65,Sepsis,1,,,,,,,
95,wva2022,icd10_range,str,Range of ICD-10 codes for the WVA-2022 codes.,65,63,"V11,V21,V31,V41,V51,V61,V90-99,V104-109,V114-1...",2,,,,,,,


# Database Upload

Upload data and comments to database, while creating views of the most recent data.

## Upload Datasets

Upload datasets to created tables in the PostgreSQL database.

In [13]:
for data in config['Data']:

    # Get info from config for dataset; the table name carries the dataset's
    # own version when set, otherwise the global version
    version = data.get('Version', ver)
    dataset = data['Name']
    table = f'{dataset}_v{version}'
    schema = data.get('Upload Schema')  # None -> database default schema

    print(f'Uploading {table}...')

    # Skip upload if table exists. The check must look in the same schema the
    # table is written to; without `schema=` only the default schema is
    # inspected, so an existing table in another schema would be missed and
    # the upload below would fail on it.
    if inspect(uengine).has_table(table, schema=schema):
        print(f'Table {table} exists - skipping!')
        continue

    # Upload to db depending on whether it has geodata
    df = processed[dataset]
    if 'Geometry Column' in data:
        # NOTE(review): to_postgis is a GeoDataFrame method, but the
        # preprocessing cell loads plain pandas frames - confirm how spatial
        # datasets are expected to reach this branch.
        df.to_postgis(table, uengine, schema=schema, index=False)
    else:
        df.to_sql(table, uengine, schema=schema, index=False)
    print(f'Uploaded {table}!')

Uploading icd10_v1...
Uploaded icd10_v1!
Uploading icd11_v1...
Uploaded icd11_v1!
Uploading icd10_icd11_v1...
Uploaded icd10_icd11_v1!
Uploading icd11_icd10_v1...
Uploaded icd11_icd10_v1!
Uploading wbd10_v1...
Uploaded wbd10_v1!
Uploading icd10_wbd10_v1...
Uploaded icd10_wbd10_v1!
Uploading cmea10_v1...
Uploaded cmea10_v1!
Uploading icd10_cmea10_v1...
Uploaded icd10_cmea10_v1!
Uploading wva2016_v1...
Uploaded wva2016_v1!
Uploading icd10_wva2016_v1...
Uploaded icd10_wva2016_v1!
Uploading wva2022_v1...
Uploaded wva2022_v1!
Uploading icd10_wva2022_v1...
Uploaded icd10_wva2022_v1!
Uploading cghr10_v1...
Uploaded cghr10_v1!
Uploading icd10_cghr10_v1...
Uploaded icd10_cghr10_v1!


## Upload Comments

Add table and column comments to uploaded dataset tables by:

1. Generating SQL for dataset table comment
2. Generating SQL for dataset column comments
3. Executing generated SQL statements above

In [14]:
# Comment SQL per dataset, keyed by dataset name
sql = {}
for data in config['Data']:

    # Versioned table name, prefixed with the upload schema when one is configured
    dataset = data['Name']
    version = data.get('Version', ver)
    table = f'{dataset}_v{version}'
    if 'Upload Schema' in data:
        schema = data['Upload Schema']
        table = f'{schema}.{table}'

    # Data dictionary rows for this dataset (one per column)
    dd = ddict[ddict['dataset'] == dataset]

    # 1. Table comment from the dataset description; single quotes are doubled
    #    so the text is a valid SQL string literal
    description = ddescribe.loc[ddescribe['dataset'] == dataset, 'description'].tolist()[0]
    description = description.replace("'", "''")
    comment_query = f"COMMENT ON TABLE {table} IS '{description}';"

    # 2. One column comment statement per data dictionary row
    col_lines = []
    for _, row in dd.iterrows():
        col_desc = str(row['description']).replace("'", "''")
        col_lines.append(f"COMMENT ON COLUMN {table}.{row['column']} IS '{col_desc}';")
    col_query = '\n'.join(col_lines)

    # 3a. Table comment followed by the column comments, each under a header line
    sql[dataset] = (
        f'--- {table} table comment\n{comment_query}'
        f'\n\n--- {table} column comments (n={len(dd)})\n{col_query}'
    )

# 3b. Join the per-dataset scripts and run them, then commit explicitly
comment_sql = '\n\n'.join(sql.values())
with uengine.connect() as connection:
    connection.execute(text(comment_sql))
    connection.execute(text('COMMIT;'))

## Upload Views

Create views for the uploaded tables with accompanying view/column comments.

In [15]:
# View SQL per dataset, keyed by dataset name
sql = {}
for data in config['Data']:

    # Versioned source table, prefixed with the upload schema when configured
    dataset = data['Name']
    version = data.get('Version', ver)
    table = f'{dataset}_v{version}'
    if 'Upload Schema' in data:
        schema = data['Upload Schema']
        table = f'{schema}.{table}'

    # Data dictionary rows for this dataset (one per column)
    dd = ddict[ddict['dataset'] == dataset]

    # 1. (Re)create a view named after the dataset that selects the versioned table
    view_query = f'DROP VIEW IF EXISTS {dataset}; CREATE OR REPLACE VIEW {dataset} AS (SELECT * FROM {table});'

    # 2. View comment from the dataset description, single quotes doubled
    description = ddescribe.loc[ddescribe['dataset'] == dataset, 'description'].tolist()[0]
    description = description.replace("'", "''")
    comment_query = f"COMMENT ON VIEW {dataset} IS '{description}';"

    # 3. One column comment statement per data dictionary row
    ncols = len(dd)
    col_lines = []
    for _, row in dd.iterrows():
        col_desc = str(row['description']).replace("'", "''")
        col_lines.append(f"COMMENT ON COLUMN {dataset}.{row['column']} IS '{col_desc}';")
    col_query = '\n'.join(col_lines)

    # 3a. Header lines and statements, separated by blank lines
    sql[dataset] = '\n\n'.join([
        f'--- {dataset} view',
        view_query,
        f'--- {dataset} view comment',
        comment_query,
        f'--- {dataset} view column comments (n={ncols})',
        col_query
    ])

# 3b. Join the per-dataset scripts and run them, then commit explicitly
view_sql = '\n\n'.join(sql.values())
with uengine.connect() as connection:
    connection.execute(text(view_sql))
    connection.execute(text('COMMIT;'))

## Save Comments SQL

Save comments sql to file.

In [16]:
# Create folder to store database outputs
Path('database').mkdir(exist_ok=True)

# Save comments sql for tables. The encoding is pinned so descriptions with
# non-ASCII characters are written identically on every platform instead of
# depending on (or failing under) the locale's default encoding.
with open(f'database/{abbrv}_comments_v{ver}.sql', 'w', encoding='utf-8') as file:
    file.write(comment_sql)

## Save Views SQL

Save views sql to file.

In [17]:
# Make sure the output folder exists so this cell does not depend on another
# cell having created it
Path('database').mkdir(exist_ok=True)

# Save views sql; encoding pinned so the output does not depend on the
# platform's default locale encoding
with open(f'database/{abbrv}_views_v{ver}.sql', 'w', encoding='utf-8') as file:
    file.write(view_sql)

## Save Datasets

Save datasets as csv files for open access.

In [18]:
# Make sure the open-access output folder exists
Path('../data').mkdir(exist_ok=True)

# Save each processed dataset as csv. The version is resolved per dataset
# (its own `Version`, else the global one) so the file name matches the
# uploaded table name, rather than reusing whatever `version` was left over
# from the last iteration of an earlier loop.
for data in config['Data']:
    dataset = data['Name']
    version = data.get('Version', ver)
    processed[dataset].to_csv(f'../data/{dataset}_v{version}.csv', index=False)

# Open Mortality Upload

Upload dataset and data records for https://openmortality.org

## Add Data Records

Query for the current data table or create one if it does not exist.

Then add a record for all the data in this dataset.

In [19]:
# Name and schema of the table holding one record per data table
om_dtable = config['Data Table']
om_dschema = config.get('Data Schema', 'public')

with uengine.connect() as connection:

    # Create data records table if not exists
    dcreate = text(dedent(
        f"""
        --- Data records table
        CREATE TABLE IF NOT EXISTS {om_dschema}.{om_dtable} (
            data_id SERIAL PRIMARY KEY,
            data_name VARCHAR UNIQUE,
            format VARCHAR,
            is_spatial BOOLEAN,
            permission VARCHAR,
            publish_date TIMESTAMP WITH TIME ZONE,
            status VARCHAR,
            tag VARCHAR,
            data_desc VARCHAR,
            last_update_date VARCHAR,
            contact VARCHAR,
            category VARCHAR
        );
        """
    ))
    connection.execute(dcreate)
    connection.execute(text('COMMIT;'))

    # Add record for each datum in dataset
    dinsert = []
    for data in config['Data']:

        # Get metadata
        # NOTE(review): data_name uses the global `ver`, while the uploaded
        # tables are named with each dataset's own `Version` when set -
        # confirm these are meant to match.
        dname = data['Name'] + f'_v{ver}'
        dformat = 'csv' if 'Geometry Column' not in data else 'geojson'
        dspatial = 'Geometry Column' in data
        dpermission = data.get('Permission', 'user')
        dpublish = str(datetime.now().astimezone())
        dtag = data.get('Tag', '')
        ddesc = data.get('Description', '').replace("'", "''")  # double single quotes for pg
        dcat = data.get('Category', '')
        dcontact = config.get('Contact', 'support@openmortality.org')

        # Add data record insert sql; the WHERE NOT EXISTS guard makes the
        # statement a no-op when a record with the same data_name is present
        dinsert.append(dedent(
            f"""
            --- {dname} data record
            INSERT INTO {om_dschema}.{om_dtable} (data_name, \"format\", is_spatial, \"permission\", publish_date, status, tag, data_desc, category, contact)
            SELECT '{dname}', '{dformat}', '{dspatial}', '{dpermission}', '{dpublish}', 'published', '{dtag}', '{ddesc}', '{dcat}', '{dcontact}'
            WHERE NOT EXISTS (SELECT 1 FROM {om_dschema}.{om_dtable} WHERE data_name = '{dname}');
            """
        ))

    # Add data records. `dinsert` is kept as a plain string because it is
    # written to a .sql file later in the notebook.
    dinsert = ''.join(dinsert)
    connection.execute(text(dinsert))

    # Commit explicitly, as every other write in this notebook does; without
    # it the inserts are discarded when the connection is closed.
    connection.execute(text('COMMIT;'))

## Add Dataset Record

Query for the current dataset table or create one if it does not exist.

Then add a record for this dataset.

In [20]:
# Name and schema of the table holding one record per dataset collection
om_dstable = config['Dataset Table']
om_dsschema = config.get('Dataset Schema', 'public')

with uengine.connect() as connection:

    # Create dataset table if not exists
    dscreate = text(dedent(
        f"""
        --- Dataset records table
        CREATE TABLE IF NOT EXISTS {om_dsschema}.{om_dstable} (
            dataset_id SERIAL PRIMARY KEY,
            title VARCHAR UNIQUE,
            title_abbr VARCHAR UNIQUE,
            status VARCHAR,
            data_id VARCHAR,
            publish_date TIMESTAMP WITH TIME ZONE,
            tag VARCHAR,
            dataset_desc VARCHAR,
            contact VARCHAR,
            last_update_date TIMESTAMP WITH TIME ZONE,
            permission VARCHAR
        );
        """
    ))
    connection.execute(dscreate)
    connection.execute(text('COMMIT;'))

    # Get data ids of the data records belonging to this dataset, looked up
    # by name and joined into one comma-separated string
    dquery = text(
        f"""
        SELECT data_id, data_name FROM {om_dschema}.{om_dtable} WHERE data_name = ANY(:names);
        """
    )
    dids = pd.read_sql(dquery, uengine, params={
        'names': [d['Name'] + f'_v{ver}' for d in config['Data']]
    })
    dids = ','.join(dids['data_id'].astype(str).tolist())

    # Get markdown description. The encoding is pinned so the README is
    # decoded the same way on every platform.
    with open('../README.md', 'r', encoding='utf-8') as file:
        md = file.read()
    ddesc = re.split(r'(\n#)', md) # split by sections
    ddesc = [ddesc[i-1].strip() + d if i > 0 and '\n#' == ddesc[i -1] else d for i, d in enumerate(ddesc)] # join back hashtags
    ddesc = [d for d in ddesc if '## About' in d or '## Citation' in d or '###' in d] # filter for certain sections
    ddesc = '\n\n'.join(ddesc)
    ddesc = ddesc.replace('## About\n\n', '') # Remove about section title
    ddesc = ddesc.replace("'", "''") # replace single quotes with double for pg
    # NOTE(review): the description is embedded in a text() statement below;
    # SQLAlchemy treats `:name` tokens in text() as bind parameters, so a
    # README containing such a token may need escaping - confirm.

    # Set metadata
    dstitle = config['Title']
    dstitle_abbr_default = ''.join(filter(str.isupper, dstitle.title())).lower()
    dstitle_abbr = config.get('Abbreviation', dstitle_abbr_default)
    dscontact = config.get('Contact', 'support@openmortality.org')
    dspublish = str(datetime.now().astimezone())
    dstag = config.get('Tag', '')
    dspermission = config.get('Permission', 'user')

    # The title goes into SQL string literals, so double its single quotes
    # the same way the description is escaped above
    dstitle_sql = dstitle.replace("'", "''")

    # Add dataset record; the WHERE NOT EXISTS guard skips the insert when a
    # record with the same title already exists
    dsinsert = text(dedent(
        f"""
        --- {dstitle_abbr} dataset record
        INSERT INTO {om_dsschema}.{om_dstable} (title, title_abbr, status, data_id, contact, publish_date, tag, dataset_desc, \"permission\")
        SELECT '{dstitle_sql}', '{dstitle_abbr}', 'published', '{dids}', '{dscontact}', '{dspublish}', '{dstag}', '{ddesc}', '{dspermission}'
        WHERE NOT EXISTS (SELECT 1 FROM {om_dsschema}.{om_dstable} WHERE title = '{dstitle_sql}');
        """
    ))
    connection.execute(dsinsert)

    # Commit explicitly, as every other write in this notebook does; without
    # it the insert is discarded when the connection is closed.
    connection.execute(text('COMMIT;'))

## Grant Access to User

Grant access to OM user.

In [21]:
if is_om:
    omuser = config['Database User']
    with uengine.connect() as connection:

        # Create grant sql: one GRANT per uploaded table
        grantsql = ['--- Grant select on data for OM user']
        for data in config['Data']:
            # Resolve the version per dataset so the name matches the table
            # created by the upload step (`{Name}_v{Version}`, falling back
            # to the global version)
            dversion = data.get('Version', ver)
            dtable = data['Name'] + f'_v{dversion}'
            dschema = data.get('Upload Schema', 'public')
            grantsql.append(f'GRANT SELECT ON TABLE {dschema}.{dtable} TO {omuser};')
        grantsql = text('\n'.join(grantsql))

        # Grant access to user for added tables
        connection.execute(grantsql)
        connection.execute(text('COMMIT;'))

## Refresh Dataset Records View

Refresh materialized view for dataset records.

In [22]:
if is_om:
    # Name of the materialized view to refresh, taken from the config
    matview = config['Dataset Refresh View']

    # Build the statement up front; it is also written to a .sql file later
    refreshsql = text(f'--- Refresh dataset records materialized view \nREFRESH MATERIALIZED VIEW {matview};')

    # Run the refresh and commit explicitly
    with uengine.connect() as connection:
        connection.execute(refreshsql)
        connection.execute(text('COMMIT;'))

## Save Data Records SQL

Save data records sql to file.

In [23]:
# Assemble the script before opening the file so a missing variable does not
# leave a truncated file behind. The grant statements exist only for OM uploads.
data_sql = str(dcreate) + str(dinsert)
if is_om:
    data_sql += '\n' + str(grantsql)

# Encoding pinned so descriptions with non-ASCII characters are written the
# same way on every platform
with open(f'database/{abbrv}_data_v{ver}.sql', 'w', encoding='utf-8') as file:
    file.write(data_sql)

## Save Dataset Records SQL

Save dataset records sql to file.

In [24]:
# Assemble the script before opening the file so a missing variable does not
# leave a truncated file behind. The refresh statement exists only for OM uploads.
dataset_sql = str(dscreate) + str(dsinsert) + '\n'
if is_om:
    dataset_sql += str(refreshsql)

# Encoding pinned so the README-derived description is written the same way
# on every platform
with open(f'database/{abbrv}_dataset_v{ver}.sql', 'w', encoding='utf-8') as file:
    file.write(dataset_sql)

# Close Database

In [25]:
# Dispose of the upload engine now that all database work is done
uengine.dispose()