In [1]:
import os 
import json 
import zipfile
import rocrate.utils as utils
import rocrate.rocrate as rocrate
from datetime import datetime
import pytest

# Checking RO-Crate Metadata Root Data Entity

### RO-Crate Self Descriptor

In [26]:
def entity_type(type, exp_type):
    """Return True when ``type`` compares equal to ``exp_type``, otherwise False."""
    is_expected = (type == exp_type)
    return bool(is_expected)

def entity_id(id, *exp_id):
    """Return True when ``id`` is one of the accepted identifiers given in ``exp_id``."""
    accepted = exp_id
    return id in accepted

def entity_about(about, exp_about):
    """Return True when ``about`` compares equal to ``exp_about``, otherwise False."""
    points_at_expected = (about == exp_about)
    return bool(points_at_expected)

def entity_conformsTo(cfmsTo):
    """Return True when ``cfmsTo`` starts with the ``https://w3id.org/ro/crate/`` prefix."""
    has_prefix = cfmsTo.startswith("https://w3id.org/ro/crate/")
    return bool(has_prefix)

def entity_property(entity, type):
    """Decide whether ``entity`` qualifies as the RO-Crate self descriptor.

    ``type`` is the entity's ``@type`` value and has to pass
    ``entity_type(type, ['CreativeWork'])``. The first ``@id`` value must be
    ``ro-crate-metadata.json`` or ``ro-crate-metadata.jsonld``, the first
    ``about`` value must be ``./`` and the first ``conformsTo`` value must pass
    ``entity_conformsTo``. Returns False when any of the three properties has
    no value.
    """
    if not entity_type(type, ['CreativeWork']):
        return False

    try:
        first_values = [
            utils.get_norm_value(entity, key)[0]
            for key in ("@id", "about", "conformsTo")
        ]
    except IndexError:
        # One of the required properties has no value at all.
        return False

    descriptor_id, about_ref, profile = first_values
    return bool(
        entity_id(descriptor_id, 'ro-crate-metadata.json', 'ro-crate-metadata.jsonld')
        and entity_about(about_ref, './')
        and entity_conformsTo(profile)
    )
            
    
def self_descriptor_check(tar_file, extension):
    """\
    Look for a valid self descriptor in the RO-Crate metadata.

    Loads ``ro-crate-metadata.json`` from the ``tar_file`` directory and tests
    each entity of its ``@graph`` with ``entity_property``. ``extension`` is
    not used by this check.

    Returns a ``(name, errors, passed)`` tuple; ``errors`` is an empty list on
    success and the error message string on failure.

    Requirement details:
    <https://www.researchobject.org/ro-crate/1.1/root-data-entity.html>
    """
    NAME = "Self descriptor check"
    error_message = "entity property of self descriptor is incorrect"

    metadata_path = os.path.join(tar_file, "ro-crate-metadata.json")
    with open(metadata_path, 'r') as handle:
        graph = json.load(handle)['@graph']

    # A single matching entity is enough for the check to pass.
    descriptor_found = any(
        entity_property(node, utils.get_norm_value(node, "@type"))
        for node in graph
    )
    if descriptor_found:
        return NAME, [], True
    return NAME, error_message, False

### Direct property of Root Data Entity

In [6]:
def datetime_valid(dt_str):
    """Return True when ``dt_str`` is a string in ISO 8601 date/time format.

    Parsing is delegated to ``datetime.fromisoformat``. A trailing ``Z`` (the
    ISO 8601 UTC designator) is additionally retried as ``+00:00`` because
    ``fromisoformat`` only understands ``Z`` from Python 3.11 onwards.

    Anything that is not a ``str`` yields False.
    """
    if not isinstance(dt_str, str):
        return False

    candidates = [dt_str]
    if dt_str.endswith("Z"):
        candidates.append(dt_str[:-1] + "+00:00")

    for candidate in candidates:
        try:
            datetime.fromisoformat(candidate)
        except ValueError:
            # Only a parse failure means "invalid"; unrelated exceptions
            # (e.g. KeyboardInterrupt) are no longer swallowed.
            continue
        return True
    return False

def dataset_property(entity, type):
    """Tell whether ``entity`` is a Dataset whose ``@id`` denotes a directory.

    ``entity`` is a JSON-LD entity dict and ``type`` is its ``@type`` value as
    a list of type names (what ``utils.get_norm_value(entity, '@type')``
    returns in the callers of this function).

    Returns True when ``"Dataset"`` is among the types and the first ``@id``
    ends with ``/``; False otherwise, including when the entity has no ``@id``.
    """
    try:
        identifier = utils.get_norm_value(entity, '@id')[0]
    except IndexError:
        return False
    # ``type`` is a list of names, so membership is the correct test. The
    # earlier ``type[0] == ['Dataset']`` compared one name against a list,
    # which is never true, and raised IndexError on an empty type list.
    return "Dataset" in type and identifier.endswith('/')
            
def direct_property_check(tar_file, extension):
    """\
    Check the direct properties of the root data entity.

    Loads ``ro-crate-metadata.json`` from the ``tar_file`` directory and scans
    its ``@graph``. The first entity that passes ``dataset_property`` and has
    at least one ``datePublished`` value decides the result: its first
    ``datePublished`` value is validated with ``datetime_valid``.
    ``extension`` is not used by this check.

    Returns a ``(name, errors, passed)`` tuple; ``errors`` is an empty list on
    success and an error message string on failure.

    Requirement details:
    <https://www.researchobject.org/ro-crate/1.1/root-data-entity.html>
    """
    NAME = "Direct property check"
    error_message = ["datePublished is not in ISO 8601 date format", "Directory property of RO-Crate is wrong"]

    metadata_path = os.path.join(tar_file, "ro-crate-metadata.json")
    with open(metadata_path, 'r') as handle:
        graph = json.load(handle)['@graph']

    for node in graph:
        node_types = utils.get_norm_value(node, '@type')
        if not dataset_property(node, node_types):
            continue

        published = utils.get_norm_value(node, 'datePublished')
        if not published:
            # Nothing to validate on this entity; keep scanning the graph.
            continue

        date_ok = datetime_valid(published[0])
        if date_ok:
            return NAME, [], date_ok
        return NAME, error_message[0], date_ok

    # No entity qualified as a dataset with a datePublished value.
    return NAME, error_message[1], False
                

# Checking RO-Crate Data Entity

### Referencing file or folder from root data entity

In [56]:
### The @type value must be either the expected type as a string or a list that contains the expected type.
def metadata_contains(metadata, id_, exp_ct):
    """Return True when the entity ``id_[0]`` in ``metadata`` has type ``exp_ct``.

    The entity's ``@type`` matches when it equals ``exp_ct`` or when it is a
    list that contains ``exp_ct``.
    """
    declared_type = metadata["%s" % id_[0]]["@type"]
    if declared_type == exp_ct:
        return True
    return isinstance(declared_type, list) and exp_ct in declared_type

### referencing_result maps each entity @id to its check result: True on success, or [False, error message] on failure.
def update_rfeResult(id_, referencing_result, metadata, error_message, exp_ct):
    """Record in ``referencing_result`` whether entity ``id_[0]`` has type ``exp_ct``.

    The entry keyed by ``id_[0]`` becomes True when ``metadata_contains``
    succeeds, otherwise ``[False, message]`` with ``error_message`` formatted
    with the identifier.
    """
    key = id_[0]
    if not metadata_contains(metadata, id_, exp_ct):
        referencing_result[key] = [False, error_message.format(key)]
        return
    referencing_result[key] = True
                       
    
def referencing_check(tar_file, extension):
    """\
    Check the types of the data entities referenced through ``hasPart``.

    Where files or folders are represented as Data Entities in the RO-Crate
    JSON-LD, they MUST be linked to, directly or indirectly, from the Root
    Data Entity using hasPart. For more information, please check:
    <https://www.researchobject.org/ro-crate/1.1/data-entities.html#referencing-files-and-folders-from-the-root-data-entity>

    The ``hasPart`` list of the first entity that has one is inspected. A part
    whose ``@id`` has no file extension and ends with ``/`` must be a
    ``Dataset``; a part whose ``@id`` has a file extension must be a ``File``.
    Other parts are not checked.

    ``tar_file`` is the crate directory. ``extension`` is accepted for
    signature compatibility with the other checks and is not used.

    Returns a ``(name, errors, passed)`` tuple; ``errors`` is an empty list on
    success and the first failure message otherwise.
    """

    NAME = "Referencing check"
    error_message = "The referencing {} is wrong"

    ### Create a dictionary to store the referencing check result
    referencing_result = {}

    _, metadata = rocrate.read_metadata(os.path.join(tar_file, "ro-crate-metadata.json"))

    # Start from an empty list so that metadata without any entity does not
    # leave ``hasPart`` unbound.
    hasPart = []
    for entity in metadata.values():
        hasPart = utils.get_norm_value(entity, "hasPart")
        if hasPart:
            break

    for part in hasPart:
        id_ = utils.get_norm_value(metadata[part], "@id")
        # Classify each part by its OWN extension. The ``extension`` argument
        # must not be used for this decision: it is the same for every part.
        part_extension = os.path.splitext(id_[0])[1]

        if part_extension == "" and id_[0].endswith('/'):
            update_rfeResult(id_, referencing_result, metadata, error_message, "Dataset")
        elif part_extension != "":
            update_rfeResult(id_, referencing_result, metadata, error_message, "File")

    ### A list value marks a failed part; report the first one found
    for outcome in referencing_result.values():
        if isinstance(outcome, list):
            return NAME, outcome[1], outcome[0]

    return NAME, [], True


### Detailed Descriptions of Encodings

In [None]:
### When the encoding value is a URL, the @type list of the entity it references must contain the website type.
def update_ecdResult(type, encoding, encoding_result, error_message):
    """Record the check result for an ``encodingFormat`` entry that is a URL.

    ``type`` is the ``@type`` list of the entity referenced by ``encoding[1]``.
    The entry ``encoding_result[encoding[1]]`` becomes True when
    ``encoding[1]`` is a URL and ``type`` contains ``WebSite``; otherwise it
    becomes ``[False, message]`` built from ``error_message[1]``.
    """
    target = encoding[1]
    # The schema.org type is spelled "WebSite" (capital S). The previous test
    # for "Website" could never match a correctly typed entity.
    if utils.is_url(target) and "WebSite" in type:
        encoding_result[target] = True
    else:
        encoding_result[target] = [False, error_message[1].format(target)]
        
def ext_based_updEcd(extension, encoding, encoding_result, type, error_message):
    """Record the check result for an ``encodingFormat`` entry that is not a URL.

    The entry ``encoding_result[encoding[1]]`` becomes True when either
    ``extension`` is empty, ``encoding[1]`` ends with ``/`` and ``type``
    contains ``Dataset``, or ``extension`` is non-empty and ``type`` contains
    ``File``. Otherwise it becomes ``[False, message]`` built from
    ``error_message[1]``.
    """
    target = encoding[1]
    is_typed_directory = extension == "" and target.endswith("/") and "Dataset" in type
    type_is_consistent = is_typed_directory or (extension != "" and "File" in type)

    if type_is_consistent:
        encoding_result[target] = True
        return
    encoding_result[target] = [False, error_message[1].format(target)]

def encoding_check(tar_file, extension):
    """
    Check the entities referenced as detailed encoding descriptions.

    For every entity whose ``encodingFormat`` has at least two values, the
    entity referenced by the second value is looked up in the metadata and its
    ``@type`` is checked: URL references go through ``update_ecdResult``,
    other references go through ``ext_based_updEcd`` together with the file
    extension of the reference.

    Returns a ``(name, errors, passed)`` tuple; ``errors`` is an empty list on
    success and a message naming the first failing reference otherwise.

    Please check more information at:
    <https://www.researchobject.org/ro-crate/1.1/data-entities.html#adding-detailed-descriptions-of-encodings>
    """

    NAME = "Encoding check"
    error_message = ["Encoding in {} is wrong", "The value of @type in {} is incorrect"]

    ### Maps each checked reference to True or to [False, message]
    encoding_result = {}

    context, metadata = rocrate.read_metadata(os.path.join(tar_file, "ro-crate-metadata.json"))

    for entity in metadata.values():
        encoding = utils.get_norm_value(entity, "encodingFormat")
        if len(encoding) < 2:
            continue

        target = encoding[1]
        target_types = utils.get_norm_value(metadata[target], "@type")
        if utils.is_url(target):
            update_ecdResult(target_types, encoding, encoding_result, error_message)
            continue

        target_extension = os.path.splitext(target)[1]
        ext_based_updEcd(target_extension, encoding, encoding_result, target_types, error_message)

    ### A list value marks a failure; report the reference it belongs to
    for reference, outcome in encoding_result.items():
        if isinstance(outcome, list):
            return NAME, error_message[0].format(reference), outcome[0]

    return NAME, [], True
                
        

### Web-Based Data Entities

In [None]:
def is_downloadable(url):
    """
    Tell whether ``url`` points at a downloadable resource.

    The resource counts as downloadable when it can be fetched over HTTP(S)
    and the ``Content-Type`` of the response mentions neither ``text`` nor
    ``html``. A response without a ``Content-Type`` header is treated as
    downloadable, because nothing identifies it as text or HTML.

    Returns False for anything that is not an ``http``/``https`` URL and when
    the request fails (unreachable host, HTTP error status, timeout).
    """
    # Imported locally: the notebook's import cell provides no HTTP client
    # (the ``requests`` name used previously was never imported), and these
    # are all standard-library modules.
    import http.client
    import urllib.parse
    import urllib.request

    try:
        scheme = urllib.parse.urlsplit(url).scheme.lower()
    except (AttributeError, TypeError, ValueError):
        return False
    # urlopen would also accept file:// or ftp://; only web resources qualify.
    if scheme not in ("http", "https"):
        return False

    try:
        # The context manager closes the connection without reading the body;
        # the timeout keeps an unresponsive server from hanging the check.
        with urllib.request.urlopen(url, timeout=10) as response:
            content_type = response.headers.get('content-type')
    except (OSError, ValueError, http.client.HTTPException):
        return False

    content_type = (content_type or "").lower()
    return "text" not in content_type and "html" not in content_type

def urlFile_updRlt(id_, entity, webbased_result, error_message):
    """Record the check result for a web-based File entity under ``id_``.

    ``webbased_result[id_]`` becomes ``[False, error_message[0]...]`` when
    ``is_downloadable(id_)`` fails, ``[False, error_message[1]...]`` when the
    first ``sdDatePublished`` value is missing or rejected by
    ``datetime_valid``, and True otherwise.
    """
    if not is_downloadable(id_):
        webbased_result[id_] = [False, error_message[0].format(id_)]
        return

    try:
        retrieved_on = utils.get_norm_value(entity, "sdDatePublished")[0]
    except IndexError:
        # No sdDatePublished value at all.
        date_ok = False
    else:
        date_ok = datetime_valid(retrieved_on)

    if date_ok:
        webbased_result[id_] = True
    else:
        webbased_result[id_] = [False, error_message[1].format(id_)]

def dirOnWeb_updRlt(entity, metadata, webbased_result):
    """Record whether the first ``distribution`` of ``entity`` is a DataDownload.

    Nothing is recorded when ``entity`` has no ``distribution``. Otherwise the
    entry keyed by the first distribution reference becomes True when the
    first ``@type`` of the referenced entity is ``DataDownload`` and
    ``[False, message]`` when it is not.
    """
    distribution = utils.get_norm_value(entity, "distribution")
    if distribution == []:
        return

    target = distribution[0]
    first_type = utils.get_norm_value(metadata[target], "@type")[0]
    if first_type == "DataDownload":
        webbased_result[target] = True
    else:
        webbased_result[target] = [False, "the value of @type of {} has to be DataDownload".format(target)]


              
def webbased_entity_check(tar_file, extension):
    """
    Check the web-based data entities of the crate.

    Every entity whose first ``@type`` is ``File`` and whose ``@id`` is a URL
    is checked with ``urlFile_updRlt``; every entity whose first ``@type`` is
    ``Dataset`` is checked with ``dirOnWeb_updRlt``. ``extension`` is not used
    by this check.

    Returns a ``(name, errors, passed)`` tuple; ``errors`` is an empty list on
    success and the first failure message otherwise.

    Please check RO-Crate website for more information about web-based data entity.
    <https://www.researchobject.org/ro-crate/1.1/data-entities.html#web-based-data-entities>
    """

    NAME = "Web-based data entity check"
    error_message = ["web-based data entity {} should be a downloadable url", "sdDatePublished of web based data entity {} is not provided or incorrect"]

    webbased_result = {}

    context, metadata = rocrate.read_metadata(os.path.join(tar_file, "ro-crate-metadata.json"))

    for entity in metadata.values():
        first_type = utils.get_norm_value(entity, "@type")[0]
        id_ = utils.get_norm_value(entity, "@id")[0]

        if first_type == "File" and utils.is_url(id_):
            urlFile_updRlt(id_, entity, webbased_result, error_message)
        elif first_type == "Dataset":
            dirOnWeb_updRlt(entity, metadata, webbased_result)

    # A list value marks a failure; report the first one found.
    first_failure = next(
        (outcome for outcome in webbased_result.values() if isinstance(outcome, list)),
        None,
    )
    if first_failure is not None:
        return NAME, first_failure[1], False

    return NAME, [], True
            
        

# Checking Contextual Entities

### Checking People

In [None]:
def check_author_type(author, metadata, person_result, error_message):
    """Record whether the first ``author`` reference points at a Person entity.

    Nothing is recorded when ``author`` is empty or when its first value is
    not a URL. Otherwise ``person_result`` gets an entry keyed by that value:
    True when the first ``@type`` of the referenced entity is ``Person``, and
    ``[False, message]`` when it is something else or missing.
    """
    if author != [] and utils.is_url(author[0]):
        person_ref = author[0]
        declared_types = utils.get_norm_value(metadata[person_ref], "@type")
        try:
            is_person = declared_types[0] == "Person"
        except IndexError:
            # The referenced entity declares no @type.
            is_person = False

        if is_person:
            person_result[person_ref] = True
        else:
            person_result[person_ref] = [False, error_message.format(person_ref)]

def person_entity_check(tar_file, extension):

    """
    Check the entities referenced through the ``author`` property.

    The ``author`` value of every entity in the metadata is passed to
    ``check_author_type``. ``extension`` is not used by this check.

    Returns a ``(name, errors, passed)`` tuple; ``errors`` is an empty list on
    success and the first failure message otherwise.

    <https://www.researchobject.org/ro-crate/1.1/contextual-entities.html#people>
    """

    NAME = "Person entity check"
    error_message = "Person entity {} is incorrect"

    person_result = {}

    context, metadata = rocrate.read_metadata(os.path.join(tar_file, "ro-crate-metadata.json"))
    for entity in metadata.values():
        check_author_type(
            utils.get_norm_value(entity, "author"),
            metadata,
            person_result,
            error_message,
        )

    # A list value marks a failure; report the first one found.
    first_failure = next(
        (outcome for outcome in person_result.values() if isinstance(outcome, list)),
        None,
    )
    if first_failure is not None:
        return NAME, first_failure[1], False

    return NAME, [], True
        

### Checking Organizations

In [8]:
def publisher_affiliation_correctness(entity, ck_item, metadata, organization_result, error_message):
    """Record whether the ``ck_item`` property of ``entity`` references an Organization.

    Nothing is recorded when the property has no value. Otherwise the entry
    keyed by the entity's first ``@id`` becomes True when the ``@type`` of the
    first referenced entity is exactly ``["Organization"]`` and
    ``[False, message]`` when it is not.
    """
    referenced = utils.get_norm_value(entity, "%s" % ck_item)
    if referenced == []:
        return

    target = referenced[0]
    is_organization = utils.get_norm_value(metadata[target], "@type") == ["Organization"]
    outcome = True if is_organization else [False, error_message.format(target)]
    organization_result[utils.get_norm_value(entity, "@id")[0]] = outcome
        
def organization_check(tar_file, extension):
    """
    An Organization SHOULD be the value for the publisher property of a Dataset or ScholarlyArticle
    or affiliation property of a Person.
    Please see more information and examples at RO-Crate Website
    <https://www.researchobject.org/ro-crate/1.1/contextual-entities.html#organizations-as-values>

    ``tar_file`` is the crate directory; ``extension`` is not used by this
    check. Each entity is classified by membership in its ``@type`` list, so
    the order of the types does not matter and an entity without ``@type`` is
    skipped.

    Returns a ``(name, errors, passed)`` tuple; ``errors`` is an empty list on
    success and the first failure message otherwise.
    """
    NAME = "Organization check"
    error_message = "Organization contextual entity {} is incorrect"
    organization_result = {}

    context, metadata = rocrate.read_metadata(os.path.join(tar_file, "ro-crate-metadata.json"))
    for entity in metadata.values():
        entity_types = utils.get_norm_value(entity, "@type")

        ### check the value of publisher for each dataset and scholarly article entity
        if "Dataset" in entity_types or "ScholarlyArticle" in entity_types:
            publisher_affiliation_correctness(entity, "publisher", metadata, organization_result, error_message)

        ### check the value of affiliation for each person entity
        # This branch used to test for "File", which contradicts the rule
        # stated in the docstring: affiliation belongs to Person entities.
        elif "Person" in entity_types:
            publisher_affiliation_correctness(entity, "affiliation", metadata, organization_result, error_message)

    # A list value marks a failure; report the first one found.
    for outcome in organization_result.values():
        if isinstance(outcome, list):
            return NAME, outcome[1], False

    return NAME, [], True
        


# Checking Scripts and Workflows

In [None]:
def recognisedWkf_upd(extension_set, entity, workflow_result, id_, error_message):
    """Record the check result for an entity whose file extension is a recognised workflow extension.

    Nothing is recorded when the extension of ``id_`` is not in
    ``extension_set``. Otherwise ``workflow_result[id_]`` becomes True when
    the entity's ``@type`` contains ``File``, ``SoftwareSourceCode`` and
    ``ComputationalWorkflow`` and the entity has a ``name``; if not, it
    becomes ``[False, message]`` built from ``error_message[1]``.
    """
    if os.path.splitext(id_)[1] not in extension_set:
        return

    declared_types = utils.get_norm_value(entity, "@type")
    name = utils.get_norm_value(entity, "name")

    required_types = ("File", "SoftwareSourceCode", "ComputationalWorkflow")
    has_required_types = all(required in declared_types for required in required_types)

    if has_required_types and name != []:
        workflow_result[id_] = True
    else:
        workflow_result[id_] = [False, error_message[1].format(id_)]

def unrecognisedWfk_upd(type, extension_set, entity, workflow_result, warning_message):
    """Record the result for an entity typed as a workflow, based on its file extension.

    Nothing is recorded unless ``type`` contains both ``File`` and
    ``SoftwareSourceCode``. Otherwise the entry keyed by the entity's first
    ``@id`` becomes True when the extension of that ``@id`` is in
    ``extension_set``, and ``warning_message`` formatted with the extension
    when it is not.
    """
    entity_id_value = utils.get_norm_value(entity, "@id")[0]
    file_extension = os.path.splitext(entity_id_value)[1]

    if not ("File" in type and "SoftwareSourceCode" in type):
        return

    if file_extension in extension_set:
        outcome = True
    else:
        outcome = warning_message.format(file_extension)
    workflow_result[utils.get_norm_value(entity, "@id")[0]] = outcome
            
def scripts_and_workflow_check(tar_file, extension, wkfext_path='/Users/xuanqili/Desktop/ro-crate-validator-py/src/workflow_extension.txt'):

    """
    For workflow RO-Crate, if there is an unrecognised workflow file, the function will return an warning message.
    Please check more details at RO-Crate website:
    <https://www.researchobject.org/ro-crate/1.1/workflow-and-scripts.html>

    ``tar_file`` is the crate directory; ``extension`` is not used by this
    check. ``wkfext_path`` is the text file listing one recognised workflow
    extension per line. Its default is the location that used to be
    hard-coded in the function body; pass a different path to run the check
    on another machine.

    Returns a ``(name, errors, passed)`` tuple. ``passed`` is True only when
    at least one workflow entity was recorded and none of them failed;
    ``errors`` is then an empty list. On failure ``errors`` is the first
    failure message, or the full ``error_message`` list when nothing was
    recorded at all.
    """

    NAME = "Scripts and workflow check"
    error_message = ["scripts and workflow is wrong", " ComputationalWorkflow has to be one of value in @type for workflow file {}"]
    warning_message = "Warning: {} is not a recognised workflow extension. Please raise an issue at GitHub: <https://github.com/ResearchObject/ro-crate-validator-py/issues>."

    workflow_result = {}

    context, metadata = rocrate.read_metadata(os.path.join(tar_file, "ro-crate-metadata.json"))

    # Read the extension list once instead of once per entity. As before, the
    # file is only touched when there is at least one entity to check.
    extension_set = []
    if metadata:
        with open(wkfext_path, "r") as file:
            extension_set = file.read().splitlines()

    ### check if recognised workflow file meets the requirements
    for entity in metadata.values():
        id_ = utils.get_norm_value(entity, "@id")[0]
        recognisedWkf_upd(extension_set, entity, workflow_result, id_, error_message)

    ### check unrecognised workflow file with ComputationalWorkflow in its @type
    for entity in metadata.values():
        entity_types = utils.get_norm_value(entity, "@type")
        if "ComputationalWorkflow" in entity_types:
            unrecognisedWfk_upd(entity_types, extension_set, entity, workflow_result, warning_message)

    ### the function returns True only when all recorded workflow files are correct
    for outcome in workflow_result.values():
        if isinstance(outcome, list):
            return NAME, outcome[1], False
    if workflow_result:
        return NAME, [], True

    # Nothing was recorded: no workflow entity was found in the crate.
    return NAME, error_message, False