In [6]:
pip install rdflib neo4j



In [3]:
import rdflib, time

In [4]:
#utility function to get the local part of a URI (stripping out the namespace)

def getLocalPart(uri):
  pos = -1
  pos = uri.rfind('#')
  if pos < 0 :
    pos = uri.rfind('/')
  if pos < 0 :
    pos = uri.rindex(':')
  return uri[pos+1:]

def getNamespacePart(uri):
  pos = -1
  pos = uri.rfind('#')
  if pos < 0 :
    pos = uri.rfind('/')
  if pos < 0 :
    pos = uri.rindex(':')
  return uri[0:pos+1]

# quick test
print(getLocalPart("http://onto.neo4j.com/rail#Station"))
print(getNamespacePart("http://onto.neo4j.com/rail#Station"))

Station
http://onto.neo4j.com/rail#


In [5]:
# processing the ontology...

g = rdflib.Graph()
g.parse("/content/rail.ttl", format='turtle')

simple_query = """
prefix owl: <http://www.w3.org/2002/07/owl#>
prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>

SELECT DISTINCT ?c
  WHERE {
    ?c rdf:type owl:Class .
  } """

for row in g.query(simple_query):
    print(str(row.c), getLocalPart(str(row.c)), getNamespacePart(str(row.c)))


http://onto.neo4j.com/rail#Event Event http://onto.neo4j.com/rail#
http://onto.neo4j.com/rail#Station Station http://onto.neo4j.com/rail#


In [7]:
# read the onto and generate cypher (complete without mappings)

g = rdflib.Graph()
g.parse("/content/rail.ttl", format='turtle')

classes_and_props_query = """
prefix owl: <http://www.w3.org/2002/07/owl#>
prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>

SELECT DISTINCT ?curi (GROUP_CONCAT(DISTINCT ?propTypePair ; SEPARATOR=",") AS ?props)
WHERE {
    ?curi rdf:type owl:Class .
    optional {
      ?prop rdfs:domain ?curi ;
        a owl:DatatypeProperty ;
        rdfs:range ?range .
      BIND (concat(str(?prop),';',str(?range)) AS ?propTypePair)
    }
  } GROUP BY ?curi  """

cypher_list = []

for row in g.query(classes_and_props_query):
    cypher = []
    cypher.append("unwind $records AS record")
    cypher.append("merge (n:" + getLocalPart(row.curi) + " { `<id_prop>`: record.`<col with id>`} )")
    for pair in row.props.split(","):
      propName = pair.split(";")[0]
      propType = pair.split(";")[1]
      cypher.append("set n." + getLocalPart(propName) + " = record.`<col with value for " + getLocalPart(propName) + ">`")
    cypher.append("return count(*) as total")
    cypher_list.append(' \n'.join(cypher))


rels_query = """
prefix owl: <http://www.w3.org/2002/07/owl#>
prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>

SELECT DISTINCT ?rel ?dom ?ran #(GROUP_CONCAT(DISTINCT ?relTriplet ; SEPARATOR=",") AS ?rels)
WHERE {
    ?rel a ?propertyClass .
    filter(?propertyClass in (rdf:Property, owl:ObjectProperty, owl:FunctionalProperty, owl:AsymmetricProperty,
           owl:InverseFunctionalProperty, owl:IrreflexiveProperty, owl:ReflexiveProperty, owl:SymmetricProperty, owl:TransitiveProperty))

    ?rel rdfs:domain ?dom ;
      rdfs:range ?ran .

    #BIND (concat(str(?rel),';',str(?dom),';',str(?range)) AS ?relTriplet)

  }"""


for row in g.query(rels_query):
  cypher = []
  cypher.append("unwind $records AS record")
  cypher.append("match (source:" + getLocalPart(row.dom) + " { `<id_prop>`: record.`<col with source id>`} )")
  cypher.append("match (target:" + getLocalPart(row.ran) + " { `<id_prop>`: record.`<col with target id>`} )")
  cypher.append("merge (source)-[r:`"+ getLocalPart(row.rel) +"`]->(target)")
  cypher.append("return count(*) as total")
  cypher_list.append(' \n'.join(cypher))

for q in cypher_list:
  print("\n\n" + q)




unwind $records AS record 
merge (n:Event { `<id_prop>`: record.`<col with id>`} ) 
set n.eventDescription = record.`<col with value for eventDescription>` 
set n.eventId = record.`<col with value for eventId>` 
set n.eventType = record.`<col with value for eventType>` 
return count(*) as total


unwind $records AS record 
merge (n:Station { `<id_prop>`: record.`<col with id>`} ) 
set n.lat = record.`<col with value for lat>` 
set n.long = record.`<col with value for long>` 
set n.stationAddress = record.`<col with value for stationAddress>` 
set n.stationCode = record.`<col with value for stationCode>` 
set n.stationName = record.`<col with value for stationName>` 
return count(*) as total


unwind $records AS record 
match (source:Event { `<id_prop>`: record.`<col with source id>`} ) 
match (target:Station { `<id_prop>`: record.`<col with target id>`} ) 
merge (source)-[r:`affects`]->(target) 
return count(*) as total


unwind $records AS record 
match (source:Station { `<id_prop>`

In [9]:
railMappings = {}

stationMapping = {}
stationMapping["@fileName"] = "/content/nr-stations-all.csv"
stationMapping["@uniqueId"] = "stationCode"
stationMapping["lat"] = "lat"
stationMapping["long"] = "long"
stationMapping["stationAddress"] = "address"
stationMapping["stationCode"] = "crs"
stationMapping["stationName"] = "name"
railMappings["Station"] = stationMapping

eventMapping = {}
eventMapping["@fileName"] = "/content/nr-events.csv"
eventMapping["@uniqueId"] = "eventId"
eventMapping["eventDescription"] = "desc"
eventMapping["eventId"] = "id"
eventMapping["timestamp"] = "ts"
eventMapping["eventType"] = "type"
railMappings["Event"] = eventMapping

linkMapping = {}
linkMapping["@fileName"] = "/content/nr-station-links.csv"
linkMapping["@from"] = "origin"
linkMapping["@to"] = "destination"
railMappings["link"] = linkMapping

affectsMapping = {}
affectsMapping["@fileName"] = "/content/nr-events.csv"
affectsMapping["@from"] = "id"
affectsMapping["@to"] = "Station"
railMappings["affects"] = affectsMapping

# show it?
railMappings

{'Station': {'@fileName': '/content/nr-stations-all.csv',
  '@uniqueId': 'stationCode',
  'lat': 'lat',
  'long': 'long',
  'stationAddress': 'address',
  'stationCode': 'crs',
  'stationName': 'name'},
 'Event': {'@fileName': '/content/nr-events.csv',
  '@uniqueId': 'eventId',
  'eventDescription': 'desc',
  'eventId': 'id',
  'timestamp': 'ts',
  'eventType': 'type'},
 'link': {'@fileName': '/content/nr-station-links.csv',
  '@from': 'origin',
  '@to': 'destination'},
 'affects': {'@fileName': '/content/nr-events.csv',
  '@from': 'id',
  '@to': 'Station'}}

In [10]:
#copy of previous but using the mappings
def getLoadersFromOnto(onto, rdf_format, mappings):
  g = rdflib.Graph()
  g.parse(onto, format= rdf_format)

  classes_and_props_query = """
  prefix owl: <http://www.w3.org/2002/07/owl#>
  prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>

  SELECT DISTINCT ?curi (GROUP_CONCAT(DISTINCT ?propTypePair ; SEPARATOR=",") AS ?props)
  WHERE {
      ?curi rdf:type owl:Class .
      optional {
        ?prop rdfs:domain ?curi ;
          a owl:DatatypeProperty ;
          rdfs:range ?range .
        BIND (concat(str(?prop),';',str(?range)) AS ?propTypePair)
      }
    } GROUP BY ?curi  """

  cypher_import = {}
  export_ns = set()
  export_mappings = {}

  for row in g.query(classes_and_props_query):
      export_ns.add(getNamespacePart(row.curi))
      export_mappings[getLocalPart(row.curi)] = str(row.curi)
      cypher = []
      cypher.append("unwind $records AS record")
      cypher.append("merge (n:" + getLocalPart(row.curi) + " { `" + mappings[getLocalPart(row.curi)]["@uniqueId"] + "`: record.`" + mappings[getLocalPart(row.curi)][mappings[getLocalPart(row.curi)]["@uniqueId"]] + "`} )")
      for pair in row.props.split(","):
        propName = pair.split(";")[0]
        propType = pair.split(";")[1]
        export_ns.add(getNamespacePart(propName))
        export_mappings[getLocalPart(propName)] = propName
        #if a mapping (a column in the source file) is defined for the property and property is not a unique id
        if getLocalPart(propName) in mappings[getLocalPart(row.curi)] and getLocalPart(propName) != mappings[getLocalPart(row.curi)]["@uniqueId"]:
          cypher.append("set n." + getLocalPart(propName) + " = record.`" + mappings[getLocalPart(row.curi)][getLocalPart(propName)] + "`")
      cypher.append("return count(*) as total")
      cypher_import[getLocalPart(row.curi)] = ' \n'.join(cypher)


  rels_query = """
  prefix owl: <http://www.w3.org/2002/07/owl#>
  prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>

  SELECT DISTINCT ?rel ?dom ?ran #(GROUP_CONCAT(DISTINCT ?relTriplet ; SEPARATOR=",") AS ?rels)
  WHERE {
      ?rel a ?propertyClass .
      filter(?propertyClass in (rdf:Property, owl:ObjectProperty, owl:FunctionalProperty, owl:AsymmetricProperty,
            owl:InverseFunctionalProperty, owl:IrreflexiveProperty, owl:ReflexiveProperty, owl:SymmetricProperty, owl:TransitiveProperty))

      ?rel rdfs:domain ?dom ;
        rdfs:range ?ran .

      #BIND (concat(str(?rel),';',str(?dom),';',str(?range)) AS ?relTriplet)

    }"""

  for row in g.query(rels_query):
    export_ns.add(getNamespacePart(row.rel))
    export_mappings[getLocalPart(row.rel)] = str(row.rel)
    cypher = []
    cypher.append("unwind $records AS record")
    cypher.append("match (source:" + getLocalPart(row.dom) + " { `" + mappings[getLocalPart(row.dom)]["@uniqueId"] + "`: record.`" + mappings[getLocalPart(row.rel)]["@from"] + "`} )")
    cypher.append("match (target:" + getLocalPart(row.ran) + " { `" + mappings[getLocalPart(row.ran)]["@uniqueId"] + "`: record.`" + mappings[getLocalPart(row.rel)]["@to"] + "`} )")
    cypher.append("merge (source)-[r:`"+ getLocalPart(row.rel) +"`]->(target)")
    cypher.append("return count(*) as total")
    cypher_import[getLocalPart(row.rel)] = ' \n'.join(cypher)


  nscount = 0
  mapping_export_cypher = []

  for ns in export_ns:
    mapping_export_cypher.append("call n10s.nsprefixes.add('ns" + str(nscount) + "','" + ns + "');")
    nscount+=1

  for k in export_mappings.keys():
    mapping_export_cypher.append("call n10s.mapping.add('" + export_mappings[k] + "','" + k + "');")

  return cypher_import ,  mapping_export_cypher




In [12]:
cypher_import , mapping_defs = getLoadersFromOnto("/content/rail.ttl","turtle",railMappings)

print("#LOADERS:\n\n")
for q in cypher_import.keys():
  print(q + ": \n\nfile: " + railMappings[q]["@fileName"] + "\n\n"+ cypher_import[q] + "\n\n")

print("#EXPORT MAPPINGS (for RDF API):\n\n")
for md in mapping_defs:
  print(md)

#LOADERS:


Event: 

file: /content/nr-events.csv

unwind $records AS record 
merge (n:Event { `eventId`: record.`id`} ) 
set n.eventDescription = record.`desc` 
set n.eventType = record.`type` 
return count(*) as total


Station: 

file: /content/nr-stations-all.csv

unwind $records AS record 
merge (n:Station { `stationCode`: record.`crs`} ) 
set n.lat = record.`lat` 
set n.long = record.`long` 
set n.stationAddress = record.`address` 
set n.stationName = record.`name` 
return count(*) as total


affects: 

file: /content/nr-events.csv

unwind $records AS record 
match (source:Event { `eventId`: record.`id`} ) 
match (target:Station { `stationCode`: record.`Station`} ) 
merge (source)-[r:`affects`]->(target) 
return count(*) as total


link: 

file: /content/nr-station-links.csv

unwind $records AS record 
match (source:Station { `stationCode`: record.`origin`} ) 
match (target:Station { `stationCode`: record.`destination`} ) 
merge (source)-[r:`link`]->(target) 
return count(*) as t

In [13]:
# Utility function to write to Neo4j in batch mode.

def insert_data(session, query, frame, batch_size = 500):

    total = 0
    batch = 0
    start = time.time()
    result = None

    while batch * batch_size < len(frame):
        res = session.write_transaction( lambda tx: tx.run(query,
                      parameters = {'records': frame[batch*batch_size:(batch+1)*batch_size].to_dict('records')}).data())

        total += res[0]['total']
        batch += 1
        result = {"total":total,
                  "batches":batch,
                  "time":time.time()-start}
        print(result)

    return result

In [19]:
import pandas as pd
from neo4j import GraphDatabase, basic_auth

driver = GraphDatabase.driver(
  "bolt://44.200.195.16:7687",
 auth=basic_auth("neo4j", "blinks-month-suppressions"))

session = driver.session(database="neo4j")

cypher_import , mapping_defs = getLoadersFromOnto("/content/rail.ttl","turtle",railMappings)

for q in cypher_import.keys():
  print("about to import " + q + " from file " + railMappings[q]["@fileName"])
  df = pd.read_csv(railMappings[q]["@fileName"])
  result = insert_data(session, cypher_import[q], df, batch_size = 300)
  print(result)

for md in mapping_defs:
  session.run(md)


about to import Event from file /content/nr-events.csv


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 143, 'batches': 1, 'time': 2.6955325603485107}
{'total': 143, 'batches': 1, 'time': 2.6955325603485107}
about to import Station from file /content/nr-stations-all.csv


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 300, 'batches': 1, 'time': 3.1851143836975098}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 600, 'batches': 2, 'time': 5.480154991149902}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 900, 'batches': 3, 'time': 7.080058813095093}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 1200, 'batches': 4, 'time': 8.383249044418335}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 1500, 'batches': 5, 'time': 9.792808055877686}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 1800, 'batches': 6, 'time': 11.190566539764404}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 2100, 'batches': 7, 'time': 13.998529195785522}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 2400, 'batches': 8, 'time': 16.19254469871521}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 2593, 'batches': 9, 'time': 17.564476013183594}
{'total': 2593, 'batches': 9, 'time': 17.564476013183594}
about to import affects from file /content/nr-events.csv


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 143, 'batches': 1, 'time': 3.937549114227295}
{'total': 143, 'batches': 1, 'time': 3.937549114227295}
about to import link from file /content/nr-station-links.csv


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 300, 'batches': 1, 'time': 5.275152921676636}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 600, 'batches': 2, 'time': 7.965444326400757}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 898, 'batches': 3, 'time': 10.742384672164917}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 1198, 'batches': 4, 'time': 12.865227699279785}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 1498, 'batches': 5, 'time': 14.865331411361694}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 1796, 'batches': 6, 'time': 17.780264377593994}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 2095, 'batches': 7, 'time': 22.18072509765625}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 2394, 'batches': 8, 'time': 26.18054962158203}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 2694, 'batches': 9, 'time': 28.979995250701904}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 2994, 'batches': 10, 'time': 31.279184341430664}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 3292, 'batches': 11, 'time': 34.75636601448059}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 3592, 'batches': 12, 'time': 37.20470881462097}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 3892, 'batches': 13, 'time': 39.272268295288086}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 4191, 'batches': 14, 'time': 41.57803726196289}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 4490, 'batches': 15, 'time': 43.77136516571045}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 4785, 'batches': 16, 'time': 45.97575497627258}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 5082, 'batches': 17, 'time': 49.57385039329529}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 5381, 'batches': 18, 'time': 52.54588341712952}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 5681, 'batches': 19, 'time': 54.440361738204956}


  res = session.write_transaction( lambda tx: tx.run(query,


{'total': 5782, 'batches': 20, 'time': 55.20778250694275}
{'total': 5782, 'batches': 20, 'time': 55.20778250694275}


In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive
