## Generate default property tables based on schemas in the DDE

These scripts were originally written to generate default property tables for the nde schema in the DDE

They have been used to successfully export property tables for the nde:Dataset and nde:ComputationalTool schemas.

#### To do: 
Test the flexibility of the script by generating tables for:
* outbreak:ComputationalTool (successful run)
* outbreak:Dataset
* niaid:Dataset
* bioschemas:Protein

In [1]:
import os
import json
import pandas as pd
import requests
from datetime import datetime
from collections import OrderedDict

In [2]:
## Resolve the folders this notebook works with, starting from the current working directory
nb_path = os.getcwd()
## The directory one level above the working directory
parent_path = os.path.dirname(nb_path)
## 'metainfo' folder under the parent directory; the export cells below write their TSV files beneath it
metainfo = os.path.join(parent_path,'metainfo')
print(parent_path)

C:\Users\gtsueng\Anaconda3\envs\outbreak\DDE-CrossWalks


In [3]:
## Parse out basic information and clean up the resulting table
def parse_marginality(validation_rules):
    """Build a table that labels each property with its marginality.

    Args:
        validation_rules: mapping that may contain the keys "required",
            "recommended" and "optional", each holding a list of property
            names. Any of the keys may be absent or empty.

    Returns:
        A DataFrame with the columns "property" and "marginality". Rows are
        ordered required -> recommended -> optional. When none of the keys
        hold any properties, an empty DataFrame with those two columns is
        returned so that a later merge on "property" still works.
    """
    ## rule key in the validation block -> label written to the table
    marginality_labels = (
        ("required", "minimal or required"),
        ("recommended", "recommended"),
        ("optional", "optional"),
    )
    rows = []
    for rule_key, label in marginality_labels:
        ## a missing (or null) list simply contributes no rows; previously a
        ## missing key produced a plain list that pd.concat could not handle
        for prop in validation_rules.get(rule_key) or []:
            rows.append({"property": prop, "marginality": label})
    marginality_rules = pd.DataFrame(rows, columns=["property", "marginality"])
    return marginality_rules

def parse_cardinality(validation_rules):
    """Build a table that lists the owl:cardinality of every property.

    Args:
        validation_rules: mapping with a "properties" key whose value maps
            each property name to a dict of its validation settings.

    Returns:
        A DataFrame with the columns "property" and "owl:cardinality".
        Properties without an "owl:cardinality" entry get "Unspecified".
        The columns are present even when there are no properties.

    Raises:
        KeyError: if validation_rules has no "properties" key.
    """
    property_dict = validation_rules["properties"]
    ## one pass with a default replaces the former try-everything / retry-one-by-one logic
    cardinality_list = [
        {"property": prop, "owl:cardinality": settings.get("owl:cardinality", "Unspecified")}
        for prop, settings in property_dict.items()
    ]
    cardinality_df = pd.DataFrame(cardinality_list, columns=["property", "owl:cardinality"])
    return cardinality_df


In [4]:
## Get property definitions
## properties not defined in NDE should be checked in parent classes of NDE
def get_schema(namespace,sdo=False):
    """Download a schema document and return it as parsed JSON.

    When sdo is True the schema.org JSON-LD file is fetched and namespace is
    not used; otherwise the DDE registry entry for namespace is fetched.
    """
    if sdo == True:
        target_url = "https://schema.org/version/latest/schemaorg-current-https.jsonld"
    else:
        registry_root = "https://discovery.biothings.io/api/registry/"
        target_url = f"{registry_root}{namespace}/"
    response = requests.get(target_url)
    return json.loads(response.text)

def get_defined_props(schema_results,sdo=False):
    """Return the graph entries whose "@type" is "rdf:Property".

    The graph is read from schema_results['@graph'] when sdo is True and
    from schema_results['source']['@graph'] otherwise.
    """
    graph_nodes = schema_results['@graph'] if sdo == True else schema_results['source']['@graph']
    return [node for node in graph_nodes if node["@type"] == "rdf:Property"]

def convert_prop_df(schema_props,namespace):
    """Turn raw property definitions into a table of label, domain, range and url.

    The 'rdfs:label' column is renamed to 'property', and the
    'schema:domainIncludes' / 'schema:rangeIncludes' values are simplified
    with clean_ids. The url points at schema.org when namespace is "schema"
    and at the DDE viewer for that namespace otherwise.
    """
    propframe = pd.DataFrame(schema_props).rename(columns={'rdfs:label':'property'})
    column_pairs = (('domainIncludes','schema:domainIncludes'),
                    ('rangeIncludes','schema:rangeIncludes'))
    for target_col, source_col in column_pairs:
        propframe[target_col] = propframe.apply(lambda row: clean_ids(row[source_col]),axis=1)
    cleandf = propframe[['property','domainIncludes','rangeIncludes']].copy()
    labels = cleandf['property']
    if namespace == "schema":
        schema_base_url = "https://schema.org/"
        urls = [f"{schema_base_url}{x}" for x in labels]
    else:
        schema_base_url = "https://discovery.biothings.io/view/"
        urls = [f"{schema_base_url}{namespace}/{x}" for x in labels]
    cleandf["url"] = urls
    return cleandf


def clean_ids(idlist):
    """Reduce a JSON-LD id reference (or list of them) to plain id strings.

    Args:
        idlist: a dict with an '@id' key, a list of such dicts (plain strings
            in the list are kept as they are), a plain string, or a
            float/None placeholder for a missing value.

    Returns:
        "Unspecified" for a float, None or an empty list; a single id string
        for a dict, a plain string or a one-entry list; a list of id strings
        for a list with two or more entries. Any other input is printed and
        reported as "Unspecified".
    """
    ## missing values arrive as float (NaN) when the column was absent for a row
    if idlist is None or isinstance(idlist,float):
        return "Unspecified"
    if isinstance(idlist,str):
        return idlist
    if isinstance(idlist,dict):
        return idlist['@id']
    if isinstance(idlist,list):
        idresults = [x['@id'] if isinstance(x,dict) else x for x in idlist]
        if not idresults:
            return "Unspecified"
        if len(idresults)<2:
            return idresults[0]
        return idresults
    ## unexpected type: show it for debugging, but still return a value
    ## (previously this path failed because the result was never assigned)
    print(idlist)
    return "Unspecified"

def generate_schema_propdf():
    """Fetch the schema.org graph and return its properties as a cleaned table.

    The 'property' column is cast to str before the table is returned.
    """
    sdo_graph = get_schema("schema",True)
    sdo_properties = get_defined_props(sdo_graph,True)
    schema_prop_df = convert_prop_df(sdo_properties,"schema")
    schema_prop_df['property'] = schema_prop_df['property'].astype(str)
    return schema_prop_df

#### Searching for properties in parent classes 
##Look into the @context to find used classes/namespaces  
def generate_namespace_list(namespace,sdo=False):
    """Return the @context of a DDE schema without the rdf/rdfs/owl entries.

    Args:
        namespace: DDE namespace whose registry entry is fetched.
        sdo: when True an empty dict is returned and nothing is fetched
            (the result of the former request in this branch was never used).

    Returns:
        The '@context' mapping of the fetched schema with the keys "rdf",
        "rdfs" and "owl" removed, or {} when sdo is True.
    """
    if sdo == True:
        return {}
    foundations = ["rdf","rdfs","owl"]
    result_schema = get_schema(namespace,sdo)
    context_dict = result_schema['source']['@context']
    ## only a mapping can have the foundation prefixes removed; anything else is returned untouched
    if isinstance(context_dict,dict):
        for eachfoundation in foundations:
            context_dict.pop(eachfoundation,None)
    return context_dict

In [5]:
def fetch_schema_classes(namespace,raw=False):
    """Return the classes of a DDE namespace.

    With raw False the 'hits' entry of the fetched result is returned as is;
    otherwise the entries of result['source']['@graph'] whose "@type" is
    "rdfs:Class" are returned.
    """
    registry_entry = get_schema(namespace,sdo=False)
    if raw == False:
        return registry_entry['hits']
    graph_nodes = registry_entry['source']['@graph']
    return [node for node in graph_nodes if node["@type"] == "rdfs:Class"]


def fetch_specific_class(namespace,schematype,raw=False):
    """Find the class entry of a namespace whose label equals schematype.

    Args:
        namespace: DDE namespace to search.
        schematype: class label to look for (compared against "rdfs:label",
            or against "label" when the entry has no "rdfs:label").
        raw: passed on to fetch_schema_classes to choose the entry format.

    Returns:
        The first matching class entry, or None when nothing matches.
    """
    schema_class_list = fetch_schema_classes(namespace,raw)
    for eachhit in schema_class_list:
        ## fall back to "label" only when "rdfs:label" is absent; entries with neither are skipped
        if "rdfs:label" in eachhit:
            hit_label = eachhit["rdfs:label"]
        else:
            hit_label = eachhit.get("label")
        if hit_label == schematype:
            return eachhit
    return None

In [6]:
def get_parent_class_info(namespace):
    """Map each class "name" in the namespace to its "parent_classes" value."""
    class_entries = fetch_schema_classes(namespace,raw=False)
    return {entry["name"]: entry["parent_classes"] for entry in class_entries}

## parses a list encoded as a string
def parse_fake_list(fake_list):
    """Turn a list that may be encoded as a string into a real list.

    Args:
        fake_list: a list, or a string such as "[a,b]".

    Returns:
        For a list with exactly one entry, that entry split on ','; for any
        other list, the list itself; for a string, its content with leading
        '[' and trailing ']' removed and split on ',' (an empty string gives
        an empty list). Entries are not whitespace-stripped.

    Raises:
        TypeError: if fake_list is neither a list nor a string.
    """
    if isinstance(fake_list,list): ## the list is actually a list
        if len(fake_list) == 1: ## however, the list has only one string entry which needs to be parsed
            return fake_list[0].split(',')
        return fake_list
    if isinstance(fake_list,str): ## the list is a string encoded list
        inner = fake_list.strip('[').strip(']')
        if not inner:
            return []
        return inner.split(',')
    raise TypeError(f"expected a list or a string, got {type(fake_list).__name__}")

def get_parent_order(namespace,schema_class):
    """List the namespace prefixes of a class's parents, ending with namespace.

    The prefix is the text before the first ':' of each parent entry,
    stripped of whitespace; duplicates are dropped keeping first occurrence.
    """
    parents_by_class = get_parent_class_info(namespace)
    parent_entries = parse_fake_list(parents_by_class[schema_class])
    prefixes = [entry.split(':')[0].strip() for entry in parent_entries]
    schema_order = list(OrderedDict.fromkeys(prefixes))
    ## the namespace itself is always appended as the final entry
    schema_order.append(namespace)
    return schema_order

def reorder_list(schema_order):
    """Return a new list holding the entries of schema_order from last to first."""
    return [schema_order[-offset] for offset in range(1, len(schema_order) + 1)]

## Get marginality and cardinality data for all the properties
def get_margin_cardin(namespace,schema_class):
    """Merge the marginality and cardinality tables of one class.

    schema_class is given as "prefix:ClassName"; the part after the first
    ':' is looked up in the namespace and its "$validation" block is parsed.
    The two tables are outer-merged on "property".
    """
    class_label = schema_class.split(':')[1]
    validation_rules = fetch_specific_class(namespace,class_label,raw=True)["$validation"]
    margins = parse_marginality(validation_rules)
    cardinalities = parse_cardinality(validation_rules)
    return margins.merge(cardinalities,on="property",how="outer")

def generate_prop_df(namespace, schema_class):
    """Assemble the property table for a class across its namespace chain.

    The namespaces are visited in the order given by reorder_list. For each
    one, the class's properties that are defined there are joined with their
    definitions and appended to the result. Properties matched in a
    non-schema namespace are excluded from later namespaces.
    """
    sdo_prop_df = generate_schema_propdf()
    cleanDF = pd.DataFrame(columns = ['property','owl:cardinality','marginality',
                                      'domainIncludes','rangeIncludes','url'])
    baseDF = get_margin_cardin(namespace,schema_class)
    priority_order = reorder_list(get_parent_order(namespace,schema_class))
    already_matched = []
    for eachnamespace in priority_order:
        from_sdo = eachnamespace == 'schema'
        if from_sdo:
            ## schema.org props were already downloaded above
            definitions = sdo_prop_df
        else:
            definitions = convert_prop_df(get_defined_props(get_schema(eachnamespace)),eachnamespace)
        remaining = baseDF.loc[~baseDF['property'].isin(already_matched)]
        matched = remaining.merge(definitions,on="property",how="inner")
        if not from_sdo:
            already_matched.extend(matched['property'].unique().tolist())
        cleanDF = pd.concat((cleanDF,matched),ignore_index=True)
    return cleanDF

def clean_up_schema(cleanDF, schema_class):
    """Prefix the table's columns with "sameAs." and return the six renamed columns.

    The 'property' column becomes "sameAs.<schema_class>". The rename is done
    in place, so the DataFrame passed in is modified; the returned table is a
    copy restricted to the renamed columns.
    """
    class_column = f"sameAs.{schema_class}"
    column_map = {
        "property":class_column,
        "marginality":"sameAs.marginality",
        "owl:cardinality":"sameAs.owl:cardinality",
        "domainIncludes":"sameAs.domainIncludes",
        "rangeIncludes":"sameAs.rangeIncludes",
        "url":"sameAs.url",
    }
    cleanDF.rename(columns=column_map,inplace=True)
    output_columns = [class_column,"sameAs.owl:cardinality","sameAs.marginality",
                      "sameAs.domainIncludes","sameAs.rangeIncludes","sameAs.url"]
    return cleanDF[output_columns].copy()

In [106]:
## Build the property table for nde:Dataset, prefix its columns with "sameAs.",
## preview the first two rows and export the table as a tab-separated file
## in the 'nde' folder under metainfo
namespace = "nde"
schema_class = 'nde:Dataset'
ndeDF = generate_prop_df(namespace, schema_class)
cleanndeDF = clean_up_schema(ndeDF, schema_class)
print(cleanndeDF.head(n=2))
cleanndeDF.to_csv(os.path.join(metainfo,'nde','nde_dataset_props.tsv'),sep='\t',header=True)


      sameAs.nde:Dataset sameAs.owl:cardinality   sameAs.marginality  \
0                   date                    one  minimal or required   
1  includedInDataCatalog                    one  minimal or required   

                  sameAs.domainIncludes sameAs.rangeIncludes  \
0  [nde:Dataset, nde:ComputationalTool]          schema:Date   
1                 nde:ComputationalTool   schema:DataCatalog   

                                          sameAs.url  
0       https://discovery.biothings.io/view/nde/date  
1  https://discovery.biothings.io/view/nde/includ...  


In [133]:
## Same export as for nde:Dataset, this time for nde:ComputationalTool;
## the result is written to nde_comptools_props.tsv in the 'nde' folder under metainfo
namespace = "nde"
schema_class = 'nde:ComputationalTool'
ndectDF = generate_prop_df(namespace, schema_class)
cleanndectDF = clean_up_schema(ndectDF, schema_class)
print(cleanndectDF.head(n=2))
cleanndectDF.to_csv(os.path.join(metainfo,'nde','nde_comptools_props.tsv'),sep='\t',header=True)

  sameAs.nde:ComputationalTool sameAs.owl:cardinality   sameAs.marginality  \
0                         date                    one  minimal or required   
1        includedInDataCatalog            Unspecified  minimal or required   

                  sameAs.domainIncludes sameAs.rangeIncludes  \
0  [nde:Dataset, nde:ComputationalTool]          schema:Date   
1                 nde:ComputationalTool   schema:DataCatalog   

                                          sameAs.url  
0       https://discovery.biothings.io/view/nde/date  
1  https://discovery.biothings.io/view/nde/includ...  


In [7]:
## Flexibility test: generate and export the property table for outbreak:ComputationalTool
## NOTE(review): this cell reuses the ndectDF/cleanndectDF names and writes the
## outbreak table into the 'nde' folder - confirm that both are intended
namespace = "outbreak"
schema_class = 'outbreak:ComputationalTool'
ndectDF = generate_prop_df(namespace, schema_class)
cleanndectDF = clean_up_schema(ndectDF, schema_class)
print(cleanndectDF.head(n=2))
cleanndectDF.to_csv(os.path.join(metainfo,'nde','outbreak_comptools_props.tsv'),sep='\t',header=True)

  sameAs.outbreak:ComputationalTool sameAs.owl:cardinality  \
0                            author            Unspecified   
1                           funding            Unspecified   

    sameAs.marginality                              sameAs.domainIncludes  \
0  minimal or required  [outbreak:Analysis, outbreak:Dataset, outbreak...   
1  minimal or required  [outbreak:Dataset, outbreak:Publication, outbr...   

                       sameAs.rangeIncludes  \
0  [outbreak:Person, outbreak:Organization]   
1                      schema:MonetaryGrant   

                                          sameAs.url  
0  https://discovery.biothings.io/view/outbreak/a...  
1  https://discovery.biothings.io/view/outbreak/f...  


## Deprecated functions

In [None]:
## translate general expected types
## This has yet to be completed due to the levels of nesting involved (dictionaries, to lists, to dictionaries)
## Note, it doesn't make sense to try this approach, instead we can pull this info from property definitions
def transform_prop_types(proptypelist,prop_validation):
    """Append the schema.org type matching a JSON-schema property definition.

    Args:
        proptypelist: list that collects the results; it is modified in place.
        prop_validation: dict with a "type" key and, for strings, an optional
            "format" key.

    Returns:
        proptypelist, with a one-element set appended when a mapping applies:
        strings without "format" -> schema:Text, format "date" -> schema:Date,
        format "uri" -> schema:URL (other formats add nothing), and
        boolean/integer/number -> schema:Boolean/Integer/Number.

    Raises:
        KeyError: if prop_validation has no "type" key.
    """
    json_type = prop_validation["type"]
    if json_type == "string":
        ## an explicit key check replaces the former catch-all except that was
        ## only meant to detect a missing "format"
        if "format" in prop_validation:
            string_format = prop_validation["format"]
            if string_format == "date":
                proptypelist.append({"schema:Date"})
            if string_format == "uri":
                proptypelist.append({"schema:URL"})
        else:
            proptypelist.append({"schema:Text"})
    if json_type == "boolean":
        proptypelist.append({"schema:Boolean"})
    if json_type == "integer":
        proptypelist.append({"schema:Integer"})
    if json_type == "number":
        proptypelist.append({"schema:Number"})
    return proptypelist


def generate_def_dict(class_definitions):
    """Map each definition name to its schema.org type.

    Args:
        class_definitions: dict of definition name -> definition.

    Returns:
        A dict with the same keys. A definition that has an '@type' entry maps
        to a one-element set holding "schema:<@type>"; any other definition
        maps to the string "JSON schema object".
    """
    def_dict = {}
    for def_name, definition in class_definitions.items():
        try:
            def_dict[def_name] = {f"schema:{definition['@type']}"}
        except (KeyError, TypeError):
            ## no '@type' key, or the definition is not a mapping at all
            def_dict[def_name] = "JSON schema object"
    return def_dict
    
def lookup_reference(proptypelist,reference_value,def_dict):
    """Resolve a "#/definitions/..." reference and append its entry from def_dict.

    proptypelist is modified in place and returned.
    """
    definition_name = reference_value.replace("#/definitions/","")
    resolved = def_dict[definition_name]
    proptypelist.append(resolved)
    return proptypelist

def get_ref_value(prop_dict):
    """Return the "$ref" value of a property definition.

    Args:
        prop_dict: dict describing one property.

    Returns:
        The value stored under "$ref", or None when the key is absent
        (previously a missing key failed because no value was ever assigned).
    """
    return prop_dict.get("$ref")