**The cell below installs Spark on the Google instance and starts a SparkContext.**

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-eu.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark
import findspark
findspark.init("/content/spark-2.4.0-bin-hadoop2.7")
import pyspark
sc = pyspark.SparkContext(appName="RDF")

**We selected an RDF graph that contains the data needed to implement all four rules. We use `!wget` to download the data from Dropbox. It is a study dataset to which most of the rules can be applied: https://github.com/Tellus/neo4j-rdf**

In [0]:
!wget -q https://www.dropbox.com/s/9v0bz9ywmiskg5w/Graph1.nt

**Here we load the data; you can inspect it in the output cell below.**

In [3]:
lines = sc.textFile('/content/Graph1.nt')
lines.collect()

['<http://example.org/unit> <http://www.w3.org/2000/01/rdf-schema#type> <http://www.w3.org/2000/01/rdf-schema#Class>',
 '<http://example.org/presentation-unit> <http://www.w3.org/2000/01/rdf-schema#subClassOf> <http://example.org/unit>',
 '<http://example.org/regular-unit> <http://www.w3.org/2000/01/rdf-schema#subClassOf> <http://example.org/unit>',
 '<http://example.org/name> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
 '<http://example.org/name> <http://www.w3.org/2000/01/rdf-schema#range> <http://www.w3.org/1999/02/22-rdf-syntax-ns#langString>',
 '<http://example.org/teacher> <http://www.w3.org/2000/01/rdf-schema#type> <http://www.w3.org/2000/01/rdf-schema#Class>',
 '<http://example.org/taughtBy> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
 '<http://example.org/taughtBy> <http://www.w3.org/2000/01/rdf-schema#domain> <http://example.org/unit>',
 '<http://example

**The function below formats the data as (key, value) pairs. For Rule 2 the key is the subject of any triple whose predicate is rdfs:domain, and the predicate of every other triple, so a property's domain declaration and the triples that use that property end up under the same key. The values are the pairs (xxx, rdfs:domain) and (zzz, yyy).**


---



*Assuming Rule 2:*
1.   aaa rdfs:domain xxx
2.   yyy aaa zzz

==> yyy rdf:type xxx

*The map output will be as follows:*

*   Key: aaa, Value: (xxx, rdfs:domain)
*   Key: aaa, Value: (zzz, yyy)


---
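We then use reduceByKey to group the values by key, as shown in the output of the cell. Because each value is a tuple, `x+y` concatenates the tuples, so every key ends up with one flat tuple.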
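To make the key/value layout concrete, here is a small Spark-free sketch in plain Python. The two sample triples and the names `key_for_rule2` and `grouped` are illustrative assumptions, not part of the notebook; the pair order `(object, subject)` mirrors what `parserule_2` returns.

```python
from collections import defaultdict

RDFS_DOMAIN = "<http://www.w3.org/2000/01/rdf-schema#domain>"


def key_for_rule2(subject, predicate, obj):
    """Return the (key, value) pair for one triple (hypothetical helper)."""
    if predicate == RDFS_DOMAIN:
        # aaa rdfs:domain xxx  ->  key aaa, value (xxx, rdfs:domain)
        return subject, (obj, predicate)
    # yyy aaa zzz  ->  key aaa, value (zzz, yyy)
    return predicate, (obj, subject)


# Illustrative triples (assumed sample data).
triples = [
    ("<http://example.org/taughtBy>", RDFS_DOMAIN, "<http://example.org/unit>"),
    ("<http://example.org/xml>", "<http://example.org/taughtBy>",
     "<http://example.org/carole>"),
]

# Group the values by key, keeping each pair intact.
grouped = defaultdict(list)
for s, p, o in triples:
    key, value = key_for_rule2(s, p, o)
    grouped[key].append(value)

for key, values in grouped.items():
    print(key, values)
```

Both triples land under the key `<http://example.org/taughtBy>`. Unlike the tuple concatenation done by `x+y`, this sketch keeps each pair as a unit, which makes the grouping easier to read.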


In [4]:
def parserule_2(line):
  fields = line.split()
  subject = fields[0]
  predicate = fields[1]
  obj = fields[2]
  if predicate== '<http://www.w3.org/2000/01/rdf-schema#domain>':
    return(subject,(obj, predicate))
  
  return (predicate,(obj, subject))

#### map the values using the parserule_2 function
rdd1_r2 = lines.map(parserule_2)
#### use the reduce by key to group elements
rdd2_r2 = rdd1_r2.reduceByKey(lambda x,y: x+y)
rdd2_r2.collect()

[('<http://www.w3.org/2000/01/rdf-schema#type>',
  ('<http://www.w3.org/2000/01/rdf-schema#Class>',
   '<http://example.org/unit>',
   '<http://www.w3.org/2000/01/rdf-schema#Class>',
   '<http://example.org/teacher>')),
 ('<http://www.w3.org/2000/01/rdf-schema#subClassOf>',
  ('<http://example.org/unit>',
   '<http://example.org/presentation-unit>',
   '<http://example.org/unit>',
   '<http://example.org/regular-unit>')),
 ('<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>',
  ('<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/name>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/taughtBy>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/organisedBy>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/avgMidtermGrade>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/avgFinalGrade>',
   '<http://www.w3.org/1999/

**The function below is used with mapValues to apply Rule 2 as described above. If the values for a key contain rdfs:domain, the rule is applied; otherwise the values are kept unchanged.**


---


**A sample output is shown here; the full result is in the output cell below.**

 ('<http://example.org/avgMidtermGrade>',
  ['<http://example.org/semantic-web> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://example.org/unit>',
   '<http://example.org/xml> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://example.org/unit>'])
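The rule step can also be sketched without relying on the position of the rdfs:domain entry inside the grouped values. The version below is an alternative under stated assumptions, not the notebook's implementation: it expects a list of intact pairs rather than a flat tuple, and the name `apply_rule2` and the sample grades assigned to each unit are hypothetical.

```python
RDFS_DOMAIN = "<http://www.w3.org/2000/01/rdf-schema#domain>"
RDF_TYPE = "<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>"


def apply_rule2(pairs):
    """Derive 'yyy rdf:type xxx' strings from one key's list of pairs."""
    # (xxx, rdfs:domain) pairs carry the declared domain classes.
    domains = [first for first, second in pairs if second == RDFS_DOMAIN]
    # Every other pair is (zzz, yyy): object and subject of a use of the property.
    uses = [(first, second) for first, second in pairs if second != RDFS_DOMAIN]
    if not domains:
        return uses  # no rdfs:domain for this key: keep the data unchanged
    return ["{} {} {}".format(subject, RDF_TYPE, xxx)
            for xxx in domains
            for _obj, subject in uses]


# Assumed values for the key <http://example.org/avgMidtermGrade>;
# the domain pair is deliberately placed in the middle.
pairs = [
    ('"13"', "<http://example.org/semantic-web>"),
    ("<http://example.org/unit>", RDFS_DOMAIN),
    ('"16"', "<http://example.org/xml>"),
]

derived = apply_rule2(pairs)
for line in derived:
    print(line)
```

Because the pairs are filtered by their tag instead of sliced by index, the result does not depend on the order in which the grouping step happened to combine the values.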

In [5]:
def rule2_domain(text):
  kp=[]
  RL_2=[]
  domain=False
  if len(text)<3:
    for line in text:
      kp.append(line)
  else:
    for line in text:
      if 'domain' in line:
        xxx=text[text.index(line)-1]
        RL_2=text[(text.index(line)+1):]
        Euroro=RL_2[1::2] #keeps only the odd-indexed elements of the list (the subjects yyy)
        for yyy in Euroro:
          kp.append('{} {} {}'.format(yyy,'<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>',xxx))
        domain=True
    if domain==False:
      for line in text:
        kp.append(line)
  return kp

#apply the rule2_domain function to each key's values with mapValues
rdd4_2 = rdd2_r2.mapValues(rule2_domain)
rdd4_2.collect()

[('<http://www.w3.org/2000/01/rdf-schema#type>',
  ['<http://www.w3.org/2000/01/rdf-schema#Class>',
   '<http://example.org/unit>',
   '<http://www.w3.org/2000/01/rdf-schema#Class>',
   '<http://example.org/teacher>']),
 ('<http://www.w3.org/2000/01/rdf-schema#subClassOf>',
  ['<http://example.org/unit>',
   '<http://example.org/presentation-unit>',
   '<http://example.org/unit>',
   '<http://example.org/regular-unit>']),
 ('<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>',
  ['<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/name>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/taughtBy>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/organisedBy>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/avgMidtermGrade>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/avgFinalGrade>',
   '<http://www.w3.org/1999/

**As with the previous rule, the implementation consists of a data-formatting function and a rule function. The formatting function emits (key, value) pairs where, for Rule 3, the key is the subject of any triple whose predicate is rdfs:range, and the predicate of every other triple. The values are the pairs (xxx, rdfs:range) and (zzz, yyy), so a property's range declaration and the triples that use that property share a key.**


---



*Assuming Rule 3:*
1.   aaa rdfs:range xxx
2.   yyy aaa zzz

==> zzz rdf:type xxx

*The map output will be as follows:*

*   Key:aaa, Value: (xxx, rdfs:range)
*   Key:aaa, Value: (zzz, yyy)


---
reduceByKey is then used to group the values by key. You can call collect() to view each RDD created.


---

**The rule function is used with mapValues to apply Rule 3. If the values for a key contain rdfs:range, the rule is applied; otherwise the values are kept unchanged.**


---


**A sample output is shown here; the full result is in the output cell below.**

 ('<http://example.org/avgMidtermGrade>',
  ['"13" <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2000/01/rdf-schema#literal>',
   '"16" <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2000/01/rdf-schema#literal>'])
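Rule 3 can also be expressed as a lookup rather than a grouping. The sketch below is a plain-Python alternative, not the notebook's method: it first collects every `aaa rdfs:range xxx` declaration into a dictionary and then types the object of each triple that uses `aaa`. The function name `infer_range_types` and the assignment of the grades "13" and "16" to particular units are assumptions made for illustration.

```python
RDFS_RANGE = "<http://www.w3.org/2000/01/rdf-schema#range>"
RDF_TYPE = "<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>"


def infer_range_types(triples):
    """Apply 'aaa rdfs:range xxx' + 'yyy aaa zzz' => 'zzz rdf:type xxx'."""
    # First pass: property -> list of declared range classes.
    ranges = {}
    for s, p, o in triples:
        if p == RDFS_RANGE:
            ranges.setdefault(s, []).append(o)
    # Second pass: type the object of every triple whose predicate has a range.
    derived = []
    for s, p, o in triples:
        for xxx in ranges.get(p, []):
            derived.append((o, RDF_TYPE, xxx))
    return derived


# Assumed sample data.
triples = [
    ("<http://example.org/avgMidtermGrade>", RDFS_RANGE,
     "<http://www.w3.org/2000/01/rdf-schema#literal>"),
    ("<http://example.org/semantic-web>", "<http://example.org/avgMidtermGrade>", '"13"'),
    ("<http://example.org/xml>", "<http://example.org/avgMidtermGrade>", '"16"'),
]

derived = infer_range_types(triples)
for triple in derived:
    print(" ".join(triple))
```

The only difference from Rule 2 is which end of the instance triple gets typed: rdfs:domain types the subject yyy, while rdfs:range types the object zzz.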

In [6]:
#Line parse function for Rule 3
def parseRule_3(line):
  fields = line.split()
  subject = fields[0]
  predicate = fields[1]
  obj = fields[2]
  if predicate== '<http://www.w3.org/2000/01/rdf-schema#range>':
    return(subject,(obj, predicate))
  
  return (predicate,(obj, subject))

#The map function used to output the relevant key,value pairs 
rdd1_r3 = lines.map(parseRule_3)
#rdd1_r3.collect()

#use the reduce by key to group elements
rdd2_r3 = rdd1_r3.reduceByKey(lambda x,y: x+y)
#rdd2_r3.collect()

#### function to apply Rule 3
def rule3_range(text):
  kp=[]
  RL_2=[]
  has_range=False #flag name avoids shadowing the built-in range
  if len(text)<3:
    for line in text:
      kp.append(line)
  else:
    for line in text:
      if 'range' in line:
        xxx=text[text.index(line)-1]
        RL_2=text[(text.index(line)+1):]
        Euroro=RL_2[0::2] #keeps only the even-indexed elements of the list (the objects zzz)
        for zzz in Euroro:
          kp.append('{} {} {}'.format(zzz,'<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>',xxx))
        has_range=True
    if has_range==False:
      for line in text:
        kp.append(line)
  return kp

#below is the mapValues call using the rule3_range function
rdd4_3 = rdd2_r3.mapValues(rule3_range)
rdd4_3.collect()

[('<http://www.w3.org/2000/01/rdf-schema#type>',
  ['<http://www.w3.org/2000/01/rdf-schema#Class>',
   '<http://example.org/unit>',
   '<http://www.w3.org/2000/01/rdf-schema#Class>',
   '<http://example.org/teacher>']),
 ('<http://www.w3.org/2000/01/rdf-schema#subClassOf>',
  ['<http://example.org/unit>',
   '<http://example.org/presentation-unit>',
   '<http://example.org/unit>',
   '<http://example.org/regular-unit>']),
 ('<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>',
  ['<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/name>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/taughtBy>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/organisedBy>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/avgMidtermGrade>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/avgFinalGrade>',
   '<http://www.w3.org/1999/

**As with the previous rule, the implementation consists of a data-formatting function and a rule function. The formatting function emits (key, value) pairs where, for Rule 7, the key is the subject of any triple whose predicate is rdfs:subPropertyOf, and the predicate of every other triple. The values are the pairs (bbb, rdfs:subPropertyOf) and (yyy, xxx), so a sub-property declaration and the triples that use that property share a key.**


---



*Assuming Rule 7:*
1.   aaa rdfs:subPropertyOf bbb
2.   xxx aaa yyy

==> xxx bbb yyy

*The map output will be as follows:*

*   Key: aaa, Value: (bbb, rdfs:subPropertyOf)
*   Key: aaa, Value: (yyy, xxx)


---
reduceByKey is then used to group the values by key. You can call collect() to view each RDD created.


---

**The rule function is used with mapValues to apply Rule 7. If the values for a key contain rdfs:subPropertyOf, the rule is applied; otherwise the values are kept unchanged.**


---


**A sample output is shown here; the full result is in the output cell below.**

 ('<http://example.org/organisedBy>',
  ['<http://example.org/semantic-web> <http://example.org/taughtBy> <http://example.org/alice>',
   '<http://example.org/xml> <http://example.org/taughtBy> <http://example.org/carole>'])
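The sketch below walks through how a flat tuple produced by concatenating the grouped values can be turned back into (object, subject) pairs for Rule 7. It is a plain-Python illustration under assumptions: the input tuple is hypothetical (it presumes that `organisedBy` is declared a sub-property of `taughtBy`), `apply_rule7` is not a name from the notebook, and, like the notebook's approach, it assumes the sub-property entry comes before the instance entries.

```python
RDFS_SUBPROPERTY = "<http://www.w3.org/2000/01/rdf-schema#subPropertyOf>"


def apply_rule7(flat):
    """Derive 'xxx bbb yyy' strings from one key's flat tuple of values."""
    if RDFS_SUBPROPERTY not in flat:
        return list(flat)  # no sub-property declaration: keep the data unchanged
    marker = flat.index(RDFS_SUBPROPERTY)
    bbb = flat[marker - 1]        # the super-property sits just before the marker
    rest = flat[marker + 1:]      # yyy1, xxx1, yyy2, xxx2, ...
    pairs = zip(rest[0::2], rest[1::2])  # (object yyy, subject xxx)
    return ["{} {} {}".format(xxx, bbb, yyy) for yyy, xxx in pairs]


# Assumed flat values for the key <http://example.org/organisedBy>.
flat = (
    "<http://example.org/taughtBy>", RDFS_SUBPROPERTY,
    "<http://example.org/alice>", "<http://example.org/semantic-web>",
    "<http://example.org/carole>", "<http://example.org/xml>",
)

derived = apply_rule7(flat)
for line in derived:
    print(line)
```

Slicing with `[0::2]` and `[1::2]` separates objects from subjects, and `zip` pairs them up again; this is equivalent in effect to iterating over the tuple two items at a time.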

In [7]:
#Line parse function for Rule 7
def parserule_7(line):
  fields = line.split()
  subject = fields[0]
  predicate = fields[1]
  obj = fields[2]
  if 'subPropertyOf' in predicate:
    return(subject,(obj, predicate))
  
  return (predicate,(obj, subject))

#The map function used to output the relevant key,value pairs
rdd1_r7 = lines.map(parserule_7)
#rdd1_r7.collect()

#use the reduce by key to group elements
rdd2_r7 = rdd1_r7.reduceByKey(lambda x,y: x+y)
#rdd2_r7.collect()

#### function to apply Rule 7
def rule7_Subpropoertyof(text):
  kp=[]
  RL=[]
  subPropertyOf=False
  if len(text)<3:
    for line in text:
      kp.append(line)
  else:
    for line in text:
      if 'subPropertyOf' in line:
        bbb=text[text.index(line)-1]
        RL_2=text[(text.index(line)+1):]
        for x, y in zip(*[iter(RL_2)] * 2):
          kp.append('{} {} {}'.format(y,bbb,x)) 
        subPropertyOf=True
    if subPropertyOf==False:
      for line in text:
        kp.append(line)
  return kp

#below is the mapvalues using the rule_7 function
rdd4_7 = rdd2_r7.mapValues(rule7_Subpropoertyof)
rdd4_7.collect()


[('<http://www.w3.org/2000/01/rdf-schema#type>',
  ['<http://www.w3.org/2000/01/rdf-schema#Class>',
   '<http://example.org/unit>',
   '<http://www.w3.org/2000/01/rdf-schema#Class>',
   '<http://example.org/teacher>']),
 ('<http://www.w3.org/2000/01/rdf-schema#subClassOf>',
  ['<http://example.org/unit>',
   '<http://example.org/presentation-unit>',
   '<http://example.org/unit>',
   '<http://example.org/regular-unit>']),
 ('<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>',
  ['<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/name>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/taughtBy>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/organisedBy>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/avgMidtermGrade>',
   '<http://www.w3.org/1999/02/22-rdf-syntax-ns#Property>',
   '<http://example.org/avgFinalGrade>',
   '<http://www.w3.org/1999/

**As with the previous rule, the implementation consists of a data-formatting function and a rule function. The formatting function emits (key, value) pairs where, for Rule 9, the key is the object of any triple whose predicate is rdf:type (rdf-syntax-ns#type), and the subject of every other triple. The values are the pairs (yyy, rdfs:subClassOf) and (zzz, rdf:type), so a class's subclass declaration and its instances share a key.**


---



*Assuming Rule 9:*
1.   xxx rdfs:subClassOf yyy
2.   zzz rdf:type xxx

==> zzz rdf:type yyy

*The map output will be as follows:*

*   Key: xxx, Value: (yyy, rdfs:subClassOf)
*   Key: xxx, Value: (zzz, rdf:type)


---
reduceByKey is then used to group the values by key. You can call collect() to view each RDD created.


---

**The rule function is used with mapValues to apply Rule 9. If the values for a key contain rdfs:subClassOf, the rule is applied; otherwise the values are kept unchanged.**


---


**A sample output is shown here; the full result is in the output cell below.**

 ('<http://example.org/regular-unit>',
  ['<http://example.org/xml> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://example.org/unit>'])
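As a Spark-free illustration of Rule 9, the sketch below keys every triple by class, groups the values, and then combines each superclass with each instance found under the same key. The helper names `key_for_rule9` and `apply_rule9` and the two sample triples are assumptions for illustration (the type assertion for `xml` is inferred from the sample output, not copied from the data file).

```python
from collections import defaultdict

RDFS_SUBCLASS = "<http://www.w3.org/2000/01/rdf-schema#subClassOf>"
RDF_TYPE = "<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>"


def key_for_rule9(subject, predicate, obj):
    """Key type assertions by their class (object), everything else by subject."""
    if predicate == RDF_TYPE:
        return obj, (subject, predicate)   # zzz rdf:type xxx -> key xxx
    return subject, (obj, predicate)       # xxx rdfs:subClassOf yyy -> key xxx


def apply_rule9(pairs):
    """Derive 'zzz rdf:type yyy' strings from one class's grouped pairs."""
    supers = [first for first, tag in pairs if tag == RDFS_SUBCLASS]
    members = [first for first, tag in pairs if tag == RDF_TYPE]
    return ["{} {} {}".format(zzz, RDF_TYPE, yyy)
            for yyy in supers for zzz in members]


# Assumed sample data.
triples = [
    ("<http://example.org/regular-unit>", RDFS_SUBCLASS, "<http://example.org/unit>"),
    ("<http://example.org/xml>", RDF_TYPE, "<http://example.org/regular-unit>"),
]

grouped = defaultdict(list)
for s, p, o in triples:
    key, value = key_for_rule9(s, p, o)
    grouped[key].append(value)

derived = {key: apply_rule9(pairs) for key, pairs in grouped.items()}
print(derived)
```

This applies the rule once per class; it does not follow chains of subclasses, which would require repeating the step on the derived triples.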

In [8]:
def parseRule_9(line):
  fields = line.split()
  subject = fields[0]
  predicate = fields[1]
  obj = fields[2]
  if 'rdf-syntax-ns#type' in predicate:
    return(obj,(subject, predicate))
  return (subject,(obj, predicate))

#The map function used to output the relevant key,value pairs
rdd1 = lines.map(parseRule_9)
#rdd1.collect()

#use the reduce by key to group elements
rdd2 = rdd1.reduceByKey(lambda x,y: x+y)
#rdd2.collect()

#### function to apply Rule 9
def rule9_subclassof(text):
  kp=[]
  subclass=False
  slice_n=int((len(text)/2)-1)
  if len(text)<3:
    for line in text:
      kp.append(line)
  else:
    for line in text:
      if 'subClass' in line:
        y=text[text.index(line)-1]
        z=text[text.index(line)+slice_n]
        pred=text[text.index(line)+slice_n+1]
        kp.append('{} {} {}'.format(z,pred,y))
        subclass=True
    if subclass==False:
      for line in text:
        kp.append(line)
  return kp

#below is the mapvalues using the rule 9 function
rdd4 = rdd2.mapValues(rule9_subclassof)
rdd4.collect()

[('<http://example.org/unit>',
  ['<http://www.w3.org/2000/01/rdf-schema#Class>',
   '<http://www.w3.org/2000/01/rdf-schema#type>']),
 ('<http://example.org/presentation-unit>',
  ['<http://example.org/semantic-web> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://example.org/unit>']),
 ('<http://example.org/name>',
  ['<http://www.w3.org/1999/02/22-rdf-syntax-ns#langString>',
   '<http://www.w3.org/2000/01/rdf-schema#range>']),
 ('<http://example.org/taughtBy>',
  ['<http://example.org/unit>',
   '<http://www.w3.org/2000/01/rdf-schema#domain>',
   '<http://example.org/teacher>',
   '<http://www.w3.org/2000/01/rdf-schema#range>']),
 ('<http://example.org/avgMidtermGrade>',
  ['<http://example.org/unit>',
   '<http://www.w3.org/2000/01/rdf-schema#domain>',
   '<http://www.w3.org/2000/01/rdf-schema#literal>',
   '<http://www.w3.org/2000/01/rdf-schema#range>']),
 ('<http://example.org/alice>', ['"Alice"', '<http://example.org/name>']),
 ('<http://example.org/regular-unit>',
  ['<