In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import *

In [2]:
!pip install PySPARQL==0.0.5

Collecting PySPARQL==0.0.5
  Downloading PySPARQL-0.0.5-py3-none-any.whl (6.0 kB)
Collecting graphframes
  Downloading graphframes-0.6-py2.py3-none-any.whl (18 kB)
Collecting nose
  Downloading nose-1.3.7-py3-none-any.whl (154 kB)
[K     |████████████████████████████████| 154 kB 3.9 MB/s eta 0:00:01
[?25hCollecting pyspark
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 61 kB/s s eta 0:00:01     |███████████████████████▊        | 151.6 MB 10.7 MB/s eta 0:00:05
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 10.6 MB/s eta 0:00:01
[?25hCollecting rdflib
  Downloading rdflib-5.0.0-py3-none-any.whl (231 kB)
[K     |████████████████████████████████| 231 kB 10.1 MB/s eta 0:00:01
Collecting isodate
  Downloading isodate-0.6.0-py2.py3-none-any.whl (45 kB)
[K     |████████████████████████████████| 45 kB 2.6 MB/s  eta 0:00:01
Collecting rdflib-jsonld
  Do

In [3]:
from PySPARQL.Wrapper import PySPARQLWrapper

### Connection to Apache Spark

In [4]:
spark = (SparkSession
          .builder
          .appName("interfacing spark sql to hive metastore without configuration file")
          .config("hive.metastore.uris", "thrift://hive-metastore:9083") 
          .enableHiveSupport() 
          .getOrCreate())

### SPARQL query execution

In [5]:
sparql_endpoint = "http://jena-fuseki:3030/pizzads"

query = """
    PREFIX : <http://www.co-ode.org/ontologies/pizza/pizza.owl#>

    SELECT ?pizzaID ?outcome
    WHERE {

      ?pizzaType :suggestedTempLow ?tempLow; :suggestedDurationLow ?durLow;
                 :suggestedTempUp ?tempUp; :suggestedDurationUp ?durUp .

      SERVICE <http://ontop:8080/sparql> {
        ?pizzaID a ?pizzaType.
        ?pizzaID :temperature ?avgTemp; :start_cooking ?start; :end_cooking ?end.
      }

      BIND ((?end-?start) AS ?cookDuration)

      BIND( IF ((?avgTemp >= ?tempLow && ?avgTemp <= ?tempUp) &&
          (?cookDuration >= ?durLow && ?cookDuration <= ?durUp)
          ,"WELL_COOKED","ANOMALY") AS ?outcome)
    }
"""

wrapper = PySPARQLWrapper(spark, sparql_endpoint)
result = wrapper.query(query)
resultDF = result.dataFrame

pizzaID,outcome
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ789,ANOMALY
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ333,WELL_COOKED
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ222,WELL_COOKED
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ005,WELL_COOKED
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ456,WELL_COOKED
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ001,ANOMALY
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ789,ANOMALY
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ444,ANOMALY
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ333,WELL_COOKED
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ999,WELL_COOKED
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ111,WELL_COOKED
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ003,WELL_COOKED
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ004,ANOMALY
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ002,WELL_COOKED
http://www.co-ode.org/ontologies/pizza/pizza.owl#PZ

In [6]:
resultDF.show()  # Plot the results

+--------------------+-----------+
|             pizzaID|    outcome|
+--------------------+-----------+
|http://www.co-ode...|    ANOMALY|
|http://www.co-ode...|WELL_COOKED|
|http://www.co-ode...|WELL_COOKED|
|http://www.co-ode...|WELL_COOKED|
|http://www.co-ode...|WELL_COOKED|
|http://www.co-ode...|    ANOMALY|
|http://www.co-ode...|    ANOMALY|
|http://www.co-ode...|    ANOMALY|
|http://www.co-ode...|WELL_COOKED|
|http://www.co-ode...|WELL_COOKED|
|http://www.co-ode...|WELL_COOKED|
|http://www.co-ode...|WELL_COOKED|
|http://www.co-ode...|    ANOMALY|
|http://www.co-ode...|WELL_COOKED|
|http://www.co-ode...|WELL_COOKED|
|http://www.co-ode...|WELL_COOKED|
|http://www.co-ode...|WELL_COOKED|
|http://www.co-ode...|    ANOMALY|
|http://www.co-ode...|WELL_COOKED|
|http://www.co-ode...|    ANOMALY|
+--------------------+-----------+



In [7]:
# clean the result by removing the prefixes

df2 = resultDF.withColumn("pizzaID", regexp_replace('pizzaID','http://www.co-ode.org/ontologies/pizza/pizza.owl#',''))

In [8]:
df2.show()   # Plot the cleaned results

+-------+-----------+
|pizzaID|    outcome|
+-------+-----------+
|  PZ789|    ANOMALY|
|  PZ333|WELL_COOKED|
|  PZ222|WELL_COOKED|
|  PZ005|WELL_COOKED|
|  PZ456|WELL_COOKED|
|  PZ001|    ANOMALY|
|  PZ789|    ANOMALY|
|  PZ444|    ANOMALY|
|  PZ333|WELL_COOKED|
|  PZ999|WELL_COOKED|
|  PZ111|WELL_COOKED|
|  PZ003|WELL_COOKED|
|  PZ004|    ANOMALY|
|  PZ002|WELL_COOKED|
|  PZ888|WELL_COOKED|
|  PZ666|WELL_COOKED|
|  PZ777|WELL_COOKED|
|  PZ123|    ANOMALY|
|  PZ555|WELL_COOKED|
|  PZ222|    ANOMALY|
+-------+-----------+



### Persists the Spark Dataframe into a Spark table

In [9]:
df2.write.mode("overwrite").saveAsTable('pizzadb.analysis')