# LINEAR PIZZA OVEN CHALLENGE


A solution by Francesco Invernici 

Unstructured and Streaming Data Engineering, POLIMI, 2020

---

### Sources:
- S1: Camera that recognizes the topping [ws://streams:9400](ws://streams:9400)
- S2: Oven thermometer [ws://streams:9500](ws://streams:9500)

Import the necessary rsplib classes and functions

In [1]:
from rsplib import RSPEngine, RSPPublisher, Stream, rdf_table, load_graph, accessURL

## Publish the streams generated by the two sensors

Connect to the publishing service

In [2]:
streamhub = RSPPublisher("http://streamhub:9292/streamhub")

Create a publishing query for each source stream and publish it to streamhub

In [3]:
vocals1 = '''
BASE  <http://streamhub:9292/streamhub>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX dcat: <http://www.w3.org/ns/dcat#>
PREFIX frmt: <http://www.w3.org/ns/formats/>
PREFIX vocals: <http://w3id.org/rsp/vocals#>
PREFIX vsd: <http://w3id.org/rsp/vocals-sd#>
REGISTER STREAM :pizzastream
FROM SOURCE <ws://streams:9400>
WHERE {

           {this} a vocals:StreamDescriptor .

           {publisher} a vsd:PublishingService ;
                         vsd:hasFeature vsd:replaying ;
                         vsd:resultFormat frmt:JSON-LD .

           :PizzaEndpoint a vocals:StreamEndpoint ;
                            dcat:title "A Pizza Stream Endpoint"^^xsd:string ;
                            dcat:description "Streaming endpoint to consume pizzas via WebSocket"^^xsd:string ;
                            dcat:format frmt:JSON-LD ;
                            dcat:accessURL {source} ;
                            vsd:publishedBy {publisher} .

           {stream} a vocals:RDFStream ;
                    dcat:title "Pizza Stream"^^xsd:string ;
                    dcat:description "Stream of pizzas"^^xsd:string ;
                    dcat:publisher {publisher} ;
                    dcat:landingPage <https://example.org/rw/pizzas/> ;
                    vocals:hasEndpoint :PizzaEndpoint  .

}'''

In [4]:
streamhub.publish(vocals1)

{
    "@id": "streams/pizzastream",
    "@type": "vocals:StreamDescriptor",
    "dcat:dataset": {
        "@id": "streams/pizzastream",
        "@type": "vocals:RDFStream",
        "dcat:description": "Stream of pizzas",
        "dcat:landingPage": "https://example.org/rw/pizzas/",
        "dcat:publisher": "http://streamhub:9292/streamhub",
        "dcat:title": "Pizza Stream",
        "vocals:hasEndpoint": [
            [
                {
                    "@id": "http://streamhub:9292/streamhub/PizzaEndpoint",
                    "@type": "http://w3id.org/rsp/vocals#StreamEndpoint",
                    "dcat:title": "A Pizza Stream Endpoint",
                    "dcat:description": "Streaming endpoint to consume pizzas via WebSocket",
                    "dcat:format": "http://www.w3.org/ns/formats/JSON-LD",
                    "dcat:accessURL": "ws://streams:9400",
                    "vsd:publishedBy": "http://streamhub:9292/streamhub"
                }
            ],
         

In [5]:
vocals2 = '''
BASE  <http://streamhub:9292/streamhub>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX dcat: <http://www.w3.org/ns/dcat#>
PREFIX frmt: <http://www.w3.org/ns/formats/>
PREFIX vocals: <http://w3id.org/rsp/vocals#>
PREFIX vsd: <http://w3id.org/rsp/vocals-sd#>
REGISTER STREAM :temperaturestream
FROM SOURCE <ws://streams:9500>
WHERE {

           {this} a vocals:StreamDescriptor .

           {publisher} a vsd:PublishingService ;
                         vsd:hasFeature vsd:replaying ;
                         vsd:resultFormat frmt:JSON-LD .

           :TemperatureEndpoint a vocals:StreamEndpoint ;
                            dcat:title "A Temperature Stream Endpoint"^^xsd:string ;
                            dcat:description "Streaming endpoint to consume oven temperatures via WebSocket"^^xsd:string ;
                            dcat:format frmt:JSON-LD ;
                            dcat:accessURL {source} ;
                            vsd:publishedBy {publisher} .

           {stream} a vocals:RDFStream ;
                    dcat:title "Temperature Stream"^^xsd:string ;
                    dcat:description "Stream of oven temperatures"^^xsd:string ;
                    dcat:publisher {publisher} ;
                    dcat:landingPage <https://example.org/rw/temperatures/> ;
                    vocals:hasEndpoint :TemperatureEndpoint  .

}'''

In [6]:
streamhub.publish(vocals2)

{
    "@id": "streams/temperaturestream",
    "@type": "vocals:StreamDescriptor",
    "dcat:dataset": {
        "@id": "streams/temperaturestream",
        "@type": "vocals:RDFStream",
        "dcat:description": "Stream of oven temperatures",
        "dcat:landingPage": "https://example.org/rw/temperatures/",
        "dcat:publisher": "http://streamhub:9292/streamhub",
        "dcat:title": "Temperature Stream",
        "vocals:hasEndpoint": [
            [
                {
                    "@id": "http://streamhub:9292/streamhub/TemperatureEndpoint",
                    "@type": "http://w3id.org/rsp/vocals#StreamEndpoint",
                    "dcat:title": "A Temperature Stream Endpoint",
                    "dcat:description": "Streaming endpoint to consume oven temperatures via WebSocket",
                    "dcat:format": "http://www.w3.org/ns/formats/JSON-LD",
                    "dcat:accessURL": "ws://streams:9500",
                    "vsd:publishedBy": "http://streamhub:

In [7]:
streamhub.lists()

[http://streamhub:9292/streamhub/streams/temperaturestream,
 http://streamhub:9292/streamhub/streams/pizzastream]
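
The two descriptors published above differ only in the stream name, the source URL, the endpoint name and a few titles. A minimal sketch of how they could be generated from one template follows. `VOCALS_TEMPLATE` and `build_descriptor` are hypothetical names, not part of rsplib, and the sketch assumes the `{this}`, `{publisher}`, `{source}` and `{stream}` markers must reach the publishing service unchanged, which is why `string.Template` (with `$` placeholders) is used instead of `str.format`.

```python
from string import Template

# $-style placeholders leave the {this}, {publisher}, {source} and {stream}
# markers untouched (assumption: they are resolved later by the publisher).
VOCALS_TEMPLATE = Template('''
BASE  <http://streamhub:9292/streamhub>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX dcat: <http://www.w3.org/ns/dcat#>
PREFIX frmt: <http://www.w3.org/ns/formats/>
PREFIX vocals: <http://w3id.org/rsp/vocals#>
PREFIX vsd: <http://w3id.org/rsp/vocals-sd#>
REGISTER STREAM :$stream_name
FROM SOURCE <$source_url>
WHERE {

           {this} a vocals:StreamDescriptor .

           {publisher} a vsd:PublishingService ;
                         vsd:hasFeature vsd:replaying ;
                         vsd:resultFormat frmt:JSON-LD .

           :$endpoint a vocals:StreamEndpoint ;
                            dcat:title "$endpoint_title"^^xsd:string ;
                            dcat:description "$endpoint_description"^^xsd:string ;
                            dcat:format frmt:JSON-LD ;
                            dcat:accessURL {source} ;
                            vsd:publishedBy {publisher} .

           {stream} a vocals:RDFStream ;
                    dcat:title "$title"^^xsd:string ;
                    dcat:description "$description"^^xsd:string ;
                    dcat:publisher {publisher} ;
                    dcat:landingPage <$landing_page> ;
                    vocals:hasEndpoint :$endpoint  .

}''')


def build_descriptor(*, stream_name, source_url, endpoint, endpoint_title,
                     endpoint_description, title, description, landing_page):
    """Fill the template; substitute() raises KeyError if a field is missing."""
    return VOCALS_TEMPLATE.substitute(
        stream_name=stream_name,
        source_url=source_url,
        endpoint=endpoint,
        endpoint_title=endpoint_title,
        endpoint_description=endpoint_description,
        title=title,
        description=description,
        landing_page=landing_page,
    )


# Values copied from the pizza descriptor above.
pizza_query = build_descriptor(
    stream_name="pizzastream",
    source_url="ws://streams:9400",
    endpoint="PizzaEndpoint",
    endpoint_title="A Pizza Stream Endpoint",
    endpoint_description="Streaming endpoint to consume pizzas via WebSocket",
    title="Pizza Stream",
    description="Stream of pizzas",
    landing_page="https://example.org/rw/pizzas/",
)
print(pizza_query)
```

The resulting string could then be passed to `streamhub.publish(...)` exactly like `vocals1` and `vocals2`.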

#### Cleanup: run this cell last to delete the published streams before closing

In [22]:
streamhub.delete('pizzastream')
streamhub.delete('temperaturestream')
streamhub.lists()

[]


---

## IT'S QUERY TIME

### Initialize the RSPEngine

In [8]:
jasper = RSPEngine("http://jasper:8181/jasper")

Check for streams and tasks already available

In [9]:
jasper.streams()

[]

In [10]:
jasper.tasks()

[]

## Define the query

### Get the knowledge graph with Ontop from Carl's database
- Connect to the ontop container via SSH
- Run   
`./ontop materialize --format rdfxml -m input/cooking-mapping.ttl -p input/cooking.properties -o input/cooking-knowledge-graph.rdf`  
to export the graph built from Carl's relational database to an RDF/XML file (available at [http://localhost:8080/files/cooking-knowledge-graph.rdf](http://localhost:8080/files/cooking-knowledge-graph.rdf))

#### The generated RDF file `cooking-knowledge-graph.rdf`

In [11]:
# Use a context manager so the file is closed after reading
with open('./cooking-knowledge-graph.rdf', 'r') as graph_file:
    text = graph_file.read()
print(text)

<?xml version="1.0" encoding="UTF-8"?>
<rdf:RDF
	xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#">

<rdf:Description rdf:about="http://www.co-ode.org/ontologies/pizza/pizza.owl#Margherita">
	<tempAvg xmlns="http://linkeddata.stream/ontologies/cooking#" rdf:datatype="http://www.w3.org/2001/XMLSchema#decimal">280</tempAvg>
</rdf:Description>

<rdf:Description rdf:about="http://www.co-ode.org/ontologies/pizza/pizza.owl#American">
	<tempAvg xmlns="http://linkeddata.stream/ontologies/cooking#" rdf:datatype="http://www.w3.org/2001/XMLSchema#decimal">270</tempAvg>
</rdf:Description>

<rdf:Description rdf:about="http://www.co-ode.org/ontologies/pizza/pizza.owl#Margherita">
	<tempStd xmlns="http://linkeddata.stream/ontologies/cooking#" rdf:datatype="http://www.w3.org/2001/XMLSchema#decimal">10</tempStd>
</rdf:Description>

<rdf:Description rdf:about="http://www.co-ode.org/ontologies/pizza/pizza.owl#American">
	<tempStd xmlns="http://linkeddata.stream/ontologies/cooking#" rdf:datatype="ht

### Check the Pizza Ontology
Consult the [Pizza Ontology](https://protege.stanford.edu/ontologies/pizza/pizza.owl) for the class and property names used in the query.

`PREFIX` declarations are not supported in the query, so every IRI has to be written out in full.

#### Prefixes and IRIs needed for the query:
- PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
- PREFIX pizza: <http://www.co-ode.org/ontologies/pizza/pizza.owl#>
- PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
- PREFIX owl: <http://www.w3.org/2002/07/owl#>
- PREFIX sosa: <http://www.w3.org/ns/sosa/>
- (Knowledge graph) http://localhost:8080/files/cooking-knowledge-graph.rdf
- (Temperature source) http://streamhub:9292/streamhub/streams/temperaturestream
- (Pizza source) http://streamhub:9292/streamhub/streams/pizzastream
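
Since the IRIs have to be spelled out in full, one option is to write the query with prefixed names and expand them in Python before submitting it. The sketch below is only an illustration: `PREFIXES`, `expand_prefixes` and the short sample pattern are hypothetical names and data, and the regular expression is a simplification that does not handle every SPARQL token (for example, a prefixed name inside a string literal would also be expanded).

```python
import re

# Prefix table taken from the list above (only the ones used in the query).
PREFIXES = {
    "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
    "pizza": "http://www.co-ode.org/ontologies/pizza/pizza.owl#",
    "sosa": "http://www.w3.org/ns/sosa/",
}

# Either a full IRI in angle brackets (kept as-is) or a prefix:local pair.
_TOKEN = re.compile(r"<[^<>\s]*>|\b([A-Za-z][\w-]*):([A-Za-z_][\w-]*)")


def expand_prefixes(query, prefixes=PREFIXES):
    """Replace every known prefix:local name with its full <IRI>."""
    def _sub(match):
        prefix, local = match.group(1), match.group(2)
        if prefix is None or prefix not in prefixes:
            return match.group(0)  # full IRI or unknown prefix: leave alone
        return "<" + prefixes[prefix] + local + ">"
    return _TOKEN.sub(_sub, query)


short = "?p a pizza:Pizza ; pizza:hasTopping pizza:TomatoTopping ."
expanded = expand_prefixes(short)
print(expanded)
```

IRIs that are already written in angle brackets pass through unchanged, so the function can be applied to a query that mixes both styles.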

### Query parameters

In [12]:
qid = "cooked"
tbox = "https://protege.stanford.edu/ontologies/pizza/pizza.owl"
frmt = "JSON"
body = '''
SELECT ?margherita ?american ?valueAmerican ?valueMargherita
FROM NAMED WINDOW <pw1> ON <http://streamhub:9292/streamhub/streams/pizzastream> [RANGE PT10S STEP PT5S]
FROM NAMED WINDOW <pw2> ON <http://streamhub:9292/streamhub/streams/pizzastream> [RANGE PT10S STEP PT5S]
FROM NAMED WINDOW <tw1> ON <http://streamhub:9292/streamhub/streams/temperaturestream> [RANGE PT10S STEP PT5S]
FROM NAMED WINDOW <tw2> ON <http://streamhub:9292/streamhub/streams/temperaturestream> [RANGE PT10S STEP PT5S]
FROM NAMED <http://notebook:8888/files/cooking-knowledge-graph.rdf#>
WHERE {
    {
        WINDOW <pw1> {
            ?margherita a <http://www.co-ode.org/ontologies/pizza/pizza.owl#Pizza>;
                <http://www.co-ode.org/ontologies/pizza/pizza.owl#hasTopping>  <http://www.co-ode.org/ontologies/pizza/pizza.owl#TomatoTopping> ;
                <http://www.co-ode.org/ontologies/pizza/pizza.owl#hasTopping>  <http://www.co-ode.org/ontologies/pizza/pizza.owl#MozzarellaTopping> .
            FILTER NOT EXISTS {
                ?margherita <http://www.co-ode.org/ontologies/pizza/pizza.owl#hasTopping> <http://www.co-ode.org/ontologies/pizza/pizza.owl#PeperoniSausageTopping>.
            }
        }
        WINDOW <tw2> {
            ?id2 a <http://www.w3.org/ns/sosa/Observation>;
                <http://www.w3.org/ns/sosa/hasFeatureOfInterest> ?pizza2;
                <http://www.w3.org/ns/sosa/hasResult> ?res2.
            ?res2 <http://qudt.org/1.1/schema/qudt#numericValue> ?valueMargherita.
        }
        FILTER( 260 < ?valueMargherita && ?valueMargherita < 300 && ?margherita = ?pizza2)
    }
    UNION
    {
        WINDOW <pw2> {
            ?american a <http://www.co-ode.org/ontologies/pizza/pizza.owl#Pizza>;
                <http://www.co-ode.org/ontologies/pizza/pizza.owl#hasTopping>  <http://www.co-ode.org/ontologies/pizza/pizza.owl#PeperoniSausageTopping> .
        }
        WINDOW <tw1> {
            ?id a <http://www.w3.org/ns/sosa/Observation>;
                <http://www.w3.org/ns/sosa/hasFeatureOfInterest> ?pizza;
                <http://www.w3.org/ns/sosa/hasResult> ?res.
            ?res <http://qudt.org/1.1/schema/qudt#numericValue> ?valueAmerican.
        }
        FILTER( 260 < ?valueAmerican && ?valueAmerican < 280 && ?american = ?pizza)
    }
}
'''
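
The Margherita bounds in the FILTER (260 and 300) coincide with `tempAvg ± 2·tempStd` from the knowledge graph (280 ± 2·10). Whether the thresholds were actually chosen this way is an assumption. Under that assumption, the sketch below reads the two values from RDF/XML with the standard library and recomputes the bounds. `cooking_stats`, `bounds` and the factor `k=2` are hypothetical, and the sample contains only the Margherita entries because the American `tempStd` is cut off in the printed file.

```python
import xml.etree.ElementTree as ET
from decimal import Decimal

RDF_NS = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"
COOKING_NS = "http://linkeddata.stream/ontologies/cooking#"
MARGHERITA = "http://www.co-ode.org/ontologies/pizza/pizza.owl#Margherita"

# Excerpt of cooking-knowledge-graph.rdf (Margherita entries only).
SAMPLE = '''<rdf:RDF
	xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#">
<rdf:Description rdf:about="http://www.co-ode.org/ontologies/pizza/pizza.owl#Margherita">
	<tempAvg xmlns="http://linkeddata.stream/ontologies/cooking#" rdf:datatype="http://www.w3.org/2001/XMLSchema#decimal">280</tempAvg>
</rdf:Description>
<rdf:Description rdf:about="http://www.co-ode.org/ontologies/pizza/pizza.owl#Margherita">
	<tempStd xmlns="http://linkeddata.stream/ontologies/cooking#" rdf:datatype="http://www.w3.org/2001/XMLSchema#decimal">10</tempStd>
</rdf:Description>
</rdf:RDF>'''


def cooking_stats(rdf_xml):
    """Collect the cooking-namespace properties per subject IRI."""
    stats = {}
    root = ET.fromstring(rdf_xml)
    for desc in root.findall("{%s}Description" % RDF_NS):
        subject = desc.get("{%s}about" % RDF_NS)
        for prop in desc:
            if prop.tag.startswith("{%s}" % COOKING_NS):
                name = prop.tag.split("}", 1)[1]  # e.g. "tempAvg"
                stats.setdefault(subject, {})[name] = Decimal(prop.text)
    return stats


def bounds(avg, std, k=2):
    """Return (avg - k*std, avg + k*std); k=2 is an assumed factor."""
    return avg - k * std, avg + k * std


stats = cooking_stats(SAMPLE)
low, high = bounds(stats[MARGHERITA]["tempAvg"], stats[MARGHERITA]["tempStd"])
print(low, high)
```

Computing the bounds this way would keep the query in step with the database if Carl's values change, instead of hard-coding the numbers.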

Create the query with jasper


In [13]:
jasper.create(qid, body, tbox, frmt)

http://jasper:8181/jasper/queries/cooked

Check if the "cooked" stream has been created.

In [14]:
jasper.streams()

[http://jasper:8181/jasper/streams/cooked,
 http://streamhub:9292/streamhub/streams/temperaturestream,
 http://streamhub:9292/streamhub/streams/pizzastream]

Expose the result stream over HTTP to check the results

In [15]:
r = jasper.expose(qid, 'HTTP')  # optional: retention=5

In [16]:
endpoint = r.endpoints()[0]
endpoint

GET http://jasper:8182/jasper/streams/cooked/observers/121084795

Check the response

In [20]:
resp = endpoint.call()
resp

[
    {
        "head": {
            "vars": [
                "margherita",
                "american",
                "valueAmerican",
                "valueMargherita"
            ]
        },
        "results": {
            "bindings": [
                {
                    "margherita": {
                        "type": "uri",
                        "value": "http://jasper:8181/36719a59-2eef-45a1-a8d1-cc397a590458"
                    },
                    "valueMargherita": {
                        "type": "literal",
                        "datatype": "http://www.w3.org/2001/XMLSchema#decimal",
                        "value": "282.3168296186461"
                    }
                },
                {
                    "margherita": {
                        "type": "uri",
                        "value": "http://jasper:8181/36719a59-2eef-45a1-a8d1-cc397a590458"
                    },
                    "valueMargherita": {
                        "type": "literal",
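
Each element of the response follows the SPARQL JSON results layout (`head.vars`, `results.bindings`), and because of the UNION a binding carries either the Margherita pair of variables or the American one. A minimal sketch for flattening it into rows is shown below. It assumes the response is available as plain Python lists and dicts (for example via `json.loads` on the response text); `cooked_pizzas` is a hypothetical helper, and the sample holds only the first binding shown above.

```python
# First binding of the response above, as parsed JSON.
sample_response = [
    {
        "head": {
            "vars": ["margherita", "american", "valueAmerican", "valueMargherita"]
        },
        "results": {
            "bindings": [
                {
                    "margherita": {
                        "type": "uri",
                        "value": "http://jasper:8181/36719a59-2eef-45a1-a8d1-cc397a590458",
                    },
                    "valueMargherita": {
                        "type": "literal",
                        "datatype": "http://www.w3.org/2001/XMLSchema#decimal",
                        "value": "282.3168296186461",
                    },
                }
            ]
        },
    }
]

# Each UNION branch binds one (pizza variable, temperature variable) pair.
VARIABLE_PAIRS = (
    ("margherita", "valueMargherita"),
    ("american", "valueAmerican"),
)


def cooked_pizzas(response):
    """Return (kind, pizza IRI, temperature) rows from SPARQL JSON results."""
    rows = []
    for result in response:
        for binding in result["results"]["bindings"]:
            for pizza_var, value_var in VARIABLE_PAIRS:
                if pizza_var in binding and value_var in binding:
                    rows.append((
                        pizza_var,
                        binding[pizza_var]["value"],
                        float(binding[value_var]["value"]),
                    ))
    return rows


rows = cooked_pizzas(sample_response)
for kind, pizza, temperature in rows:
    print(kind, pizza, temperature)
```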

Delete the query at the end of the process

In [21]:
jasper.delete(qid)

{
    "@id": "queries/cooked",
    "@type": "vprov:Task",
    "prov:generated": {
        "@id": "streams/cooked"
    },
    "prov:uses": {
        "@id": "http://streamhub:9292/streamhub/streams/temperaturestream"
    },
    "@context": {
        "@base": "http://jasper:8181/jasper/",
        "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
        "vsd": "http://w3id.org/rsp/vocals-sd#",
        "vocals": "http://w3id.org/rsp/vocals#",
        "xsd": "http://www.w3.org/2001/XMLSchema#",
        "format": "http://www.w3.org/ns/formats/",
        "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
        "vprov": "http://w3id.org/rsp/vocals-prov#",
        "dcat": "http://www.w3.org/ns/dcat#",
        "prov": "http://www.w3.org/ns/prov#"
    }
}