In [1]:
# XML namespaces of the target format: the student-thesis schema and the
# shared "commons" schema it references.
THESIS_OUTPUT_NS = 'v1.studentthesis-sync.pure.atira.dk'
COMMONS_NS = 'v3.commons.pure.atira.dk'

# Clark-notation prefixes ("{namespace}") prepended to lxml tag names.
RO_NS = "{" + THESIS_OUTPUT_NS + "}"
C_NS = "{" + COMMONS_NS + "}"

# Prefix -> namespace mapping handed to lxml when elements are created.
NSMAP = dict(sto=THESIS_OUTPUT_NS, comm=COMMONS_NS)

# Fallbacks used when a record carries no language / no matched owner.
DEFAULT_LANGUAGE = 'en_GB'
DEFAULT_MANAGING_ORG_UNIT = '1'

# Licence mapping, currently disabled (kept for reference):
#LICENCE_MAP  = {'cc_by_nc_nd_4': 'cc_by_nc_nd_4', 
#                'cc_by_4': 'cc_by',
#                'arr': 'arr', 
#                'cc_by_nc_nd': 'cc_by_nc_nd',
#                'cc_by_nd': 'cc_by_nd',
#                'cc_by_nd_4': 'cc_by_nd' }

# Source document security level -> target visibility value.
VIS_MAP = {
    'public': 'Public',
    'staffonly': 'Restricted',
}

# Source language code -> target locale code.
LANG_MAP = {
    'en': 'en_GB',
    'fr': 'fr_FR',
    'de': 'de_DE',
    'ru': 'ru_RU',
    'zh': 'zh_CN',
    'ja': 'ja_JP',
    'it': 'it_IT',
    'nl': 'nl_NL',
    'pt': 'pt_PT',
    'pl': 'pl_PL',
    'cs': 'cs_CZ',
    'ko': 'ko_KR',
    'id': 'id_ID',
    'vi': 'vi_VN',
    'es': 'es_ES',
    'el': 'el_GR',
    'fa': 'fa_IR',
    'fi': 'fi_FI',
    'aus': 'en_GB',
    'bi': 'bi_VU',
    'sk': 'sk_SK',
    'is': 'is_IS',
    'sv': 'sv_SE',
    'other': 'und',
}

# Wrapper around the serialised theses; it declares both namespaces once on
# the root element, built from the constants above.
XML_HEADER = (
    '<?xml version="1.0" encoding="UTF-8"?>\n'
    '<sto:studentTheses xmlns:sto="{}" xmlns:comm="{}">'.format(THESIS_OUTPUT_NS, COMMONS_NS)
)
XML_FOOTER = '</sto:studentTheses>'

# Forwarded to lxml's tostring() as the pretty_print flag.
PRETTY_PRINT = True

In [2]:
import logging
from lxml import etree as et
import urllib

In [3]:
# Prefix map used for the 'ep:' XPath lookups against the input records.
EP_NSMAP = dict(ep='http://eprints.org/ep2/data/2.0')

In [4]:
def map(id, content_type, xml_in):
    """Parse one XML record and convert it to student-thesis XML.

    Args:
        id: Identifier of the record; used for logging and passed on to
            ``map_thesis``.
        content_type: Content type of the record. It is logged and passed
            through; no per-type dispatch takes place.
        xml_in: The record as an XML string/bytes accepted by
            ``lxml.etree.fromstring``.

    Returns:
        Whatever ``map_thesis`` returns for the parsed record.
    """
    record = et.fromstring(xml_in)
    logging.info('Mapping publication {} with type {}'.format(id, content_type))
    # Every record goes through the thesis mapper, regardless of content type.
    return map_thesis(id, content_type, record)

In [5]:
def escape_entities(text):
    """Return ``text`` with the five XML special characters replaced by entities.

    Args:
        text: The string to escape.

    Returns:
        A copy of ``text`` in which ``&``, ``<``, ``>``, ``"`` and ``'`` are
        replaced by their predefined XML entities.
    """
    # '&' has to come first: otherwise the ampersands introduced by the other
    # replacements would themselves be escaped again.
    substitutions = (
        ('&', '&amp;'),
        ('<', '&lt;'),
        ('>', '&gt;'),
        ('"', '&quot;'),
        ("'", '&apos;'),
    )
    result = text
    for raw, entity in substitutions:
        result = result.replace(raw, entity)
    return result

def fit_string(source, length):
    """Shorten ``source`` so that it is at most ``length`` characters long.

    Strings that already fit are returned unchanged. Longer strings are cut
    and, when there is room for it, terminated with ``'...'`` so the
    truncation is visible. A warning is logged whenever a value is shortened.

    Args:
        source: The string to fit.
        length: Maximum number of characters allowed in the result.

    Returns:
        ``source`` itself if it fits, otherwise a string of at most
        ``length`` characters.
    """
    if len(source) <= length:
        return source

    logging.warning('String value {} longer than max size: {}. Shortening it'.format(source, length))
    if length < 3:
        # There is no room for the '...' marker. Slicing with ``length - 3``
        # would be a negative index here and yield a result *longer* than the
        # limit, so simply hard-cut instead.
        return source[:max(length, 0)]
    return source[:length - 3] + '...'

def marshal_xml(tree):
    """Serialise an lxml element to a string without its namespace declarations.

    Args:
        tree: The lxml element to serialise.

    Returns:
        The UTF-8 decoded XML text of ``tree`` with the ``xmlns:sto`` and
        ``xmlns:comm`` declarations removed.
    """
    serialized = et.tostring(tree, pretty_print=PRETTY_PRINT, encoding="UTF-8").decode("UTF-8")
    # XML_HEADER declares both prefixes on the wrapper root, so they are
    # stripped from the fragment (presumably because fragments are embedded
    # in that wrapper). Only declarations followed by a space are removed.
    for prefix, namespace in (('sto', THESIS_OUTPUT_NS), ('comm', COMMONS_NS)):
        serialized = serialized.replace('xmlns:{}="{}" '.format(prefix, namespace), '')
    return serialized

In [6]:
def set_persons(pub, meta):
    """Append an ``authors`` element to ``pub`` built from the record's creators.

    Every ``ep:creators/ep:item`` of ``meta`` becomes one ``author`` element
    with role ``author`` and a ``person`` child holding first and last name.
    Each creator is also looked up with ``match_person``; the organisation of
    the first matched person is returned so the caller can use it as owner.

    Tagging authors as internal/external and attaching their organisation is
    currently disabled; a successful match is only logged and used for the
    owner.

    Args:
        pub: The lxml element the ``authors`` element is appended to. Its
            ``id`` attribute (if set) is used as the ``id`` of every author
            element and in log messages.
        meta: The parsed input record.

    Returns:
        The second column of the first ``match_person`` hit (the org id),
        or ``None`` if no creator could be matched.
    """
    def _text(node, path, default=None):
        # Text of the first element matching ``path``, or ``default`` when the
        # element is missing or empty.
        found = node.find(path, EP_NSMAP)
        if found is None or found.text is None:
            return default
        return found.text

    # This function receives no id parameter, so a bare ``id`` would refer to
    # the builtin function (which lxml rejects as an attribute value). The
    # record id is read back from the element being populated instead.
    record_id = pub.get('id')
    owner = None
    logging.debug('{}\tSetting persons'.format(record_id))

    # Get source data; missing name parts become empty strings.
    source_authors = [{'family': _text(c, 'ep:name/ep:family', ''),
                       'given': _text(c, 'ep:name/ep:given', ''),
                       'id': _text(c, 'ep:id')}
                      for c in meta.findall('ep:creators/ep:item', EP_NSMAP)]

    # Persons
    authors = et.SubElement(pub, RO_NS+'authors', nsmap=NSMAP)
    for p in source_authors:
        author = et.SubElement(authors, RO_NS+'author', nsmap=NSMAP)
        if record_id is not None:
            author.set('id', record_id)
        # TODO: change role
        et.SubElement(author, RO_NS+'role', nsmap=NSMAP).text = 'author'
        person = et.SubElement(author, RO_NS+'person', nsmap=NSMAP)
        et.SubElement(person, RO_NS+'firstName', nsmap=NSMAP).text = p['given']
        et.SubElement(person, RO_NS+'lastName', nsmap=NSMAP).text = p['family']

        internal_person = match_person(p)
        if internal_person is not None:
            logging.info('Internal person matched: {}'.format(internal_person[0]))
            # The first internal organisation encountered becomes the owner.
            if not owner:
                owner = internal_person[1]

    return owner


In [7]:
import re
def map_thesis(id, type, meta):
    """Build the student-thesis XML for one parsed input record.

    Child elements are emitted in this order: title, language, awardDate,
    abstract, authors, sponsors, managingOrganisation, keywords, documents.
    Optional source fields that are missing or empty are skipped.

    Args:
        id: Record identifier; written to the ``id`` attribute of the
            ``studentThesis`` element and used in log messages.
        type: Unused; accepted so the signature matches the caller in ``map``.
        meta: The parsed input record (lxml element).

    Returns:
        The serialised thesis as produced by ``marshal_xml``.

    Raises:
        Exception: If the record's language has no entry in ``LANG_MAP``.
    """
    logging.info(f'Mapping {id} to thesis')
    logging.debug('{} - Setting basic metadata'.format(id))

    ## ATTRIBUTES
    stu_thesis = et.Element(RO_NS+'studentThesis', nsmap=NSMAP)
    stu_thesis.set('id', id)
    stu_thesis.set('type', 'doc')

    # Title
    title = meta.find('ep:title', EP_NSMAP)
    if title is not None:
        et.SubElement(stu_thesis, RO_NS+'title', nsmap=NSMAP).text = title.text

    # Language
    et.SubElement(stu_thesis, RO_NS+'language', nsmap=NSMAP).text = _select_language_code(meta)

    # Award date
    _add_award_date(stu_thesis, meta, id)

    # Abstract (language/country attributes are not set on the text element)
    abstract_in = meta.find('ep:abstract', EP_NSMAP)
    if abstract_in is not None and abstract_in.text:
        logging.debug('{}\tSetting abstract'.format(id))
        abstract = et.SubElement(stu_thesis, RO_NS+'abstract', nsmap=NSMAP)
        abstract_text = et.SubElement(abstract, C_NS+'text', nsmap=NSMAP)
        abstract_text.text = et.CDATA(abstract_in.text)

    # Persons: returns the first internal org unit id encountered, so it can
    # be set as the owner below.
    owner = set_persons(stu_thesis, meta)

    # Sponsors
    sponsors = meta.findall('ep:funders/ep:item', EP_NSMAP)
    if sponsors:
        sponsors_in = et.SubElement(stu_thesis, RO_NS+'sponsors', nsmap=NSMAP)
        for s in sponsors:
            sponsor = et.SubElement(sponsors_in, RO_NS+'sponsor', nsmap=NSMAP)
            et.SubElement(sponsor, C_NS+'externalOrgId', nsmap=NSMAP).text = s.text

    # Owner: fall back to the default managing org unit when no author matched.
    if not owner:
        owner = DEFAULT_MANAGING_ORG_UNIT
    logging.info('Setting owner: {}'.format(owner))
    # The owner comes from a database row and may not be a string, which lxml
    # does not accept as an attribute value.
    et.SubElement(stu_thesis, RO_NS+'managingOrganisation', nsmap=NSMAP).set('lookupId', str(owner))

    # Keywords
    _add_keywords(stu_thesis, meta)

    # Electronic versions
    _add_documents(stu_thesis, meta)

    # Notes (ep:note) are currently not mapped.
    return marshal_xml(stu_thesis)


def _select_language_code(meta):
    """Return the target locale code for the record's most frequent language."""
    from collections import Counter

    languages = [l.text for l in meta.findall('ep:student/ep:language', EP_NSMAP) if l.text]
    if not languages:
        return DEFAULT_LANGUAGE
    # On ties, most_common() keeps the language that was encountered first.
    selected_lang = Counter(languages).most_common(1)[0][0]
    logging.info("Language: {}".format(selected_lang))
    lang_code = LANG_MAP.get(selected_lang)
    if lang_code is None:
        raise Exception('Unknown language: {}'.format(selected_lang))
    return lang_code


def _add_award_date(stu_thesis, meta, id):
    """Append ``awardDate`` (year, optional month and day) from ``ep:date``."""
    date_in = meta.find('ep:date', EP_NSMAP)
    if date_in is None or not (date_in.text or '').strip():
        logging.warning('{} - No date found, awardDate not set'.format(id))
        return
    # The source date is split on '-' into year[-month[-day]].
    date_value = date_in.text.strip().split('-')
    date = et.SubElement(stu_thesis, RO_NS+'awardDate', nsmap=NSMAP)
    et.SubElement(date, C_NS+'year', nsmap=NSMAP).text = date_value[0]
    if len(date_value) > 1:
        et.SubElement(date, C_NS+'month', nsmap=NSMAP).text = date_value[1]
    if len(date_value) == 3:
        et.SubElement(date, C_NS+'day', nsmap=NSMAP).text = date_value[2]


def _add_keywords(stu_thesis, meta):
    """Append one ``keyword`` per non-empty ';' or ',' separated source keyword."""
    keys = meta.find('ep:keywords', EP_NSMAP)
    if keys is None or not keys.text:
        return
    # Trailing separators would otherwise produce empty keyword elements.
    keys_value = [k.strip() for k in re.split(r"[;,]\s*", keys.text)]
    keys_value = [k for k in keys_value if k]
    if not keys_value:
        return
    keywords = et.SubElement(stu_thesis, RO_NS+'keywords', nsmap=NSMAP)
    for k in keys_value:
        et.SubElement(keywords, C_NS+'keyword', nsmap=NSMAP).text = k


def _add_documents(stu_thesis, meta):
    """Append a ``documents`` element describing the record's files."""
    documents = meta.findall('ep:documents/ep:document', EP_NSMAP)
    if not documents:
        return

    # (source field, target element) pairs copied verbatim, in output order.
    simple_fields = (
        ('url', 'fileLocation'),
        ('mime_type', 'mimeType'),
        ('filename', 'fileName'),
        ('doc_title', 'title'),
        ('date_embargo', 'embargoDate'),
        ('license', 'documentLicense'),
    )

    ev = et.SubElement(stu_thesis, RO_NS+'documents', nsmap=NSMAP)
    logging.debug('Setting documents')
    for document_tag in documents:
        # Skip all files having type 'other'
        format_tag = document_tag.find('./ep:format', EP_NSMAP)
        if format_tag is not None and format_tag.text == 'other':
            continue

        # Flatten the document and all its descendants into local-name -> text;
        # later elements overwrite earlier ones with the same name.
        extracted_data = {}
        for item in document_tag.iter():
            if not isinstance(item.tag, str):
                # Comments and processing instructions have no string tag.
                continue
            tag = item.tag.rsplit('}', 1)[-1]
            extracted_data[tag] = item.text
            if tag == 'document' and item.get('id') is not None:
                extracted_data['document id'] = item.get('id')

        file_id = extracted_data.get('document id', 'document1')
        file = et.SubElement(ev, RO_NS+'studentThesisDocuments', nsmap=NSMAP, attrib={'id': file_id})
        et.SubElement(file, RO_NS+'type', nsmap=NSMAP).text = 'thesis'

        for source_key, target_tag in simple_fields:
            if source_key in extracted_data:
                et.SubElement(file, RO_NS+target_tag, nsmap=NSMAP).text = extracted_data[source_key]
                if source_key == 'date_embargo':
                    logging.debug('Embargo date: {}'.format(extracted_data[source_key]))

        if 'security' in extracted_data:
            vis = et.SubElement(file, RO_NS+'visibility', nsmap=NSMAP)
            vis.text = VIS_MAP.get(extracted_data['security'], 'Public')



In [8]:
def match_person(p):
    """Look up a source author in the ``v_persons_matching`` view.

    The author is matched by id first (substring of ``identifiers`` or
    case-insensitive e-mail) when an id is present; if that yields nothing,
    or no id is present, the match falls back to a case-insensitive
    family/given name comparison.

    Relies on a module-level database connection named ``conn`` that must be
    created before this function is called.

    Args:
        p: Dict with the keys ``'family'``, ``'given'`` and optionally
            ``'id'``.

    Returns:
        The single matching row ``(id, org_id)``, or ``None`` when there is
        no match or the match is ambiguous.
    """
    result = None
    logging.debug(p)
    # Values are always passed as query parameters: they come from the input
    # record and must never be interpolated into the SQL text.
    sql_id_match = "select id, org_id from v_persons_matching where identifiers like %s or lower(email) = lower(%s)"
    sql_name_match = 'select id, org_id from v_persons_matching where lower(family_name) = lower(%s) and lower(given_name) = lower(%s)'
    cur1 = conn.cursor()
    try:
        if p.get('id') is not None:
            logging.info('Attempting match by id with {}'.format(p['id']))
            cur1.execute(sql_id_match, ('%{}%'.format(p['id']), p['id']))
            res = cur1.fetchall()
            if len(res) == 0:
                logging.warning('Author id {} not matched. Attempting match by name'.format(p['id']))
                cur1.execute(sql_name_match, (p['family'], p['given']))
                res = cur1.fetchall()
        else:
            logging.info('Attempting match by name with {} {}'.format(p['family'], p['given']))
            cur1.execute(sql_name_match, (p['family'], p['given']))
            res = cur1.fetchall()
    finally:
        # Always release the cursor, including when a query raises.
        cur1.close()

    if len(res) == 0:
        logging.warning('Unable to match author {} {}. It will be treated as external'.format(p['given'], p['family']))
    elif len(res) > 1:
        logging.error('Multiple matches for author {} {}.'.format(p['given'], p['family']))
    else:
        result = res[0]
    return result