# In [1]:
import prov.model as prov
import numpy as np
import pandas as pd
import uuid
import os
import time
import csv
import json

from prov.dot import prov_to_dot

class Provenance:
    """Record the provenance of a dataframe through a chain of operations.

    One provenance entity is created for every cell of the input dataframe.
    Each ``getProv_*`` method starts a new provenance document describing a
    single operation, saves it as JSON under ``results_path`` and then makes
    the output dataframe the new current state.
    """
    
    # Constants:
    NAMESPACE_FUNC = 'function:'  # prefix of activity identifiers
    NAMESPACE_ENTITY = 'entity:'  # prefix of entity identifiers
    INPUT = 'input'  # instance label and file name used for the input dataframe
    OUTPUT = 'output'  # instance labels are OUTPUT + operation number
    CHUNK_SIZE = 30000  # maximum number of entities per file written by split_json_file
    
    def __init__(self, df, results_path=None, split_json=False):
         # Set input dataframe parameters:
        self.current_m, self.current_n = df.shape
        self.current_columns = df.columns
        self.current_index = df.index
    
        # Create a new provenance document:
        self.current_provDoc = self.create_prov_document()
        
        # Create provenance entities of the input dataframe:
        self.current_ent = self.create_prov_entities(df, self.INPUT)
        
        # Initialize operation number:
        self.operation_number = 0
        self.instance = self.OUTPUT + str(self.operation_number)
        
        # Set results path:
        results_path = 'results/' + time.strftime('%Y%m%d-%H%M%S') if results_path is None else results_path
        self.results_path = results_path
        self.split_json = split_json
        
        # Save input provenance document
        self.save_json_prov(os.path.join(self.results_path, self.INPUT))
        
    def create_prov_document(self):
        """Create an empty provenance document and make it the current one.

        The document gets the default namespace 'default/' plus the
        'function' and 'entity' namespaces. Return the new document.
        """
        document = prov.ProvDocument()
        document.set_default_namespace('default/')
        # Prefixes used by the activity and entity identifiers:
        for prefix, uri in (('function', 'function/'), ('entity', 'entity/')):
            document.add_namespace(prefix, uri)
        self.current_provDoc = document
        return document
        
    def create_entity(self, ent_id, value, feature_name, instance):
        """Add an entity to the current provenance document.
        Return a dictionary with the id and the attributes of the entity."""
        # Get attributes:
        other_attributes = {}
        other_attributes['value'] = value
        other_attributes['feature name'] = feature_name
        other_attributes['instance'] = instance
        
        # Add entity to current provenance document:
        entity = self.current_provDoc.entity(ent_id, other_attributes)
        
        return {'identifier': ent_id, 'attributes': other_attributes}
    
    def create_activity(self, function_name, features_name=None, other_attributes=None):
        """Add an activity to the current provenance document.
        Return the id of the new prov activity."""
        # Get default activity attributes:
        attributes = {}
        attributes['function name'] = function_name
        if features_name is not None:
            attributes['features name'] =  features_name
        attributes['operation number'] = str(self.operation_number)
        
        # Join default and extra attributes:
        if other_attributes is not None:
            attributes.update(other_attributes)
            
        act_id = self.NAMESPACE_FUNC + str(uuid.uuid4())
        
        # Add activity to current provenance document:
        act_id = self.current_provDoc.activity(act_id, None, None, attributes)
        
        return act_id
        #return act
    
    def add_ent(self, elem, instance, feature_name):
        ent_id = self.NAMESPACE_ENTITY + str(uuid.uuid4())
        return self.create_entity(ent_id, elem, feature_name, instance)
    
    def create_entities(self, dataframe, instance=None):
        """Return a numpy array of new provenance entities related to the dataframe."""
        instance = self.instance if instance is None else instance
        columns = dataframe.columns
        newdf = pd.DataFrame(columns, dtype=object)
        for i in range(self.current_n):
            feature_name = columns[i]
            newdf[feature_name] = dataframe[feature_name].apply(self.add_ent, args=[instance, feature_name])
        return newdf
        
    def create_prov_entities(self, dataframe, instance=None):
        """Return a numpy array of new provenance entities related to the dataframe."""
        instance = self.instance if instance is None else instance
        columns = dataframe.columns
        
        # Copy input values in array
        # values = np.array(dataframe.values)
        # Create a function that adds input entities to prov document
        # createEntities = lambda i,j: self.create_entity(self.NAMESPACE_ENTITY + str(uuid.uuid4()), 
        #                                                  str(values[i][j]), 
        #                                                  columns[j], 
        #                                                  instance)
        # entities = np.fromfunction(np.vectorize(createEntities), values.shape, dtype=object)
        
        # Create output array of entities:
        entities = np.empty(dataframe.shape, dtype=object)
        for i in range(self.current_m):
            for j in range(self.current_n):
                ent_id = self.NAMESPACE_ENTITY + str(uuid.uuid4())
                value = str(dataframe.iat[i, j])
                 # Add entity to current provenance document:
                entities[i][j] = self.create_entity(ent_id, value, columns[j], instance)

        return entities
    
    def set_current_values(self, dataframe, entities_out):
        """Update values of current entities after every operation."""
        # Set output dataframe entities:
        self.current_m, self.current_n = dataframe.shape
        self.current_columns = dataframe.columns
        self.current_index = dataframe.index
        self.current_ent = entities_out
        
        # Increment operation number:
        self.operation_number += 1
        self.instance = self.OUTPUT + str(self.operation_number)
        
    def split_json_file(self, nameFile):
        json_path = nameFile + '.json'
        if not os.path.exists(nameFile):
            os.makedirs(nameFile)
        ents_path = os.path.join(nameFile, 'entities')
        acts_path = os.path.join(nameFile, 'activities.json')
        conn_path = os.path.join(nameFile, 'connections.json')
        with open(json_path) as json_file:
            data = json.load(json_file)
            data.pop('prefix', None) # delete prefix infos
                
            # Save entities:
            entities = data['entity']
            
            for k, v in entities.items():
                v.update({'id':k})
                
            for i in range(0, len(entities), self.CHUNK_SIZE):
                output_name = ents_path + '_' + str(i//self.CHUNK_SIZE) + '.json'
                with open(output_name, 'w', encoding='utf-8') as ents_file:
                    #ents = dict(list(entities.items())[i:i+self.CHUNK_SIZE])
                    ents = list(entities.values())[i:i+self.CHUNK_SIZE]
                    json.dump(ents, ents_file, ensure_ascii=False, indent=4)
            data.pop('entity', None)
            #with open(ents_path, 'w', encoding='utf-8') as ents_file:
                #json.dump(entities, ents_file, ensure_ascii=False, indent=4)
                #data.pop('entity', None)
                    
            # Save activities:
            if 'activity' in data:
                with open(acts_path, 'w', encoding='utf-8') as acts_file:
                    activities = data['activity']
                    for k, v in activities.items():
                        v.update({'id':k})
                    json.dump(list(activities.values()), acts_file, ensure_ascii=False, indent=4)
                    data.pop('activity', None)
                
            # Save all connections:
            if data:
                with open(conn_path, 'w', encoding='utf-8') as conn_file:
                    json.dump(data, conn_file, ensure_ascii=False, indent=4)  
        
    def save_json_prov(self, nameFile):
        """Save provenance in json file."""
        prov_doc = self.current_provDoc
        directory = os.path.dirname(nameFile)
        if not os.path.exists(directory):
            os.makedirs(directory)
        prov_doc.serialize(nameFile + '.json', indent=2)
        
        if self.split_json:
            self.split_json_file(nameFile)
        
    def save_graph(self, nameFile):
        """Draw the current provenance document and write it to ``<nameFile>.png``."""
        graph = prov_to_dot(self.current_provDoc)
        target = nameFile + '.png'
        graph.write_png(target)
        
    def save_all_graph(self, nameFile):
        """Save all provenance in png image graph.

        Every '.json' file found in the folder of ``nameFile`` is loaded and
        merged into one document, which is drawn to ``<nameFile>.png``.

        Keyword argument:
        nameFile -- path of the output image without the '.png' extension
        """
        # A bare file name means the current folder:
        directory = os.path.dirname(nameFile) or os.curdir
        final_doc = prov.ProvDocument()
        # Sorted so that the documents are merged in a reproducible order:
        for file_name in sorted(os.listdir(directory)):
            if file_name.endswith('.json'):
                prov_doc = prov.ProvDocument.deserialize(os.path.join(directory, file_name))
                final_doc.update(prov_doc)
        dot = prov_to_dot(final_doc)
        dot.write_png(nameFile + '.png')
    
    def timing(f):
        def wrap(*args):
            time1 = time.time()
            ret = f(*args)
            time2 = time.time()
            print('{:s} function took {:.3f} ms'.format(f.__name__, (time2-time1)*1000.0))
            
        # Get timing of provenance function:
#         duration = time2 - time1
#         print(f.__name__
#               + ' finished in ' 
#               + time.strftime('%H:%M:%S', time.gmtime(duration)))

            return ret
        return wrap
    
        
    ###
    ###  PROVENANCE METHODS
    ###

    @timing
    def getProv_Binarizer(self, df_out, columnsName, function_name='Binarizer'): 
        """Return provenance document related to binarization function.
        
        Keyword argument:
        df_out -- the output dataframe
        columnsName -- list of binarized columns name
        """
        prov_doc = self.create_prov_document()  # Create a new provenance document
        
        # Get current values:
        entities_in = self.current_ent
        
        # Output values:
        columns_out = df_out.columns
        # entities_out = np.empty(df_out.shape, dtype=object)
                
        for j in range(self.current_n):
            # Create activity for all binarized columns:
            if columns_out[j] in columnsName:
                act_id = self.create_activity(function_name, columns_out[j])
            for i in range(self.current_m):
                e_in = entities_in[i][j]
                e_in_identifier = e_in['identifier']
                value = str(df_out.iat[i, j])
                if columns_out[j] in columnsName:
                    # Create a new entity with new value:
                    ent_id = self.NAMESPACE_ENTITY + str(uuid.uuid4())
                    e_out = self.create_entity(ent_id, value, columns_out[j], self.instance)
                    e_out_identifier = e_out['identifier']
                    
                    prov_doc.wasGeneratedBy(e_out_identifier, act_id)
                    prov_doc.used(act_id, e_in_identifier)
                    prov_doc.wasDerivedFrom(e_out_identifier, e_in_identifier)
                else:
                    # Add new instance to the original entity:
                    ent_id = e_in_identifier
                    e_out = self.create_entity(ent_id, value, columns_out[j], self.instance)
                    
                entities_in[i][j] = e_out
                    
        # Save provenance document in json file:
        self.save_json_prov(os.path.join(self.results_path, str(self.instance)))
        # Update current values:
        self.set_current_values(df_out, entities_in) 
        
        return prov_doc
    
    @timing
    def getProv_FeatureTransformation(self, df_out, columnsName, function_name='Feature Transformation'):
        """Return provenance document related to features trasformation function.
        
        Keyword argument:
        df_out -- the output dataframe
        columnsName -- list of transformed columns name
        """
        prov_doc = self.create_prov_document() # Create new provenance document
        
        # Get current values:
        entities_in = self.current_ent
        
        # Output values:
        columns_out = df_out.columns
        # entities_out = np.empty(df_out.shape, dtype=object)
        
        for j in range(self.current_n):
            # Create activity for all trasformed columns:
            if columns_out[j] in columnsName:
                act_id = self.create_activity(function_name, columns_out[j])
            for i in range(self.current_m):
                e_in = entities_in[i][j]
                e_in_identifier = e_in['identifier']
                value = str(df_out.iat[i, j])
                if columns_out[j] in columnsName:
                    # Create a new entity with new value:
                    ent_id = self.NAMESPACE_ENTITY + str(uuid.uuid4())
                    e_out = self.create_entity(ent_id, value, columns_out[j], self.instance)
                    e_out_identifier = e_out['identifier']
                    
                    prov_doc.wasGeneratedBy(e_out_identifier, act_id)
                    prov_doc.used(act_id, e_in_identifier)
                    prov_doc.wasDerivedFrom(e_out_identifier, e_in_identifier)
                else:
                    # Add new instance to the original entity:
                    ent_id = e_in_identifier
                    e_out = self.create_entity(ent_id, value, columns_out[j], self.instance)
                    
                entities_in[i][j] = e_out
                    

        # Save provenance document in json file:
        self.save_json_prov(os.path.join(self.results_path, self.instance))
        # Update current values:
        self.set_current_values(df_out, entities_in)
        
        return prov_doc

    @timing
    def getProv_SpaceTransformation(self, df_out, columnsName, function_name='Space Transformation'):
        """Return provenance document related to space trasformation function.
        
        Keyword argument:
        df_out -- the output dataframe
        columnsName -- list of columns name joined to create the new column
        """
        prov_doc = self.create_prov_document() # Create new provenance document
        
        # Get current values:
        entities_in = self.current_ent
        m, n = self.current_m, self.current_n
        
        # Get feature indexes used for space transformation:
        indexes = []
        for feature in columnsName:
            indexes.append(df_out.columns.get_loc(feature))

        # Output values:
        m_new, n_new = df_out.shape
        columns_out = df_out.columns
        # Create entities of the output dataframe:
        entities_out = np.empty(df_out.shape, dtype=object)

        # Create space transformation activity:
        act_id = self.create_activity(function_name, ', '.join(columnsName))
        
        # Get provenance related to existent data:
        for i in range(m):
            for j in range(n):
                value = str(df_out.iat[i, j])
                e_in = entities_in[i][j]
                ent_id = e_in['identifier']
                e_out = self.create_entity(ent_id, value, columns_out[j], self.instance)
                entities_out[i][j] = e_out
                if j in indexes:
                    prov_doc.used(act_id, ent_id)
                    
        # Get provenance related to the new column:
        for i in range(m):
            for j in range(n, n_new):
                value = str(df_out.iat[i, j])
                ent_id = self.NAMESPACE_ENTITY + str(uuid.uuid4())
                e_out = self.create_entity(ent_id, value, columns_out[j], self.instance)
                e_out_identifier = e_out['identifier']
                entities_out[i][j] = e_out
                prov_doc.wasGeneratedBy(e_out_identifier, act_id)
                for index in indexes:
                    e_in = entities_in[i][index]
                    e_in_identifier = e_in['identifier']
                    prov_doc.wasDerivedFrom(e_out_identifier, e_in_identifier)
                
                
        # Save provenance document in json file:
        self.save_json_prov(os.path.join(self.results_path, self.instance))
        # Update current values:
        self.set_current_values(df_out, entities_out)

        return prov_doc

    @timing
    def getProv_FeatureSelection(self, df_out, function_name='Feature Selection'):
        """Return provenance document related to feature selection function."""
        prov_doc = self.create_prov_document()  # Create new provenance document
        
        # Get current values:
        entities_in = self.current_ent
        columns_in = self.current_columns
        m, n = self.current_m, self.current_n
        
        # Output values:
        columns_out = df_out.columns
        m_new, n_new = df_out.shape
        # Create entities of the output dataframe:
        entities_out = np.empty(df_out.shape, dtype=object)

        columnsName = set(columns_in) - set(columns_out)  # List of selected columns
        
        # Create feature selection activity:
        act_id = self.create_activity(function_name, ', '.join(columnsName))
        
        for i in range(m):
            for j in range(n):
                new_column_index = columns_out.get_loc(columns_in[j]) if columns_in[j] in columns_out else -1
                e_in = entities_in[i][j]
                e_in_identifier = e_in['identifier']
                if new_column_index == -1:
                    prov_doc.wasInvalidatedBy(e_in_identifier, act_id)
                else:
                    value = str(df_out.iat[i, new_column_index])
                    e_out = self.create_entity(e_in_identifier, value, columns_out[new_column_index], self.instance)
                    entities_out[i][new_column_index] = e_out
                    
        # Save provenance document in json file:
        self.save_json_prov(os.path.join(self.results_path, self.instance))
        # Update current values:
        self.set_current_values(df_out, entities_out)

        return prov_doc
    
    @timing
    def getProv_Drop(self, df_out, function_name='Drop'):
        """Return provenance document related to drop function."""
        prov_doc = self.create_prov_document() # Create new provenance document
        
        # Get current values:
        entities_in = self.current_ent
        columns_in = self.current_columns
        index_in = self.current_index
        m, n = self.current_m, self.current_n
        
        # Output values:
        columns_out = df_out.columns
        index_out = df_out.index
        m_new, n_new = df_out.shape
        # Create entities of the output dataframe:
        entities_out = np.empty(df_out.shape, dtype=object)
        
        columnsName = set(columns_in) - set(columns_out) # List of selected columns
        
        # Create drop activity:
        act_id = self.create_activity(function_name, ', '.join(columnsName))
        
        for i in range(m):
            new_row_index = index_out.get_loc(index_in[i]) if index_in[i] in index_out else -1
            for j in range(n):
                new_column_index = columns_out.get_loc(columns_in[j])  if columns_in[j] in columns_out else -1
                e_in = entities_in[i][j]
                e_in_identifier = e_in['identifier']
                if new_row_index == -1 or new_column_index == -1:
                    prov_doc.wasInvalidatedBy(e_in_identifier, act_id)
                else:
                    value = str(df_out.iat[new_row_index, new_column_index])
                    e_out = self.create_entity(e_in_identifier, value, columns_out[new_column_index], self.instance)
                    entities_out[new_row_index][new_column_index] = e_out
                    
        # Save provenance document in json file:
        self.save_json_prov(os.path.join(self.results_path, self.instance))
        # Update current values:
        self.set_current_values(df_out, entities_out)

        return prov_doc

    @timing
    def getProv_InstanceGeneration(self, df_out, function_name='Instance Generation'):
        """Return provenance document related to instance generation function."""
        prov_doc = self.create_prov_document() # Create new provenance document
        
        # Get current values:
        entities_in = self.current_ent
        m, n = self.current_m, self.current_n
        
        # Output values:
        columns_out = df_out.columns
        m_new, n_new = df_out.shape
        # Create entities of the output dataframe:
        entities_out = np.empty(df_out.shape, dtype=object)
        
        for j in range(n_new):
            # Create function for every columns
            act_id = self.create_activity(function_name, columns_out[j])
            for i in range(m_new):
                value = str(df_out.iat[i, j])
                if i < m:
                    # Provenance of existent data
                    e_in = entities_in[i][j]
                    ent_id = e_in['identifier']
                    e_out = self.create_entity(ent_id, value, columns_out[j], self.instance)
                    entities_out[i][j] = e_out
                    prov_doc.used(act_id, ent_id)
                else:
                    # Provenance of new data
                    ent_id = self.NAMESPACE_ENTITY + str(uuid.uuid4())
                    e_out = self.create_entity(ent_id, value, columns_out[j], self.instance)
                    e_out_identifier = e_out['identifier']
                    entities_out[i][j] = e_out
                    prov_doc.wasGeneratedBy(e_out_identifier, act_id)

        # Save provenance document in json file:
        self.save_json_prov(os.path.join(self.results_path, self.instance))
        # Update current values:
        self.set_current_values(df_out, entities_out)

        return prov_doc

    @timing
    def getProv_OneHotEncode(self, df_out, onehot_cols, onehot_cols_map, function_name='OneHot Encoding'):
        """Return provenance document related to one-hot encoding function.
        
        Keyword argument:
        df_out -- the output dataframe
        onehot_cols -- list of One-Hot encoded columns 
        onehot_cols_map -- map(key, values)
                           where key is the One-Hot encoded column name
                           and values is an array of the new columns name
        """
        prov_doc = self.create_prov_document() # Create new provenance document
        
        # Get current values:
        entities_in = self.current_ent
        columns_in = self.current_columns

        # Output values:
        m_new, n_new = df_out.shape
        columns_out = df_out.columns
        # Create entities of the output dataframe:
        entities_out = np.empty(df_out.shape, dtype=object)
        
        activities_dict = {}

        # Get One-Hot provenance:
        for j in range(n_new):
            column_out_name = columns_out[j]
            new_column_index = columns_in.get_loc(column_out_name) if column_out_name in columns_in else -1
            # Create functions (one for all one hot encoded feature)
            if j < self.current_n and columns_in[j] in onehot_cols:
                    act_id = self.create_activity(function_name, columns_in[j])
                    activities_dict[columns_in[j]] = act_id
                    
            for i in range(m_new):
                value = str(df_out.iat[i, j])
                # Unchanged output:
                if column_out_name in columns_in:
                    e_in = entities_in[i][new_column_index]
                    ent_id = e_in['identifier']
                    e_out = self.create_entity(ent_id, value, column_out_name, self.instance)
                # New data:
                else:
                    for k,v in onehot_cols_map.items():
                        if column_out_name in v:
                            column_name = k
                    
                    ent_id = self.NAMESPACE_ENTITY + str(uuid.uuid4())
                    e_out = self.create_entity(ent_id, value, column_out_name, self.instance)
                    e_out_identifier = e_out['identifier']
                    activity = activities_dict[column_name]
                    prov_doc.wasGeneratedBy(e_out_identifier, activity)

                # Add input entities used by functions:
                if j < self.current_n and columns_in[j] in onehot_cols:
                    e_in = entities_in[i][j]
                    e_in_identifier = e_in['identifier']
                    prov_doc.used(act_id, e_in_identifier)
                    
                entities_out[i][j] = e_out

        # Save provenance document in json file:
        self.save_json_prov(os.path.join(self.results_path, self.instance))
        # Update current values:
        self.set_current_values(df_out, entities_out)

        return prov_doc
    
    @timing
    def getProv_ValueTransformation(self, df_out, value, function_name='Value Transformation'):
        """Return provenance document related to value transformation function.
        Used when a value inside the dataframe is replaced.
        
        Keyword argument:
        df_out -- the output dataframe
        value -- replaced value
        """
        prov_doc = self.create_prov_document() # Create new provenance document
        
        # Get current values:
        entities_in = self.current_ent

        # Output values:
        columns_out = df_out.columns
        # Create entities of the output dataframe:
        # entities_out = np.empty(df_out.shape, dtype=object)
        
        # Create value transformation activity:
        act_id = self.create_activity(function_name)
        
        for i in range(self.current_m):
            for j in range(self.current_n):
                value = str(df_out.iat[i, j])
                
                e_in = entities_in[i][j]
                e_in_identifier = e_in['identifier']
                val_in = e_in['attributes']['value']
                
                # Check if the input value is the replaced value
                if str(val_in) == str(value):
                    # Add new instance to the original entity:
                    ent_id = e_in['identifier']
                    e_out = self.create_entity(ent_id, value, columns_out[j], self.instance)
                else:
                    # Create new entity with the new value
                    ent_id = self.NAMESPACE_ENTITY + str(uuid.uuid4())
                    e_out = self.create_entity(ent_id, value, columns_out[j], self.instance)
                    e_out_identifier = e_out['identifier']
                    prov_doc.wasDerivedFrom(e_out_identifier, e_in_identifier)
                    prov_doc.wasGeneratedBy(e_out_identifier, act_id)
                    prov_doc.used(act_id, e_in_identifier)
                    
                entities_in[i][j] = e_out

        # Save provenance document in json file:
        self.save_json_prov(os.path.join(self.results_path, self.instance))
        # Update current values:
        self.set_current_values(df_out, entities_in)

        return prov_doc
    
    # TODO: should imputation be tracked per single column or for all columns together?
    #       Can it cover a subset of the columns, or only all of them together?
    @timing
    def getProv_Imputation(self, df_out, isSingleAct=True, function_name='Imputation'):
        """Return provenance document related to imputation function."""
        prov_doc = self.create_prov_document() # Create new provenance document
        
        # Get current values:
        entities_in = self.current_ent
        # Output values:
        columns_out = df_out.columns
        # Create entities of the output dataframe:
        # entities_out = np.empty(df_out.shape, dtype=object)
        
        # Create a single imputation activity if the imputation is related to a single column:
        if isSingleAct:
            act_id = self.create_activity(function_name) 
            
        for j in range(self.current_n):
            if not isSingleAct:
                act_id = self.create_activity(function_name) 
            for i in range(self.current_m):
                value = str(df_out.iat[i, j])
                
                e_in = entities_in[i][j]
                e_in_identifier = e_in['identifier']
                val_in = e_in['attributes']['value']
                
                if val_in == 'nan':
                    # Create new entity with the new value
                    ent_id = self.NAMESPACE_ENTITY + str(uuid.uuid4())
                    e_out = self.create_entity(ent_id, value, columns_out[j], self.instance)
                    e_out_identifier = e_out['identifier']
                    prov_doc.wasGeneratedBy(e_out_identifier, act_id)
                    prov_doc.used(act_id, e_in_identifier)
                    prov_doc.wasDerivedFrom(e_out_identifier, e_in_identifier)
                else:
                    # Add new instance to the original entity:
                    ent_id = e_in['identifier']
                    e_out = self.create_entity(ent_id, value, columns_out[j], self.instance)
                
                entities_in[i][j] = e_out
                    

        # Save provenance document in json file:
        self.save_json_prov(os.path.join(self.results_path, self.instance))
        # Update current values:
        self.set_current_values(df_out, entities_in)

        return prov_doc