# Generating list of most common properties and expected types

The DDE is undergoing many improvements and is being refactored on the back end. To ensure that the DDE can be used by as many people as possible, including those with little or no knowledge of JSON schema, the most common Bioschemas properties and their expected types are being evaluated for inclusion as default options.

In [1]:
import os
import json
import pandas
import pathlib
import yaml
import requests

In [2]:
# Resolve every input/output location relative to the PARENT of the current
# working directory (the notebook is expected to be started from a sub-folder).
#script_path = pathlib.Path(__file__).parent.absolute()
tmp_dir = os.getcwd()
parent_dir = os.path.dirname(tmp_dir)
# Entries of the parent directory; not referenced again in the cells shown here.
available = os.listdir(parent_dir)
outputdirectory = os.path.join(parent_dir,'specifications')
# Sub-folders of the Bioschemas-Validator checkout that are read below.
inputdirectory = os.path.join(parent_dir,'Bioschemas-Validator')
input_profiles = os.path.join(inputdirectory,'profile_json')
input_marginality = os.path.join(inputdirectory,'profile_marginality')
input_yml = os.path.join(inputdirectory,'profile_yml')

# One entry per item found in profile_json; each is later treated as a
# specification folder name.
specifications = os.listdir(input_profiles)
# Relative results folder; datapath is reassigned by later cells.
datapath = os.path.join('results','resulting json')

In [3]:
#### Load the yaml file
def get_yml_dict(theymlfile):
    """Return the YAML document that precedes the HTML part of ``theymlfile``.

    The file is read as a multi-document YAML stream. Documents are consumed
    in order until one containing the ``<!DOCTYPE HTML>`` marker is reached,
    and the last document seen before that marker is returned. Empty
    documents are skipped. The file is decoded as UTF-8 first; if that
    attempt fails for any reason it is retried once as Latin-1.

    Args:
        theymlfile: Path of the file to parse.

    Returns:
        The last YAML document found before the HTML marker.

    Raises:
        ValueError: If no document precedes the HTML marker.
        Exception: Any other error raised by the Latin-1 attempt (for example
            a missing file or a YAML parsing error).
    """
    html_marker = '<!DOCTYPE HTML>'

    def _load(encoding):
        # Parse the stream with one encoding, stopping at the HTML document.
        found = False
        ymldict = None
        with open(theymlfile,'r',encoding=encoding) as ymlin:
            # NOTE(review): FullLoader is only appropriate for trusted files.
            for eachyml in yaml.load_all(ymlin, Loader=yaml.FullLoader):
                if eachyml is None:
                    # An empty document cannot hold the marker or a mapping.
                    continue
                if html_marker in eachyml:
                    break
                ymldict = eachyml
                found = True
        if not found:
            raise ValueError("no YAML document found before the HTML part of " + str(theymlfile))
        return ymldict

    try:
        return _load("utf8")
    except Exception:
        # Same parse, different decoding; errors from this attempt propagate.
        return _load("latin1")

In [4]:
# Point datapath at results/frequency_tables; this replaces the value assigned
# when the input/output paths were set up.
datapath = os.path.join('results','frequency_tables')

In [5]:
def clean_illegal_chars(theymlfile):
    """Check that ``theymlfile`` can be opened and return the path unchanged.

    NOTE(review): despite its name this function has never cleaned anything.
    The earlier version applied ``replace("#x0080", "\\#x0080")`` to the path
    string and discarded the result (it also relied on the invalid escape
    sequence ``"\\#"``, which newer Python versions warn about). The file
    content is not read or modified. Callers receive the path, not cleaned
    text - confirm what the intended behaviour is before relying on it.

    Args:
        theymlfile: Path of the file to check.

    Returns:
        ``theymlfile`` exactly as passed in.

    Raises:
        OSError: If the file cannot be opened for reading.
    """
    # Opening (and immediately closing) the file keeps the original failure
    # mode for missing or unreadable files.
    with open(theymlfile,'r',encoding="utf8"):
        pass
    return theymlfile

In [6]:
%%time

## Get frequency of expected types
# For every specification folder, pick one profile version, load the YAML
# front matter of the matching .html file and record one row per mapped
# property. Specifications that cannot be processed are printed and skipped.
allproplist = []
for eachspec in specifications:
    tmpinputprofilepath = os.path.join(input_profiles,eachspec)
    tmpinputmarginpath = os.path.join(input_marginality,eachspec)
    tmpinputyml = os.path.join(input_yml,eachspec)
    # os.listdir() returns entries in arbitrary order, so sort to make the
    # "last entry" reproducible. The sort is lexicographic, not version-aware
    # (e.g. '0.10' sorts before '0.2').
    spec_profs = sorted(os.listdir(tmpinputprofilepath))
    if not spec_profs:
        # Nothing to read for this specification.
        print(eachspec)
        continue
    eachversion = spec_profs[-1]
    thejsonfile = os.path.join(tmpinputprofilepath,eachversion)
    themarginfile = os.path.join(tmpinputmarginpath,eachversion)
    theymlfile = os.path.join(tmpinputyml,eachversion.replace('.json','.html'))
    try:
        ymldict = get_yml_dict(theymlfile)
    except Exception:
        # The former fallback passed the file PATH (not its content) to
        # yaml.load_all, so it could never recover a mapping and always ended
        # with the specification name being printed. Report it directly.
        print(eachspec)
        continue
    try:
        mappingdict = ymldict['mapping']
        for eachdict in mappingdict:
            tmpdict = {'specification':eachspec,'property':eachdict['property'],'expected_type':eachdict['expected_types'],'marginality':eachdict['marginality']}
            allproplist.append(tmpdict)
    except (KeyError, TypeError):
        # Missing 'mapping' section or an entry lacking one of the fields.
        print(eachspec)

allpropdf = pandas.DataFrame(allproplist)
print(allpropdf.head(n=2))

Sample
  specification    property  expected_type  marginality
0        Beacon  aggregator      [Boolean]  Recommended
1        Beacon     dataset  [DataCatalog]      Minimum
Wall time: 1.06 s


In [7]:
## Get frequency of combinations of expected_type
# The list of expected types is turned into its string form so that it can be
# used as a grouping key.
allpropdf['expected_types_raw'] = allpropdf['expected_type'].astype(str)

# Number of rows per (property, expected-type combination), largest first
prop_freq = (
    allpropdf.groupby(['property','expected_types_raw'])
    .size()
    .reset_index(name='counts')
    .sort_values('counts',ascending=False)
)

# Number of rows per expected-type combination alone, largest first
combi_exp_freq = (
    allpropdf.groupby('expected_types_raw')
    .size()
    .reset_index(name='counts')
    .sort_values('counts',ascending=False)
)

#prop_freq.to_csv(os.path.join(datapath,'property_frequency.tsv'),sep='\t',header=True)
#combi_exp_freq.to_csv(os.path.join(datapath,'raw_expected_types_frequency.tsv'),sep='\t',header=True)

In [8]:
## Get frequency of base expected type
# explode() yields one row per individual entry of each expected_type list
expected_types = allpropdf.explode('expected_type')
expected_freq = (
    expected_types.groupby('expected_type')
    .size()
    .reset_index(name='counts')
    .sort_values('counts',ascending=False)
)
#expected_freq.to_csv(os.path.join(datapath,'expected_types_frequency.tsv'),sep='\t',header=True)
print(expected_freq)

         expected_type  counts
74                Text     262
77                 URL     237
60       PropertyValue      71
19        CreativeWork      66
27         DefinedTerm      54
..                 ...     ...
38        HowToSection       1
1   AdministrativeArea       1
46       MedicalEntity       1
47     MolecularEntity       1
78     VirtualLocation       1

[79 rows x 2 columns]


In [9]:
def create_base_object(schematype):
    """Build an empty object rule tagged with ``schematype``.

    Returns a new dict with the keys ``@type`` (set to ``schematype``),
    ``type`` (``"object"``), ``properties`` (empty dict) and ``required``
    (empty list).
    """
    return {
        "@type": schematype,
        "type": "object",
        "properties": {},
        "required": [],
    }

def replace_json_validation(expected_dict):
    """Set the rules for the primitive types on ``expected_dict``.

    The entries ``text``, ``url``, ``number``, ``integer``, ``boolean`` and
    ``date`` are added (or overwritten) in that order. The dict is modified
    in place and also returned.
    """
    primitive_rules = (
        ("text", {"type":"string"}),
        ("url", {"type":"string","format":"uri"}),
        ("number", {"type":"number"}),
        ("integer", {"type":"integer"}),
        ("boolean", {"type":"boolean"}),
        ("date", {"type":"string","format":"date"}),
    )
    for rule_key, rule in primitive_rules:
        expected_dict[rule_key] = rule
    return expected_dict

def translate(typestring):
    """Convert an expected-type name into a validation rule dict.

    ``Text``, ``URL``, ``Number``, ``Integer``, ``Boolean`` and ``Date`` map
    onto primitive rules; any other value becomes an object rule carrying the
    value under ``@type``. A new dict is returned on every call.
    """
    # Built inside the function so each call hands out fresh dict objects.
    primitive_rules = (
        ("Text", {"type":"string"}),
        ("URL", {"type":"string","format":"uri"}),
        ("Number", {"type":"number"}),
        ("Integer", {"type":"integer"}),
        ("Boolean", {"type":"boolean"}),
        ("Date", {"type":"string","format":"date"}),
    )
    for type_name, rule in primitive_rules:
        if typestring == type_name:
            return rule
    return {"type":"object","@type":typestring}

def generate_base_bs_rules(allpropdf):
    """Build one object rule per specification from its "Minimum" properties.

    Args:
        allpropdf: DataFrame with the columns ``specification``, ``property``,
            ``expected_type`` (a list of type names per row) and
            ``marginality``.

    Returns:
        Dict keyed by the lower-cased specification name. Each value is
        ``{"@type": <specification>, "type": "object", "properties": {...},
        "required": [...]}`` where ``properties`` maps every property whose
        marginality is ``"Minimum"`` to its translated rule and ``required``
        lists those properties. A property with one expected type gets that
        rule directly; one with several gets ``{"oneOf": [...]}``.
    """
    ## Get required properties for each class
    required_props = allpropdf.loc[allpropdf['marginality']=="Minimum"].copy()
    #required_props.to_csv(os.path.join(datapath,'required_props.tsv'),sep='\t',header=True)
    # One row per (specification, property, single expected type)
    required_props['single_expected'] = required_props['expected_type']
    required_props = required_props.explode('single_expected')
    validation_dict = {}
    for eachspec in required_props['specification'].unique().tolist():
        tmpdf = required_props.loc[required_props['specification']==eachspec]
        property_list = tmpdf['property'].unique().tolist()
        property_dict = {}
        for eachprop in property_list:
            # Drop repeated type names first: a property listed twice with the
            # same type would otherwise be wrapped in a "oneOf" whose branches
            # are identical, which no value can satisfy.
            type_names = tmpdf.loc[tmpdf['property']==eachprop,'single_expected'].drop_duplicates().tolist()
            translated = [translate(type_name) for type_name in type_names]
            if len(translated) == 1:
                ## property has a single expected type
                property_dict[eachprop] = translated[0]
            else:
                # NOTE(review): "oneOf" requires exactly one branch to match;
                # Text and URL both translate to string rules, so confirm
                # that "anyOf" is not what is wanted here.
                property_dict[eachprop] = {"oneOf": translated}
        validation_dict[eachspec.lower()] = {"@type":eachspec,"type":"object","properties":property_dict,"required":property_list}
    return validation_dict

def count_expected_types(expected_type):
    """Return the number of entries in ``expected_type`` (its ``len``)."""
    return len(expected_type)



In [10]:
## convert base expected type into json dictionary for expected types
validation_dict = generate_base_bs_rules(allpropdf)

expected_objects = pandas.read_csv(
    os.path.join(datapath,'expected_types_frequency.tsv'),
    delimiter='\t',
    header=0,
    index_col=0,
)

# Start with an empty object rule for every expected type in the table,
# keyed by the lower-cased type name.
expected_dict = {}
for schematype in expected_objects['expected_type']:
    expected_dict[schematype.lower()]=create_base_object(schematype)

# Primitive types replace their placeholder objects, then the per-specification
# rules replace any entry sharing the same key.
expected_dict = replace_json_validation(expected_dict)
expected_dict.update(validation_dict)

print(expected_dict)

#with open(os.path.join(datapath,'base_types.json'),'w') as outfile:
#    outfile.write(json.dumps(expected_dict, indent=2, sort_keys=False))

{'text': {'type': 'string'}, 'url': {'type': 'string', 'format': 'uri'}, 'propertyvalue': {'@type': 'PropertyValue', 'type': 'object', 'properties': {}, 'required': []}, 'creativework': {'@type': 'CreativeWork', 'type': 'object', 'properties': {}, 'required': []}, 'definedterm': {'@type': 'DefinedTerm', 'type': 'object', 'properties': {}, 'required': []}, 'organization': {'@type': 'Organization', 'type': 'object', 'properties': {'description': {'type': 'string'}, 'legalName': {'type': 'string'}, 'name': {'type': 'string'}, 'sameAs': {'type': 'string', 'format': 'uri'}, 'topic': {'oneOf': [{'type': 'string'}, {'type': 'string', 'format': 'uri'}]}, 'type': {'oneOf': [{'type': 'string'}, {'type': 'string', 'format': 'uri'}]}}, 'required': ['description', 'legalName', 'name', 'sameAs', 'topic', 'type']}, 'person': {'@type': 'Person', 'type': 'object', 'properties': {'description': {'type': 'string'}, 'mainEntityOfPage': {'oneOf': [{'type': 'object', '@type': 'CreativeWork'}, {'type': 'stri

In [None]:
# Show the entry stored under the 'beacon' key of expected_dict
print(expected_dict['beacon'])

### Defining rules for a validation builder

This section tests the logic for building combined JSON schema validation rules.
For example: how to turn one or several expected types, each allowed once or many times, into a single JSON schema validation rule.

Example cases:
* One value of a single rule type: Text (string)
* Many values of a single rule type: Text (string)
* One value of two potential rule types: Text (string) or Person (object)
* One value of two potential rule types: Text (string) or URL (formatted string)
* Many values of two potential rule types: Text (string) or Person (object)
* Many values of two potential rule types: Text (string) or URL (formatted string)

In [None]:
def get_single_rule(expected_dict,rule_key):
    """Look up and return the rule stored under ``rule_key``.

    The stored object itself is returned (no copy); a missing key raises
    ``KeyError``.
    """
    return expected_dict[rule_key]

def get_one_many_rule(expected_dict,rule_list):
    """Return the rule for a single value that may be any type in ``rule_list``.

    When ``rule_list`` holds exactly one distinct key, the rule stored for
    that key is returned as is. Otherwise the result is ``{"oneOf": [...]}``
    with one entry per element of ``rule_list``, in order.
    """
    # NOTE(review): "oneOf" requires exactly one branch to match; rules that
    # overlap (for instance two string rules) may need "anyOf" - confirm.
    if len(set(rule_list)) != 1:
        return {"oneOf": [expected_dict[each_rule] for each_rule in rule_list]}
    ## A list holding one distinct rule is handled as that single rule
    return get_single_rule(expected_dict,rule_list[0])

def get_many_single_rule(expected_dict,rule_key):
    """Return a rule accepting one value of ``rule_key`` or an array of them.

    The result is ``{"oneOf": [rule, {"type": "array", "items": rule}]}``
    where ``rule`` is the object stored under ``rule_key``.
    """
    single_rule = expected_dict[rule_key]
    array_rule = {"type":"array","items":single_rule}
    return {"oneOf": [single_rule, array_rule]}

def get_many_many_rules(expected_dict,rule_list):
    """Return the rule for one-or-many values drawn from ``rule_list``.

    * empty ``rule_list``: an empty dict;
    * one distinct key (however often repeated): the same result as
      ``get_many_single_rule`` for that key;
    * several distinct keys: ``{"anyOf": [...]}`` containing, for every
      element of ``rule_list`` in order, its rule followed by an array rule
      whose items are that rule.
    """
    distinct_rules = len(set(rule_list))
    if distinct_rules == 0:
        return {}
    if distinct_rules == 1:
        ## A single rule, possibly repeated, placed in a list
        return get_many_single_rule(expected_dict,rule_list[0])
    ## The options are mixed between types, use "anyOf" for the array
    alternatives = []
    for each_rule in rule_list:
        rule = expected_dict[each_rule]
        alternatives.extend((rule, {"type":"array","items":rule}))
    return {"anyOf": alternatives}


def get_rules(expected_dict,rule_list,cardinality="one"):
    """Build the validation rule for one property.

    Args:
        expected_dict: Mapping of rule key to rule dict.
        rule_list: A single rule key (``str``) or a list/tuple of rule keys.
        cardinality: ``"one"`` or ``"many"`` (case-insensitive); whether the
            property holds a single value or may also hold an array.

    Returns:
        The rule dict produced by the helper matching the combination of
        ``rule_list`` kind and ``cardinality``.

    Raises:
        ValueError: If ``cardinality`` is neither ``"one"`` nor ``"many"``.
        TypeError: If ``rule_list`` is not a string, list or tuple.
    """
    mode = cardinality.lower()
    # Reject unknown values up front; previously they surfaced as an
    # UnboundLocalError on the return statement.
    if mode not in ("one", "many"):
        raise ValueError("cardinality must be 'one' or 'many', got " + repr(cardinality))
    if isinstance(rule_list,str):
        if mode == "one":
            return get_single_rule(expected_dict,rule_list)
        return get_many_single_rule(expected_dict,rule_list)
    if isinstance(rule_list,(list,tuple)):
        if mode == "one":
            return get_one_many_rule(expected_dict,rule_list)
        return get_many_many_rules(expected_dict,rule_list)
    raise TypeError("rule_list must be a str, list or tuple, got " + type(rule_list).__name__)

In [None]:
## Test the above functions
cardinality = "one"

# json.load() no longer accepts an `encoding` argument (removed in Python 3.9),
# so the encoding is given when opening the file instead.
with open(os.path.join(datapath,'base_types.json'),'r',encoding="utf-8") as infile:
    expected_dict = json.load(infile)

In [None]:
## Each entry is (cardinality, rule keys); the cases are run in order and the
## resulting rule is printed followed by a blank line.
rule_cases = [
    ## ONE of a single rule type: ONE type: Text (string)
    ("One", "text"),
    ## Many of a single rule type: Many of type: Text (string)
    ("MANY", "text"),
    ## One of two potential rule types: Text (string) or Person (object)
    ("one", ["text","person"]),
    ## One of two potential rule types: Text (string) or URL (Formatted Text)
    ("one", ["text","url"]),
    ## Many of two potential rule types: Text (string) or Person (object)
    ("Many", ["text","person"]),
    ## Many of two potential rules types: Text (string) or URL (formatted string)
    ("Many", ["text","url"]),
]

for cardinality, rule_spec in rule_cases:
    rule_dict = get_rules(expected_dict,rule_spec,cardinality)
    print(rule_dict,'\n')


### generate property to rule mappings

In [None]:
## Generate mapping of properties with the same expected type
from random import sample

def clean_prop_list(stringproplist):
    """Split the string form of a type list back into names.

    Leading ``[`` and trailing ``]`` characters are stripped, the remainder is
    split on commas, and each piece has surrounding spaces and all single
    quotes removed.

    Returns:
        ``(clean_list, rule_list)``: the names as written and the same names
        lower-cased.
    """
    inner = stringproplist.strip('[').strip(']')
    clean_list = []
    rule_list = []
    for piece in inner.split(','):
        type_name = piece.strip(' ').replace("'","")
        clean_list.append(type_name)
        rule_list.append(type_name.lower())
    return clean_list, rule_list

def generate_id_from_proplist(stringproplist,propcount):
    """Create an identifier for an expected-type combination.

    The identifier has three parts joined by underscores: the first character
    of every type name in ``stringproplist``, ``propcount`` as a string, and
    five distinct letters drawn at random from a fixed set of ten.
    Because of the random part, repeated calls give different identifiers.
    """
    type_names, _ = clean_prop_list(stringproplist)
    initials = "".join([type_name[0] for type_name in type_names])
    random_part = ''.join(sample(['a','b','c','t','u','v','w','x','y','z'], k=5))
    return '_'.join((initials, str(propcount), random_part))
    
def generate_prop_rule_maps(datapath,expected_dict,prop_freq):
    """Write the property/rule mapping files for shared expected-type combinations.

    For every expected-type combination that occurs more than once for at
    least one property, an identifier is generated and the "one" and "many"
    rules are built. Eight JSON files are then written into ``datapath``:

    * ``one_map.txt`` / ``many_map.txt``: identifier -> list of properties
    * ``one_rule_map.txt`` / ``many_rule_map.txt``: identifier -> rule
    * ``one_prop_map.txt`` / ``many_prop_map.txt``: property -> identifier
    * ``one_prop_rule.txt`` / ``many_prop_rule.txt``: property -> rule

    Args:
        datapath: Output directory; created if it does not exist.
        expected_dict: Mapping of lower-cased type name to rule dict.
        prop_freq: DataFrame with the columns ``property``,
            ``expected_types_raw`` and ``counts``.

    Returns:
        None.
    """
    onemapdict = {}
    manymapdict = {}
    onerulemap = {}
    manyrulemap = {}
    onepropmap = {}
    manypropmap = {}
    oneproprule = {}
    manyproprule = {}
    prop_freq_multi = prop_freq.loc[prop_freq['counts']>1].copy() ## filter out properties that appear only once
    for each_expected_type in prop_freq_multi['expected_types_raw'].unique().tolist():
        propcount = prop_freq_multi.loc[prop_freq_multi['expected_types_raw']==each_expected_type]['counts'].sum()
        # NOTE(review): the property list comes from the UNFILTERED table, so
        # it also includes properties that appear only once - confirm intent.
        proplist = prop_freq['property'].loc[prop_freq['expected_types_raw']==each_expected_type].unique().tolist()
        _, rule_list = clean_prop_list(each_expected_type)
        idhash = generate_id_from_proplist(each_expected_type,propcount)
        idhash_many = idhash+'_many'
        onemapdict[idhash]=proplist
        manymapdict[idhash_many]=proplist
        oneruledict = get_rules(expected_dict,rule_list,"one")
        onerulemap[idhash]=oneruledict
        manyruledict = get_rules(expected_dict,rule_list,"many")
        manyrulemap[idhash_many]=manyruledict
        # A property found under several combinations keeps the values of the
        # combination processed last.
        for eachprop in proplist:
            onepropmap[eachprop]=idhash
            manypropmap[eachprop]=idhash_many
            oneproprule[eachprop] = oneruledict
            manyproprule[eachprop] = manyruledict
    filedict = {"one_map.txt":onemapdict,"many_map.txt":manymapdict,"one_rule_map.txt":onerulemap,
                "many_rule_map.txt":manyrulemap,"one_prop_map.txt":onepropmap,"many_prop_map.txt":manypropmap,
                "one_prop_rule.txt":oneproprule,"many_prop_rule.txt":manyproprule}
    # Make sure the target folder exists so the first write cannot fail with
    # FileNotFoundError (an empty datapath means the current directory).
    if datapath:
        os.makedirs(datapath, exist_ok=True)
    for filename, content in filedict.items():
        with open(os.path.join(datapath,filename),'w') as outfile:
            outfile.write(json.dumps(content))
    

In [None]:
# Write the property/rule mapping files into results/mappings
datapath = os.path.join('results','mappings')
generate_prop_rule_maps(datapath,expected_dict,prop_freq)