# From Datanode graphs to Data Journeys

In [1]:
# Parameters
# --- folders ---
indir = "./rdf"            # read by loadNotebook: <indir>/<notebook>.ttl
outdir = "./datajourneys"  # every generated .digraph/.png/.svg/.ttl is written here
didir = "./graphs"         # read by buildOriginalPNG: <didir>/<notebook>.digraph
rdf_folder = './rdf'       # not referenced by the code in this notebook section
clf_folder = './models'    # holds the pickled classifier and the general RDF graph

# --- model artefacts (looked up inside clf_folder) ---
# alternatives: "LogisticRegression_2_200_rdf2vec.clf", "MLPClassifier_2_200_rdf2vec.clf"
classifier = "MLPClassifier_2_1000_rdf2vec.clf"
# alternative: "rdfgraph_2_200_rdf2vec.ttl"
rdfgraph = "rdfgraph_2_1000_rdf2vec.ttl"

# --- notebook to process ---
# alternatives: 'random-forests', 'very-simple-pytorch-training-0-59',
# 'transfer-learning', '1-quick-start-read-csv-and-flatten-json-fields'
notebook = 'eda-feature-engineering-lgb-xgb-cat'

# --- options ---
use_rules = True  # run the queries in activity_rules.json before classifying
djo = "http://purl.org/datajourneys/"  # data-journey namespace

In [2]:
# notebook = notebook[:-6]
# print(notebook)

In [3]:
import pandas as pd
import numpy as np
import csv
import json

from os import listdir
from os.path import isfile, join
#from torch.utils.tensorboard import SummaryWriter
import datajourney as DJ
import rdflib
from rdflib.namespace import RDF, RDFS
from rdflib import Namespace
from rdflib import URIRef, BNode, Literal

import networkx.drawing, networkx.drawing.nx_agraph as ag

from pyrdf2vec import RDF2VecTransformer
from pyrdf2vec.embedders import Word2Vec
from pyrdf2vec.graphs import KG, Vertex
from pyrdf2vec.walkers import RandomWalker, Walker

# import pygraphviz
from graphviz import Source

from sklearn.naive_bayes import GaussianNB
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import SVC, LinearSVC
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import RandomForestClassifier

from time import time, monotonic

import pickle

In [4]:
def create_kg(graph, label_predicates):
    """Build a pyrdf2vec KG from an rdflib graph, leaving out some predicates.

    Every kept triple (s, p, o) becomes three vertices: subject, object and a
    predicate vertex that knows its neighbours (vprev=s, vnext=o). Edges
    s -> p and p -> o are added to the KG.

    Args:
        graph: iterable of (s, p, o) triples, e.g. an rdflib.Graph.
        label_predicates: predicate IRIs whose triples are skipped, given as
            plain strings or rdflib URIRefs.

    Returns:
        The populated pyrdf2vec KG.
    """
    # rdflib terms compare type-strictly: URIRef(x) == x is False for a plain
    # str x. Matching the URIRef predicate against a list of strings therefore
    # skipped nothing. Normalise both sides to str before comparing.
    skipped = {str(pred) for pred in label_predicates}
    # NOTE(review): presumably 'emptygraph.ttl' is an empty placeholder so
    # the KG starts without triples; confirm.
    kg = KG(location='emptygraph.ttl')
    for (s, p, o) in graph:
        if str(p) in skipped:
            continue
        s_v = Vertex(str(s))
        o_v = Vertex(str(o))
        p_v = Vertex(str(p), predicate=True, vprev=s_v, vnext=o_v)
        for vertex in (s_v, p_v, o_v):
            kg.add_vertex(vertex)
        kg.add_edge(s_v, p_v)
        kg.add_edge(p_v, o_v)
    return kg

In [5]:
def loadNotebooks(indir):
    """Parse the general RDF graph into a fresh rdflib Graph.

    Despite its name, this reads the Turtle file ``<clf_folder>/<rdfgraph>``
    named by the module-level settings. The *indir* argument is not used.

    Returns:
        The parsed rdflib.Graph.
    """
    source_path = clf_folder + "/" + rdfgraph
    general_graph = rdflib.Graph()
    general_graph.parse(source_path, format="ttl")
    return general_graph

def loadKG(rdf_graph):
    """Convert *rdf_graph* into a pyrdf2vec KG, excluding rdfs:label triples.

    Args:
        rdf_graph: rdflib graph to convert.

    Returns:
        The KG built by create_kg.
    """
    # Pass the predicate as an rdflib URIRef, not a plain string. rdflib
    # terms never compare equal to plain str, so the string form
    # "http://www.w3.org/2000/01/rdf-schema#label" never matched a parsed
    # predicate and labels leaked into the KG.
    label_predicates = [RDFS.label]
    return create_kg(rdf_graph, label_predicates)

def loadNotebook(indir, kg, notebook, use_rules):
    """Parse a notebook's datanode graph, add it to *kg*, and optionally apply rules.

    Args:
        indir: folder containing ``<notebook>.ttl``.
        kg: pyrdf2vec KG, extended in place with the notebook's triples
            (rdfs:label triples excluded).
        notebook: notebook name, used as the file stem.
        use_rules: when true, every query listed in ``activity_rules.json``
            is run over the notebook graph. Each returned triple is added to
            that graph (not to *kg*).

    Returns:
        (kg, rdf_nb): the extended KG and the notebook's rdflib graph.
    """
    rdf_nb = rdflib.Graph()
    rdf_nb.parse(indir + '/' + notebook + '.ttl', format="ttl")
    # rdflib terms never compare equal to plain str. Compare string forms,
    # otherwise the rdfs:label predicate is never recognised and never skipped.
    label_predicates = {"http://www.w3.org/2000/01/rdf-schema#label"}
    for (s, p, o) in rdf_nb:
        if str(p) in label_predicates:
            continue
        s_v = Vertex(str(s))
        o_v = Vertex(str(o))
        p_v = Vertex(str(p), predicate=True, vprev=s_v, vnext=o_v)
        kg.add_vertex(s_v)
        kg.add_vertex(p_v)
        kg.add_vertex(o_v)
        kg.add_edge(s_v, p_v)
        kg.add_edge(p_v, o_v)
    # Construct activity annotations with rules
    if use_rules:
        # NOTE(review): presumably a JSON list of SPARQL CONSTRUCT queries
        # (each result row is added as a triple); confirm.
        with open('activity_rules.json', encoding='utf-8') as json_file:
            data = json.load(json_file)
        for q in data:
            # Materialise results before adding to the graph they came from.
            for t in list(rdf_nb.query(q)):
                rdf_nb.add(t)
    return (kg, rdf_nb)

In [6]:

# Select nodes
def selectEntitiesPredict(rdf_nb, kg, notebook_name=None):
    """Pick the notebook nodes that still need an activity and embed them.

    A node is a candidate when it has no ``dj:hasActivity`` triple in
    *rdf_nb* and it appears either:
    - as a subject, or
    - as a URIRef object of a predicate other than rdf:type / dj:hasActivity.
    The notebook root ``http://purl.org/dj/kaggle/<notebook>`` is excluded.

    Args:
        rdf_nb: rdflib graph of the notebook (possibly pre-annotated by rules).
        kg: pyrdf2vec KG containing the notebook vertices, used for the walks.
        notebook_name: notebook whose root node is excluded. Defaults to the
            module-level ``notebook`` setting (the previous implicit behaviour).

    Returns:
        (entities, predict): the candidate nodes (arbitrary order) and the
        RDF2Vec embeddings computed for them by ``fit_transform``.
    """
    djo = "http://purl.org/datajourneys/"
    has_activity = URIRef(djo + "hasActivity")
    name = notebook if notebook_name is None else notebook_name
    # Nodes already annotated (e.g. by the activity rules) need no prediction.
    # Collect them once instead of querying the graph for every triple.
    annotated = set(rdf_nb.subjects(has_activity, None))
    candidates = set()
    for s, p, o in rdf_nb.triples((None, None, None)):
        if s not in annotated:
            candidates.add(s)
        if (isinstance(o, URIRef) and p != RDF.type and p != has_activity
                and o not in annotated):
            candidates.add(o)
    # The root is not a data node. discard() tolerates a root that is absent
    # (e.g. already annotated), where list.remove() raised ValueError.
    candidates.discard(URIRef('http://purl.org/dj/kaggle/' + name))
    entities = list(candidates)
    # Build embeddings
    random_walker = RandomWalker(10, 100)
    transformer = RDF2VecTransformer(Word2Vec(sg=0), walkers=[random_walker])
    predict = transformer.fit_transform(kg, entities)
    return (entities, predict)



In [7]:
# Classify
def classify(clf, predict, entities, graph=None):
    """Annotate each entity with the activity predicted by *clf*.

    For every entity this adds ``(entity, dj:hasActivity, dj:<label>)``,
    where ``<label>`` is the predicted class with its first character
    removed. The graph is modified in place.

    Args:
        clf: fitted classifier exposing ``predict``.
        predict: feature rows (embeddings), one per entity in *entities*.
        entities: nodes to annotate, aligned with *predict*.
        graph: rdflib graph to annotate. Defaults to the module-level
            ``rdf_nb``, which was previously read implicitly.

    Returns:
        The annotated graph (the same object that was modified).
    """
    target = rdf_nb if graph is None else graph
    djo = "http://purl.org/datajourneys/"
    has_activity = URIRef(djo + "hasActivity")
    for entity, activity in zip(entities, clf.predict(predict)):
        # NOTE(review): the first character of each predicted class label is
        # dropped, presumably a leading separator such as '/'. Confirm
        # against the labels the classifier was trained on.
        target.add((entity, has_activity, URIRef(djo + activity[1:])))
    return target



In [8]:

# for(s,p,o) in dn_annotated:
#     print(s,p,o)

In [9]:

def n2l(n, g):
    """Return a short display label for node *n* in graph *g*.

    The label is the local name of *n*: the text after the last '#' if the
    IRI contains one, otherwise after the last '/' (the whole string if
    neither occurs). It is followed by:
    - " (<rdfs:label>: <activity>)" when the node has a known activity
      (see getActivity), or
    - " (<rdfs:label>)" when it only has a non-empty rdfs:label.

    Returns "None" when *n* is None.
    """
    if n is None:
        return "None"
    activity = getActivity(n, g)
    text = str(n)
    delim = '#' if '#' in text else '/'
    # [-1] instead of [1]: terms without the delimiter (e.g. blank nodes)
    # used to raise IndexError.
    local = text.rsplit(delim, 1)[-1]
    # Graph.label returns '' (not None) when there is no rdfs:label, so the
    # old "!= None" test always passed and produced " ()" for unlabeled nodes.
    label = g.label(n)
    if activity is not None:
        return local + " (" + str(label) + ": " + str(activity).rsplit('/', 1)[-1] + ")"
    if label:
        return local + " (" + str(label) + ")"
    return local

def getActivity(thisNode, graph):
    """Return the data-journey activity class attached to *thisNode*, or None.

    Scans the objects of ``(thisNode, djo + "hasActivity", ?)`` in graph
    order. It returns the first one that is one of the five activity classes
    (Reuse, Analysis, Movement, Visualisation, Preparation). Returns None when
    *thisNode* is None or no such activity is attached.
    """
    if thisNode is None:
        return None
    namespace = "http://purl.org/datajourneys/"
    known_activities = frozenset(
        URIRef(namespace + local)
        for local in ("Reuse", "Analysis", "Movement", "Visualisation", "Preparation")
    )
    attached = graph.objects(thisNode, URIRef(djo + "hasActivity"))
    return next((act for act in attached if act in known_activities), None)

# def getSieblings(thisNode, graph):
#     sieblings = []
#     thisNodeActivity = getActivity(thisNode, graph)
#     for prev in graph.subjects(None, thisNode):
#         for (predicate, siebling) in graph.predicate_objects(thisNode):
#             if predicate == "http://purl.org/datajourneys/previousActivity" and siebling != thisNode:
#                 sieblingActivity = getActivity(siebling, graph)
#                 if(thisNodeActivity == sieblingActivity):
#                     sieblings.add(siebling)
#     return sieblings



In [10]:

def walkActivities(lastActivityNode, thisNode, graph, d, visited):
    """Link activity changes with dj:previousActivity by walking incoming edges.

    Superseded by buildActivityGraph; only referenced from commented-out code
    in this notebook. The walk is depth-first over the subjects ``n`` of
    triples ``(n, ?, thisNode)``:
    - thisNode has no known activity (e.g. the root): each ``n`` gets
      ``(n, dj:previousActivity, thisNode)`` and is walked with thisNode as
      the last activity node.
    - same activity as lastActivityNode: the walk continues with
      lastActivityNode.
    - otherwise: ``(thisNode, dj:previousActivity, lastActivityNode)`` is
      added when lastActivityNode has an activity, and the walk continues
      with thisNode.
    When a node already on the current path is reached, the path is printed
    and that branch stops.

    Args:
        lastActivityNode: node of the activity we came from (or None).
        thisNode: node being visited.
        graph: rdflib graph, mutated in place.
        d: depth, only used in the cycle message.
        visited: nodes on the current path; appended to, callers pass copies.

    Returns:
        graph.
    """
    if thisNode in visited:
        path = "".join("/" + n2l(v, graph) for v in visited)
        print(path + n2l(thisNode, graph) + "[" + str(d) + "]")
        return graph

    visited.append(thisNode)
    d = d + 1
    previous = URIRef("http://purl.org/datajourneys/previousActivity")
    thisActivity = getActivity(thisNode, graph)
    lastActivity = getActivity(lastActivityNode, graph)
    # Snapshot the neighbours first. The branches below (and the recursive
    # calls) add triples to graph, and adding to an rdflib graph while one of
    # its triple generators is still being consumed is unsafe.
    nextNodes = list(graph.subjects(None, thisNode))
    if thisActivity is None:
        # root here
        for n in nextNodes:
            graph.add((n, previous, thisNode))
            graph = walkActivities(thisNode, n, graph, d, visited.copy())
    elif thisActivity == lastActivity:
        for n in nextNodes:
            graph = walkActivities(lastActivityNode, n, graph, d, visited.copy())
    else:
        if lastActivity is not None:
            graph.add((thisNode, previous, lastActivityNode))
        for n in nextNodes:
            graph = walkActivities(thisNode, n, graph, d, visited.copy())
    return graph


In [18]:
def buildActivityGraph(lastActivity, lastActivityNode, thisNode, graph, d, visited):
    """Group data nodes into activity nodes with a depth-first walk.

    The walk follows incoming edges. Let the "next" nodes of *thisNode* be
    the subjects ``n`` of triples ``(n, p, thisNode)``, where p is not one of
    the bookkeeping/typing predicates (previousActivity, inActivity,
    hasActivity, rdf:type, rdfs:label). They are grouped by their classified
    activity (getActivity).

    For a group whose activity differs from *lastActivity*:
    - an activity node is used: the ``dj:inActivity`` of the first member
      that already has one, otherwise a new
      ``http://purl.org/dj/kaggle/<notebook>/activity/<digest>`` node typed
      and labelled with the activity;
    - that node gets ``dj:previousActivity`` -> *lastActivityNode*.
    A group with the same activity stays in *lastActivityNode*.

    Every member is then visited recursively; each visited non-root node
    records ``dj:inActivity`` -> its activity node.

    Args:
        lastActivity: activity class of the current activity node (None at root).
        lastActivityNode: current activity node. At the root it is None and
            the root node itself takes this role.
        thisNode: node being visited.
        graph: rdflib graph, mutated in place.
        d: recursion depth (incremented, otherwise informational).
        visited: nodes on the current path; appended to, callers pass copies.

    Returns:
        graph.
    """
    import hashlib  # local import keeps this notebook cell self-contained

    if thisNode in visited:
        # Already on the current path: stop to avoid cycles.
        return graph
    visited.append(thisNode)
    d = d + 1

    dj = "http://purl.org/datajourneys/"
    in_activity = URIRef(dj + "inActivity")
    previous_activity = URIRef(dj + "previousActivity")
    if lastActivityNode is None:
        # Root call: the root node itself acts as the first activity node.
        lastActivityNode = thisNode
    else:
        graph.add((thisNode, in_activity, lastActivityNode))

    # Collect the next data nodes, grouped by activity (insertion-ordered).
    skipproperties = {previous_activity, in_activity, RDF.type, RDFS.label,
                      URIRef(dj + "hasActivity")}
    nextActivityNodes = {}
    for n, p in graph.subject_predicates(thisNode):
        if p in skipproperties:
            continue
        group = nextActivityNodes.setdefault(getActivity(n, graph), [])
        # A node linked through several predicates is walked only once.
        if n not in group:
            group.append(n)

    for a, nns in nextActivityNodes.items():
        if lastActivity != a:
            currentActivity = a
            # Reuse an activity node that one of these nodes already belongs
            # to. The previous loop overwrote the result on every iteration,
            # so only the LAST node was effectively checked.
            aent = None
            for ns in nns:
                aent = graph.value(ns, in_activity)
                if aent is not None:
                    break
            if aent is None:
                # Deterministic id. The built-in hash() of strings is salted
                # per process, so the old URIs changed from run to run.
                key = "\n".join(sorted(str(ns) for ns in nns))
                digest = hashlib.sha1(key.encode("utf-8")).hexdigest()[:16]
                aent = URIRef('http://purl.org/dj/kaggle/' + notebook + "/activity/" + digest)
                if currentActivity is not None:
                    # rdflib rejects None as a term. A group without a known
                    # activity gets an untyped, unlabeled activity node.
                    graph.add((aent, RDF.type, currentActivity))
                    graph.add((aent, RDFS.label,
                               Literal(str(currentActivity).rsplit('/', 1)[-1])))
            # Link activity to previous. lastActivityNode is never None here
            # (the root case assigned it above).
            graph.add((aent, previous_activity, lastActivityNode))
        else:
            aent = lastActivityNode
            currentActivity = lastActivity
        # Walk the datanode paths inside this activity.
        for n in nns:
            graph = buildActivityGraph(currentActivity, aent, n, graph, d, visited.copy())
    return graph

# def generateDataJourney(dn_annotated, notebook):
#     datajourney = dn_annotated
#     #http://purl.org/dj/kaggle/very-simple-pytorch-training-0-59#525206600 http://purl.org/dj/appearsIn http://purl.org/dj/kaggle/very-simple-pytorch-training-0-59
#     #print(getActivity(URIRef("http://purl.org/dj/kaggle/very-simple-pytorch-training-0-59#525206600"), dn_annotated))
#     rootNode = URIRef('http://purl.org/dj/kaggle/' + notebook)
#     datajourney = walkActivities(None, rootNode, datajourney, 0, [])
#     return datajourney

def generateDataJourney(dn_annotated, notebook):
    """Compress an activity-annotated datanode graph into a data journey.

    Starts buildActivityGraph from the root node
    ``http://purl.org/dj/kaggle/<notebook>`` with no previous activity, depth
    0 and an empty path. The activity links are added to *dn_annotated*
    itself, and the graph returned by buildActivityGraph is returned.
    """
    root_node = URIRef('http://purl.org/dj/kaggle/' + notebook)
    return buildActivityGraph(None, None, root_node, dn_annotated, 0, [])


In [19]:
# djg = datajourney.triples((None, URIRef("http://purl.org/datajourneys/previousActivity"), None))
# for (s,p,o) in djg:
#     sa = getActivity(s, datajourney)
#     oa = getActivity(o, datajourney)
#     print(s,sa,o,oa)

In [20]:

def buildDjDiGraph(g):
    """Build the activity-level networkx DiGraph of a data journey.

    Each ``dj:previousActivity`` triple (s, p, o) in *g* becomes an edge
    s -> o labelled "previous". Both endpoints are labelled with n2l.
    """
    journey = networkx.DiGraph()
    previous = URIRef("http://purl.org/datajourneys/previousActivity")
    for subj, _pred, obj in g.triples((None, previous, None)):
        for node in (subj, obj):
            journey.add_node(node, label=n2l(node, g))
        journey.add_edge(subj, obj, label="previous")
    return journey

def buildDnDigraph(g):
    """Build the datanode-level networkx DiGraph of *g*.

    Keeps every triple whose object is exactly a URIRef. It drops rdf:type
    triples and the dj:hasActivity / dj:inActivity / dj:previousActivity
    bookkeeping triples. Nodes and edges are labelled with n2l.
    """
    hidden_predicates = {
        RDF.type,
        URIRef(djo + "hasActivity"),
        URIRef(djo + "inActivity"),
        URIRef(djo + "previousActivity"),
    }
    datanodes = networkx.DiGraph()
    for subj, pred, obj in g:
        if type(obj) is not URIRef or pred in hidden_predicates:
            continue
        datanodes.add_node(subj, label=n2l(subj, g))
        datanodes.add_node(obj, label=n2l(obj, g))
        datanodes.add_edge(subj, obj, label=n2l(pred, g))
    return datanodes
    
def saveDjDiGraph(fname, gd):
    """Write the data-journey DiGraph *gd* as DOT to <outdir>/<fname>_DJ.digraph.

    The file is closed before returning because buildDjPNG/buildDjSVG read it
    right afterwards. It is written as UTF-8, the encoding graphviz's
    Source.from_file uses by default.
    """
    tag = ag.to_agraph(gd)
    with open(outdir + "/" + fname + "_DJ.digraph", "w", encoding="utf-8") as out:
        out.write(tag.string())
    
def saveDnDiGraph(fname, gd):
    """Write the datanode DiGraph *gd* as DOT to <outdir>/<fname>_DN.digraph.

    The file is closed before returning because buildDnPNG/buildDnSVG read it
    right afterwards. It is written as UTF-8, the encoding graphviz's
    Source.from_file uses by default.
    """
    tag = ag.to_agraph(gd)
    with open(outdir + "/" + fname + "_DN.digraph", "w", encoding="utf-8") as out:
        out.write(tag.string())

def buildDjPNG(fname):
    """Render <outdir>/<fname>_DJ.digraph as PNG, using output stem <outdir>/<fname>_DJ."""
    stem = outdir + "/" + fname + "_DJ"
    dot = Source.from_file(stem + ".digraph")
    dot.format = "png"
    dot.render(stem)

def buildDnPNG(fname):
    """Render <outdir>/<fname>_DN.digraph as PNG, using output stem <outdir>/<fname>_DN."""
    stem = outdir + "/" + fname + "_DN"
    dot = Source.from_file(stem + ".digraph")
    dot.format = "png"
    dot.render(stem)

def buildDjSVG(fname):
    """Render <outdir>/<fname>_DJ.digraph as SVG, using output stem <outdir>/<fname>_DJ."""
    stem = outdir + "/" + fname + "_DJ"
    dot = Source.from_file(stem + ".digraph")
    dot.format = "svg"
    dot.render(stem)

def buildDnSVG(fname):
    """Render <outdir>/<fname>_DN.digraph as SVG, using output stem <outdir>/<fname>_DN."""
    stem = outdir + "/" + fname + "_DN"
    dot = Source.from_file(stem + ".digraph")
    dot.format = "svg"
    dot.render(stem)

def buildOriginalPNG(fname):
    """Render the original datanode graph <didir>/<fname>.digraph as PNG.

    The output stem is <outdir>/<fname>.
    """
    source_path = didir + "/" + fname + ".digraph"
    target_stem = outdir + "/" + fname
    original = Source.from_file(source_path)
    original.format = "png"
    original.render(target_stem)
    
def saveTurtle(fname, datajourney):
    """Serialise the *datajourney* graph as Turtle to <outdir>/<fname>.ttl."""
    target = "/".join([outdir, fname + '.ttl'])
    datajourney.serialize(destination=target, format='turtle')







In [21]:
def buildFiles(notebook, rdf_nb, datajourney):
    """Write every output artefact for *notebook*.

    The outputs, in order:
    - the datanode view of *rdf_nb* (_DN.digraph, PNG, SVG);
    - the data-journey view of *datajourney* (_DJ.digraph, PNG, SVG);
    - a PNG of the original <didir>/<notebook>.digraph;
    - the journey graph as Turtle.
    """
    datanode_view = buildDnDigraph(rdf_nb)
    saveDnDiGraph(notebook, datanode_view)
    for render in (buildDnPNG, buildDnSVG):
        render(notebook)

    journey_view = buildDjDiGraph(datajourney)
    saveDjDiGraph(notebook, journey_view)
    for render in (buildDjPNG, buildDjSVG, buildOriginalPNG):
        render(notebook)

    saveTurtle(notebook, datajourney)


In [22]:
# Load the classifier
print("Loading classifier")
t1 = monotonic()
# NOTE: unpickling can execute arbitrary code; only load model files that
# come from a trusted source. The file handle is closed right after loading.
with open(clf_folder + '/' + classifier, 'rb') as clf_file:
    clf = pickle.load(clf_file)
print("Loading classifier [Done %fs]" % (monotonic()-t1))
# The whole process, step by step
print("Loading Notebooks")
t1 = monotonic()
rdf_graph = loadNotebooks(indir)
print("Loading Notebooks [Done %f s]" % (monotonic()-t1))



Loading classifier
Loading classifier [Done 0.001150s]
Loading Notebooks
Loading Notebooks [Done 53.001183 s]


In [23]:
# Prepare list of notebooks to process
# notebook = 'test'
# notebooks = []
# if notebook == '*':
#     import glob
#     for f in glob.glob(indir + "/*"):
# #         print(f[6:-4])
#         notebooks += f
# elif type(notebook) == list:
#     notebooks = notebook
# else:
#     notebooks = [notebook]

#print(notebooks)

In [24]:
# For each notebook TODO
#for notebook in notebooks:
# notebook = notebooks[0]
# ...
# Build the KG from the general graph and time it.
print(f"{notebook} : Building KG")
t1 = monotonic()
kg = loadKG(rdf_graph)
print(f"{notebook} : Building KG [Done {monotonic() - t1:f} s]")
#

In [24]:
# Parse the notebook graph, extend the KG with it and time it.
print(f"{notebook} : Loading notebook and add it to the KG")
t1 = monotonic()
kg, rdf_nb = loadNotebook(indir, kg, notebook, use_rules)
print(f"{notebook} : Loading notebook and add it to the KG [Done {monotonic() - t1:f} s]")
#

In [24]:
#
# Choose the unannotated nodes, embed them and time it.
print(f"{notebook} : Selecting entities to predict")
t1 = monotonic()
entities, predict = selectEntitiesPredict(rdf_nb, kg)
print(f"{notebook} : Selecting entities to predict [Done {monotonic() - t1:f} s]")
#
   

In [24]:
# Predict and record an activity for each selected entity, timed.
print(f"{notebook} : Classifying entities")
t1 = monotonic()
dn_annotated = classify(clf, predict, entities)
print(f"{notebook} : Classifying entities [Done {monotonic() - t1:f}s]")
#
#   

eda-feature-engineering-lgb-xgb-cat : Building KG
eda-feature-engineering-lgb-xgb-cat : Building KG [Done 18.127292 s]
eda-feature-engineering-lgb-xgb-cat : Loading notebook and add it to the KG
eda-feature-engineering-lgb-xgb-cat : Loading notebook and add it to the KG [Done 0.360544 s]
eda-feature-engineering-lgb-xgb-cat : Selecting entities to predict
eda-feature-engineering-lgb-xgb-cat : Selecting entities to predict [Done 28.743570 s]
eda-feature-engineering-lgb-xgb-cat : Classifying entities
eda-feature-engineering-lgb-xgb-cat : Classifying entities [Done 0.018312s]


In [None]:
# Collapse the annotated datanode graph into activities, timed.
print(f"{notebook} : Compressing Data Journey")
t1 = monotonic()
datajourney = generateDataJourney(dn_annotated, notebook)
print(f"{notebook} : Compressing Data Journey [Done {monotonic() - t1:f}s]")
#

In [None]:
# Write all digraph/image/Turtle outputs, timed.
print(f"{notebook} : Build files")
t1 = monotonic()
buildFiles(notebook, rdf_nb, datajourney)
print(f"{notebook} : Build files [Done {monotonic() - t1:f}s]")

eda-feature-engineering-lgb-xgb-cat : Compressing Data Journey
eda-feature-engineering-lgb-xgb-cat : Compressing Data Journey [Done 2.352714s]
eda-feature-engineering-lgb-xgb-cat : Build files


In [19]:
# TODO: change the compression algorithm to include sibling nodes
# TODO: support processing all notebooks in a single execution