In [66]:
!pip install rdflib > /dev/null
!pip install owlrl > /dev/null
!pip install pyshacl > /dev/null
!pip install autopep8 > /dev/null


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --u

In [67]:
from rdflib import *
from owlrl import *
from jinja2 import Template
import types
import pyshacl
import functools
import inspect
import autopep8
import json

In [68]:
tbox = Namespace('http://www.semanticweb.org/acraf/ontologies/2024/healthmesh/tbox#')
abox = Namespace('http://www.semanticweb.org/acraf/ontologies/2024/healthmesh/abox#')
dcat = Namespace('https://www.w3.org/ns/dcat#')
dcterms = Namespace('http://purl.org/dc/terms/')
tb = Namespace("http://www.semanticweb.org/acraf/ontologies/2021/0/SDM#")
odrl = Namespace("http://www.w3.org/ns/odrl/2/")

In [69]:
def add_jsonld_instances(graph, path):
    # Adds JSON-LD instances to the graph
    with open(path, 'r') as f:
        json_ld_data = json.loads(f.read())
        instances = Graph().parse(data=json_ld_data, format='json-ld')
        graph += instances
    
    return graph

# POLICYCHECKER TRANSLATOR

Given a Data Product, Output a DC IRs

In [70]:
class Implementation():
    """
    Operation Class
    """
    
    def __init__(self, operation, data_flow=[]):
        
        # Static Metadata
        tg = Graph()
        tg.bind("tb","http://www.semanticweb.org/acraf/ontologies/2024/healthmesh/tbox#")
        tg.bind("ab","http://www.semanticweb.org/acraf/ontologies/2024/healthmesh/abox#")
        self.tg = add_jsonld_instances(tg, "translations_mappings.json")
        
        # Function Implementaiton
        self.f = self._init_func(operation, data_flow)
        
        
    def _init_func(self, operation, data_flow = None):
        """
        Get the code associated with the operation
        :param operation: operation
        :return: python funcion 
        """
        translation = self.tg.value(subject=operation, predicate=tbox.hasTranslation)   
        code = self.tg.value(subject=translation, predicate=tbox.hasCode)
        params = self.tg.objects(subject=translation, predicate=tbox.hasParameters)
        dependencies = self.tg.objects(subject=translation, predicate=tbox.dependsOn)
        params = [self.tg.value(subject=p, predicate=tbox.name) for p in params]
        dependencies = [self.tg.value(subject=d, predicate=tbox.name) for d in dependencies]
                
        c_path = self.tg.value(subject=code, predicate=tbox.code)
        
        codelines = str(c_path).split('\n')
        
        # Implementation Template
        template = Template("""
        def {{ name }}({{",".join(params)}}):
        {% for library in libraries %}
            import {{ library }}
        {% endfor %}
        {% for line in codelines %}
            data = {{  line }}
        {% endfor %}
            return data
        """)
    
        # Render template
        try:
            rendered_code = template.render(name=translation.split("#")[1], codelines=codelines, params=params, libraries=dependencies)
            fixed_code = autopep8.fix_code(rendered_code) 
        except Exception as e:
            print("Code Generation Error: ", e)
        # Compile the code
        try:
            compiled_code = compile(fixed_code, '', 'exec')
            # Create a new function from the compiled code
            new_func = types.FunctionType(compiled_code.co_consts[0], globals(), translation)
        except Exception as e:
            print("Code Compilation Error: ", e)
        
        return new_func
    
    
    def get_func(self):
        return self.f

    
    

In [80]:
class PCTranslator:
    """
    Parse the PolicyChecker Operations to an executable Script
    """

    def __init__(self, pc):
        # PolicyChecker
        self.g = pc
        # Execution Graph
        self.eg = Graph()
        
        
    def _validate_graph(self) -> bool:
        """
        Validate the policies grammar is compliant with the grammar defined
        :return: conformance/non-conformance
        """
        from pyshacl import validate
        shapes = Graph().parse("./pc_grammar.ttl", format="turtle")
        conforms, report_graph, report_text = validate(self.g,shacl_graph=shapes)
        #return boolean
        return conforms
         
    def _create_function_as_decorator(self, before_func=None, after_func=None, before_args={}, after_args={}) -> types.FunctionType:
        def decorator(f):
            def wrapper(*args, **kwargs):
                if before_func is not None:
                    before_func(*before_args)
                data = f(*args, **kwargs)
                if after_func is not None:
                    result = after_func(data=data, **after_args)
                    return result
                return data
            return wrapper
        return decorator
    
    
    def _get_func_parameters(self, f):
        import inspect 
        return inspect.signature(f).parameters
    

    def _operation_to_code(self, operation) -> types.FunctionType:
        
        operation_type = str(self.g.value(subject=operation, predicate=RDF.type)).split("#")[-1]
           
        if operation_type == "InitOperation":
            imp = Implementation(operation)
            return imp.get_func()
        elif operation_type == "Operation":
            imp = Implementation(operation)
            params = self._get_func_parameters(imp.get_func())
            parameters = self.g.objects(subject=operation, predicate=tbox.hasParameter)
            parameters = [str(p).split("#")[1] for p in parameters]
            kwargs = {}
            
            # TODO: PARAMTERER MATCHING ALGORITHM
            for param in params:
                if param != "data":
                    kwargs[param] = parameters.pop(0)
            
            
            decorated_imp  = self._create_function_as_decorator(after_func=imp.get_func(), after_args=kwargs)
            return decorated_imp
        
   
    def traverse_and_generate(self, language="python") -> types.FunctionType:
        """
        Traverse the policy checker and generate the code
        :param language: 
        :return: 
        """
        # Obtian policy checker
        pc = self.g.value(predicate=RDF.type, object=tbox.PolicyChecker)
        
        # Get first operation
        operation = self.g.value(subject=pc,predicate=tbox.nextStep)
        function = None
        
        # Traverse the policy checker (path)
        while operation:
            operation_type = str(self.g.value(subject=operation, predicate=RDF.type)).split("#")[-1]
            if operation_type == "InitOperation":
                function = self._operation_to_code(operation)
            elif operation_type == "Operation":
                decorator = self._operation_to_code(operation)
                function = decorator(function)            
            operation = self.g.value(subject=operation,predicate=tbox.nextStep)
            
        return function
        
    def translate(self) -> types.FunctionType: 
        """
        Get the policies associated with a data product
        :return: list of policies
        """
        # validate policies
        
        if self._validate_graph() == True:
            # get policies
            udf = self.traverse_and_generate()
            return udf
        else:
            raise Exception("Policy Checker is not compliant with the grammar")
        

# Translate the Policy Checker

In [79]:
# Translate the Policy Checker 
pc = Graph().parse("/home/acraf/psr/tfm/Prototype/SideCar/ValidateContract/parser/pc_p1.ttl", format="turtle")
function = PCTranslator(pc).translate()
function

<function __main__.PCTranslator._create_function_as_decorator.<locals>.decorator.<locals>.wrapper(*args, **kwargs)>

In [77]:

function('/home/acraf/psr/tfm/Prototype/DataProduct/Data/Explotation/UPENN-GBM_clinical_info_v2.1.csv')

False