##  MAIN CONTROLLER

In [176]:
from config import sparqlTerms, mig_ns, sparql_mig_test, sparql_mig_simple, sparql_mig_dev, vocabs, types
from SPARQLWrapper import JSON, SPARQLWrapper
from utilities import removeNS, PrintException, cleanOutputs
import re, os, concurrent.futures, json

In [177]:
def main():
    """Drive the migration: build the queries for each object type, then transform every batch.

    For each entry in ``types`` a query object is obtained from ``QueryFactory``;
    each of its per-group queries is handed to ``DataFactory`` and transformed.
    Progress is reported on stdout.
    """
    # Called once, before any batch is processed (presumably resets earlier output -- see utilities).
    cleanOutputs(types)
    # Splitting by object type is the first partitioning of the data for migration.
    for ptype in types:
        # A query object knows where it came from and is split into groups:
        # one group for community objects and one for collection objects,
        # roughly a thousand queries each for thesis and generic objects
        # (keyed on the first folder of the fedora pair tree).
        queryObject = QueryFactory.getMigrationQuery(ptype, sparqlData=sparql_mig_simple)
        batches = queryObject.queries
        total = len(batches)
        print('%s batch queries generated' % (ptype))
        print('%i batch(es) of %s objects to be transformed' % (total, ptype))
        for done, (group, query) in enumerate(batches.items(), start=1):
            # query, group, object
            DataFactory.getData(query, group, queryObject).transformData()
            print("%i of %i %s batches transformed" % (done, total, ptype))
        print("%s objects transformation completed" % (ptype))

##  TRANSFORMATIONS
#### functions for handling data passed over by the data object. Takes a triple, detects what kind of action needs to be taken based on the predicate, sends it to the appropriate function for transformations, then returns it back to the data handler to be saved.

In [178]:
class Transformation():

    """
    Predicate-specific transformations applied to one triple at a time.

    Every method takes a triple and the object type being migrated (``ptype``),
    appends zero or more triples to ``self.output`` and returns ``self.output``.

    the output must be a list of triples matching the same format as the input (as follows):

    {
        'subject': {
            'value': 'http://gillingham.library.ualberta.ca:8080/fedora/rest/prod/0r/96/76/28/0r967628d',
            'type': 'uri'
        },
        'predicate': {
            'value': 'http://purl.org/dc/elements/1.1/subject',
            'type': 'uri'
        },
        'object': {
            'value': 'Geochemistry',
            'type': 'literal'
        }
    }
    output is appended to self.output
    """

    def __init__(self):
        self.output = []

    @staticmethod
    def _uri_triple(subject, predicate, obj):
        """Build a triple dict in which subject, predicate and object are all typed 'uri'."""
        return {
            'subject': {'value': subject, 'type': 'uri'},
            'predicate': {'value': predicate, 'type': 'uri'},
            'object': {'value': obj, 'type': 'uri'},
        }

    ############################################################################
    ######################## transformation on rdf:type ########################
    ############################################################################

    def rdfsyntaxnstype(self, triple, ptype):
        """Pass the rdf:type triple through unchanged."""
        self.output.append(triple)
        return self.output

    ############################################################################
    ######################## transformation on dcterms:language ################
    ############################################################################

    def language(self, triple, ptype):
        """Pass the triple through unchanged (normalization not implemented yet)."""
        # TODO: normalize values and convert to URI (consult the "vocabs" variable from the config file (this folder))
        self.output.append(triple)
        return self.output

    ############################################################################
    ######################## transformation on dc:rights #######################
    ############################################################################

    def rights(self, triple, ptype):
        """Pass the triple through unchanged (coercion not implemented yet)."""
        # TODO: several different license values need to be coerced into one common value,
        # this needs to be confirmed with leah before it is written
        self.output.append(triple)
        return self.output

    ############################################################################
    ######################## transformation on ual:institution #################
    ############################################################################

    def institution(self, triple, ptype):
        """Pass the triple through unchanged (URI conversion not implemented yet)."""
        # TODO: convert university of alberta to <http://id.loc.gov/authorities/names/n79058482>
        self.output.append(triple)
        return self.output

    ############################################################################
    ######################## transformation on dcterms:license #################
    ############################################################################

    def license(self, triple, ptype):
        """Pass the triple through unchanged (URI conversion not implemented yet)."""
        # TODO: convert licenses from text to URI (use vocabs variable, some coersion will be necessary)
        self.output.append(triple)
        return self.output

    ############################################################################
    ######################## transformation on dcterms:type ####################
    ############################################################################

    def type(self, triple, ptype):
        """Transform a dcterms:type triple according to the object type being migrated.

        * ``batch``: re-emit the value under a different predicate.
        * ``thesis``: emit nothing.
        * ``generic``: emit one triple per vocabulary entry whose mapping contains the
          value, plus bibo:status triples when the value mentions "Draft-Submitted"
          and/or "Published".
        * ``community`` / ``collection``: pass the triple through unchanged.

        Any other ``ptype`` emits nothing. Returns ``self.output``.
        """
        subject = triple['subject']['value']
        value = triple['object']['value']
        status = "http://purl.org/ontology/bibo/status"

        if ptype == 'batch':
            # values noted by the original author: null, Complete, processing
            # NOTE(review): "a different predicate" is a placeholder, not a real URI -- confirm the target predicate.
            self.output.append(self._uri_triple(subject, "a different predicate", value))
        elif ptype == 'thesis':
            # nothing needs to happen
            pass
        elif ptype == 'generic':
            for vocab in vocabs["type"]:
                # mint a new triple with the mapped type
                if value in vocab["mapping"]:
                    self.output.append(
                        self._uri_triple(subject, triple['predicate']['value'], vocab["uri"])
                    )
            # The status triples do not depend on the vocabulary entry, so they are
            # emitted once per input triple rather than once per vocabulary entry.
            if "Draft-Submitted" in value:
                self.output.append(
                    self._uri_triple(subject, status, "http://vivoweb.org/ontology/core#submitted")
                )
                self.output.append(
                    self._uri_triple(subject, status, "http://purl.org/ontology/bibo/status#draft")
                )
            if "Published" in value:
                self.output.append(
                    self._uri_triple(subject, status, "http://purl.org/ontology/bibo/status#published")
                )
        elif ptype in ('community', 'collection'):
            self.output.append(triple)

        return self.output

##  QUERY BUILDER
##### Pulls current mappings from the triplestore and dynamically builds queries in manageable sizes

In [179]:
class Query(object):
    """ Query objects are dynamically generated, and contain SPARQL CONSTRUCT queries with input from the jupiter application profile

    Subclasses assign ``construct``, ``where`` and ``select`` and then call this
    constructor, which loads the property mappings and builds the queries
    immediately (both hit the configured SPARQL endpoints).
    """
    def __init__(self, ptype, sparqlData, sparqlTerms=sparqlTerms):
        # getMappings() and writeQueries() read self.ptype, so store the argument
        # here instead of relying on every subclass having set it beforehand.
        self.ptype = ptype
        self.mapping = []
        self.sparqlTerms = SPARQLWrapper(sparqlTerms)  # doesn't need to change (the terms store doesn't change)
        self.sparqlData = SPARQLWrapper(sparqlData)  # sets the triple store from which to get data (simple, test, or dev)
        self.endpoint = sparqlData
        self.queries = {}
        self.splitBy = {}
        self.filename = ""
        # one " PREFIX p: <uri> " declaration per configured namespace
        self.prefixes = "".join(" PREFIX %s: <%s> " % (ns['prefix'], ns['uri']) for ns in mig_ns)
        self.getMappings()
        self.generateQueries()

    def getMappings(self):
        """Fill ``self.mapping`` with (newProperty, oldProperty) pairs from the terms store graph named after ``self.ptype``."""
        query = "prefix ual: <http://terms.library.ualberta.ca/>SELECT * WHERE {GRAPH ual:%s {?newProperty ual:backwardCompatibleWith ?oldProperty} }" % (self.ptype)
        self.sparqlTerms.setReturnFormat(JSON)
        self.sparqlTerms.setQuery(query)
        results = self.sparqlTerms.query().convert()
        for result in results['results']['bindings']:
            self.mapping.append((result['newProperty']['value'], result['oldProperty']['value']))

    def getSplitBy(self):
        """Run ``self.select`` against the data store and record one URI stem per group in ``self.splitBy``."""
        # base query only needs 3 prefixes appended to the "select" statement defined by the object
        query = "prefix dcterm: <http://purl.org/dc/terms/> prefix info: <info:fedora/fedora-system:def/model#> prefix ual: <http://terms.library.ualberta.ca/> %s" % (self.select)
        self.sparqlData.setReturnFormat(JSON)
        self.sparqlData.setQuery(query)
        results = self.sparqlData.query().convert()
        # iterate over query results
        for result in results['results']['bindings']:
            segments = result['resource']['value'].split('/')
            # the group is the 7th '/'-separated segment of the resource URI
            # (presumably the first folder of the fedora pair tree -- verify against the data)
            group = segments[6]
            # the stem to filter on is the resource URI up to and including that segment
            self.splitBy[group] = "/".join(segments[:7])

    def generateQueries(self):
        """Hook for subclasses: populate ``self.queries``. The base class builds nothing."""
        pass

    def writeQueries(self):
        """Dump ``self.queries`` (wrapped in a one-element list) as JSON to ``cache/<ptype>.json``."""
        filename = "cache/%s.json" % (self.ptype)
        # open() does not create missing directories; make sure the cache folder exists
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'w+') as f:
            json.dump([self.queries], f)
               
class Collection(Query):
    """Builds the single CONSTRUCT query used to migrate collection objects."""
    def __init__(self, sparqlData):
        self.ptype = 'collection'
        self.construct = "CONSTRUCT { ?resource info:hasModel 'IRItem'^^xsd:string ; rdf:type pcdm:Collection"
        # NOTE(review): the is_community patterns are OPTIONAL, so as written they do not
        # restrict which 'Collection' resources match -- confirm this is intended.
        self.where = ["WHERE { ?resource info:hasModel 'Collection'^^xsd:string . OPTIONAL { ?resource ualids:is_community 'false'^^xsd:boolean } . OPTIONAL { ?resource ualid:is_community 'false'^^xsd:boolean } . OPTIONAL { ?resource ual:is_community 'false'^^xsd:boolean }"]
        self.select = None
        # the base constructor loads the mappings and calls generateQueries()
        super().__init__(self.ptype, sparqlData)

    def generateQueries(self):
        """Extend the CONSTRUCT/WHERE clauses with every mapped property and cache the result."""
        for where in self.where:
            construct = self.construct
            for pair in self.mapping:
                # pair is (newProperty, oldProperty); the variable name is derived from the new property
                construct = "%s ; <%s> ?%s" % (construct, pair[0], removeNS(pair[0]))
                where = " %s . OPTIONAL { ?resource <%s> ?%s . FILTER (?%s!='') }" % (where, pair[1], removeNS(pair[0]), removeNS(pair[0]))
            self.queries['collection'] = "%s %s } %s }" % (self.prefixes, construct, where)
        # the method must actually be called; a bare attribute reference writes nothing
        self.writeQueries()

class Community(Query):
    """Builds the single CONSTRUCT query used to migrate community objects."""
    def __init__(self, sparqlData):
        self.ptype = 'community'
        self.construct = "CONSTRUCT { ?resource info:hasModel 'IRItem'^^xsd:string ; rdf:type pcdm:Object; rdf:type ual:Community"
        self.where = ["WHERE { ?resource info:hasModel 'Collection'^^xsd:string ; OPTIONAL { ?resource ualids:is_community 'true'^^xsd:boolean } . OPTIONAL { ?resource ualid:is_community 'true'^^xsd:boolean } . OPTIONAL { ?resource ual:is_community 'true'^^xsd:boolean }"]
        self.select = None
        # the base constructor loads the mappings and calls generateQueries()
        super().__init__(self.ptype, sparqlData)

    def generateQueries(self):
        """Append every mapped property to the CONSTRUCT and WHERE clauses, then cache the query."""
        for base_where in self.where:
            construct_clause = self.construct
            where_clause = base_where
            for new_property, old_property in self.mapping:
                construct_clause = "%s ; <%s> ?%s" % (
                    construct_clause, new_property, removeNS(new_property))
                where_clause = " %s . OPTIONAL { ?resource <%s> ?%s . FILTER (?%s!='') }" % (
                    where_clause, old_property, removeNS(new_property), removeNS(new_property))
            self.queries['community'] = "%s %s } %s }" % (self.prefixes, construct_clause, where_clause)
        self.writeQueries()

class Generic(Query):
    """Builds one CONSTRUCT query per group for GenericFile objects whose dcterm:type is not 'Thesis'."""
    def __init__(self, sparqlData):
        self.ptype = 'generic'
        self.construct = "CONSTRUCT { ?resource info:hasModel 'IRItem'^^xsd:string ; rdf:type pcdm:Object; rdf:type works:work"
        self.where = []
        self.select = "SELECT distinct ?resource WHERE { ?resource info:hasModel 'GenericFile'^^xsd:string ; dcterm:type ?type . filter(?type != 'Thesis'^^xsd:string) }"
        # the base constructor loads the mappings and calls generateQueries()
        super().__init__(self.ptype, sparqlData)

    def generateQueries(self):
        """Populate ``self.queries`` with one query per entry of ``self.splitBy`` and cache them."""
        self.getSplitBy()
        for group, stem in self.splitBy.items():
            # restrict each query to the resources whose URI contains the group's stem
            where = "WHERE {  ?resource info:hasModel 'GenericFile'^^xsd:string ; dcterm:type ?type . filter(?type != 'Thesis'^^xsd:string) . FILTER (contains(str(?resource), '%s'))" % (stem)
            construct = self.construct
            for new_property, old_property in self.mapping:
                variable = removeNS(new_property)
                construct = "%s ; <%s> ?%s" % (construct, new_property, variable)
                where = " %s . OPTIONAL { ?resource <%s> ?%s . FILTER (?%s!='') }" % (where, old_property, variable, variable)
            self.queries[group] = "%s %s } %s }" % (self.prefixes, construct, where)
        self.writeQueries()


class Thesis(Query):
    """Builds one CONSTRUCT query per group for GenericFile objects whose dcterm:type is 'Thesis'."""
    def __init__(self, sparqlData):
        self.ptype = 'thesis'
        self.construct = "CONSTRUCT { ?resource info:hasModel 'IRItem'^^xsd:string ; rdf:type pcdm:Object; rdf:type works:work ; rdf:type bibo:Thesis"
        self.where = []
        self.select = "SELECT distinct ?resource WHERE { ?resource info:hasModel 'GenericFile'^^xsd:string ; dcterm:type 'Thesis'^^xsd:string }"
        # the base constructor loads the mappings and calls generateQueries()
        super().__init__(self.ptype, sparqlData)

    def generateQueries(self):
        """Populate ``self.queries`` with one query per entry of ``self.splitBy`` and cache them."""
        self.getSplitBy()
        for group, stem in self.splitBy.items():
            # restrict each query to the resources whose URI contains the group's stem
            where = "WHERE { ?resource info:hasModel 'GenericFile'^^xsd:string ; dcterm:type 'Thesis'^^xsd:string . FILTER (contains(str(?resource), '%s'))" % (stem)
            construct = self.construct
            for new_property, old_property in self.mapping:
                variable = removeNS(new_property)
                construct = "%s ; <%s> ?%s" % (construct, new_property, variable)
                where = " %s . OPTIONAL { ?resource <%s> ?%s . FILTER (?%s!='') }" % (where, old_property, variable, variable)
            # Assign once per group, after all mappings are folded in; doing it inside
            # the mapping loop left the group without a query when no mappings exist.
            self.queries[group] = "%s %s } %s  }" % (self.prefixes, construct, where)
        self.writeQueries()


class Batch(Query):
    """Builds one CONSTRUCT query per group for objects whose model is 'Batch'."""
    def __init__(self, sparqlData):
        self.ptype = 'batch'
        self.construct = "CONSTRUCT { ?resource info:hasModel 'Batch' ; schema:result ?type; ?predicate ?object }"
        self.where = "WHERE { ?resource info:hasModel 'Batch'^^xsd:string ; "
        self.select = "SELECT distinct ?resource WHERE  { ?resource info:hasModel 'Batch'^^xsd:string } "
        # the base constructor loads the mappings and calls generateQueries()
        super().__init__(self.ptype, sparqlData)

    def generateQueries(self):
        """Create one query per split group (every predicate/object is copied) and cache them."""
        self.getSplitBy()
        template = "%s %s %s FILTER (contains(str(?resource), '%s')) . ?resource dcterm:type ?type . ?resource ?predicate ?object . FILTER (?object!='') }"
        for group, stem in self.splitBy.items():
            self.queries[group] = template % (self.prefixes, self.construct, self.where, stem)
        self.writeQueries()

##  DATA TRANSPORT OBJECTS
##### Runs a query, sends data to get transformed, saves data to appropriate file

In [180]:
class Data(object):
    """Runs one batch query, transforms every returned triple and writes the result as N-Triples.

    Parameters
    ----------
    query : the query text to execute
    group : batch identifier, used as the output file name
    sparqlData : endpoint wrapper the query is run against
    sparqlTerms : endpoint wrapper for the terms store (stored, not used here)
    ptype : object type being migrated; selects the output folder and is passed
        on to the transformations
    """
    def __init__(self, query, group, sparqlData, sparqlTerms, ptype):
        self.q = query
        self.group = group
        self.sparqlData = sparqlData
        self.sparqlTerms = sparqlTerms
        self.output = []
        # store the argument -- not the builtin ``type`` -- so that the ptype-specific
        # transformations can match on it
        self.ptype = ptype
        self.directory = "results/%s/" % (ptype)
        self.filename = "results/%s/%s.nt" % (ptype, group)
        os.makedirs(self.directory, exist_ok=True)

    @staticmethod
    def _escape_literal(value):
        """Escape the characters that may not appear raw inside an N-Triples string literal."""
        # the backslash must be escaped first so later escapes are not doubled
        return (str(value)
                .replace('\\', '\\\\')
                .replace('"', '\\"')
                .replace('\n', '\\n')
                .replace('\r', '\\r'))

    @staticmethod
    def _format_triple(triple):
        """Serialize one triple dict as a single N-Triples line."""
        s = "<%s>" % (str(triple['subject']['value']))
        p = "<%s>" % (str(triple['predicate']['value']))
        if triple['object']['type'] == 'uri':
            o = "<%s>" % (str(triple['object']['value']))
        else:
            # every non-uri object is written as a plain literal
            o = "\"%s\"" % (Data._escape_literal(triple['object']['value']))
        return "%s %s %s . \n" % (s, p, o)

    def transformData(self):
        """Execute the query, transform each result and write all triples to ``self.filename``."""
        self.sparqlData.setMethod("GET")
        self.sparqlData.setReturnFormat(JSON)
        self.sparqlData.setQuery(self.q)
        results = self.sparqlData.query().convert()['results']['bindings']
        for result in results:
            transformed = TransformationFactory().getTransformation(result, self.ptype)
            if isinstance(transformed, list):
                for triple in transformed:
                    self.output.append(self._format_triple(triple))
        # N-Triples documents are UTF-8; do not depend on the platform default encoding
        with open(self.filename, "w+", encoding="utf-8") as f:
            f.writelines(self.output)

        #with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
        #    future_to_result = {executor.submit(self.processResults, results, query): result for result in results}
        #    for future in concurrent.futures.as_completed(future_to_result):
        #        result = future_to_result[future]
        #        try:
        #            future.result()
        #        except Exception:
        #            PrintException()

class CollectionData(Data):
    """Data transport object for collection batches; adds nothing to ``Data``."""
    def __init__(self, q, group, sparqlData, sparqlTerms, ptype):
        Data.__init__(self, q, group, sparqlData, sparqlTerms, ptype)

class CommunityData(Data):
    """Data transport object for community batches; adds nothing to ``Data``."""
    def __init__(self, q, group, sparqlData, sparqlTerms, ptype):
        Data.__init__(self, q, group, sparqlData, sparqlTerms, ptype)

class ThesisData(Data):
    """Data transport object for thesis batches; adds nothing to ``Data``."""
    def __init__(self, q, group, sparqlData, sparqlTerms, ptype):
        Data.__init__(self, q, group, sparqlData, sparqlTerms, ptype)

class GenericData(Data):
    """Data transport object for generic batches; adds nothing to ``Data``."""
    def __init__(self, q, group, sparqlData, sparqlTerms, ptype):
        Data.__init__(self, q, group, sparqlData, sparqlTerms, ptype)

class BatchData(Data):
    """Data transport object for batch-model batches; adds nothing to ``Data``."""
    def __init__(self, q, group, sparqlData, sparqlTerms, ptype):
        Data.__init__(self, q, group, sparqlData, sparqlTerms, ptype)
            

In [181]:
class QueryFactory():
    """Maps an object-type name onto the query class that handles it."""
    @staticmethod
    def getMigrationQuery(ptype, sparqlData):
        """Return a new query object for ``ptype`` built against ``sparqlData``, or None for an unknown type."""
        registry = (
            ("collection", Collection),
            ("community", Community),
            ("thesis", Thesis),
            ("generic", Generic),
            ("batch", Batch),
        )
        for name, query_class in registry:
            if ptype == name:
                return query_class(sparqlData)
        return None

In [182]:
class DataFactory():
    """Maps a query object's type onto the data transport class that handles it."""
    @staticmethod
    def getData(query, group, queryObject):
        """Return a data object for one batch, or None when the query object's type is unknown.

        query: the query text for this batch
        group: the batch identifier
        queryObject: supplies ``sparqlData``, ``sparqlTerms`` and ``ptype``
        """
        # Map names to classes, not instances: only the requested class is
        # instantiated, so the constructor side effects happen once per call.
        classes = {"collection": CollectionData,
                   "community": CommunityData,
                   "thesis": ThesisData,
                   "generic": GenericData,
                   "batch": BatchData
                  }
        if queryObject.ptype in classes:
            return classes[queryObject.ptype](query, group, queryObject.sparqlData, queryObject.sparqlTerms, queryObject.ptype)
        else:
            return None

In [183]:
class TransformationFactory():
    """Routes a triple to the ``Transformation`` method named after its predicate."""
    @staticmethod
    def getTransformation(triple, ptype):
        """Return the list of triples produced for ``triple``.

        The handler name is the last '/'-separated segment of the predicate URI with
        '#', '-' and digits removed (e.g. '.../22-rdf-syntax-ns#type' becomes
        'rdfsyntaxnstype'). Predicates without a handler are returned unchanged as
        a one-element list.
        """
        local_name = triple['predicate']['value'].split('/')[-1].replace('#', '').replace('-', '')
        function = re.sub(r'[0-9]+', '', local_name)
        # Explicit whitelist: only these Transformation methods may be reached from data.
        # "institution" is included so the handler defined on Transformation is used.
        handlers = ("rdfsyntaxnstype", "language", "type", "rights", "license", "institution")
        if function in handlers:
            return getattr(Transformation(), function)(triple, ptype)
        return [triple]

In [184]:
# Entry point: run the whole migration when this code executes as the top-level module.
if __name__ == "__main__":
	main()

TypeError: __init__() missing 1 required positional argument: 'sparqlData'