# Querying vertically partitioned data as if it were one dataset (simplest case)
Here we explore whether we can query a vertically partitioned dataset (i.e. the datasets share the same sample ID space, but the feature space is split across them) as if it were a single dataset.
Here we make very strong assumptions:
    * Data is tabular and consists of only 1 table
        (i.e. only one type of subject, and predicate-object pairs are properties)
    * The subject name itself is a unique identifier across data sources
    * Users want to have all the data

In [1]:
cd ..

/Users/svenvanderburg/projects/vantage6-algorithms


In [71]:
from collections import defaultdict

import rdflib
import pandas as pd

## Load sample transaction data file A
Only has log:processedAt and log:processedBy information

In [72]:
filepath_a = 'data/sample-transaction-data-a.ttl'
graph_a = rdflib.Graph()
result = graph_a.parse(filepath_a, format='ttl')
# Ask for an explicit encoding so serialize() returns bytes on every rdflib
# version: since rdflib 6 it returns str when no encoding is given, and
# calling .decode() on that str would raise AttributeError.
print(graph_a.serialize(format="turtle", encoding="utf-8").decode("utf-8"))

@prefix log: <http://example.org/ont/transaction-log/> .
@prefix srv: <http://example.org/data/server/> .
@prefix txn: <http://example.org/data/transaction/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

txn:123 a log:Transaction ;
    log:processedAt "2015-10-16T10:22:23"^^xsd:dateTime ;
    log:processedBy srv:A .

txn:124 a log:Transaction ;
    log:processedAt "2015-10-16T10:22:24"^^xsd:dateTime ;
    log:processedBy srv:B .

txn:125 a log:Transaction ;
    log:processedAt "2015-10-16T10:22:24"^^xsd:dateTime ;
    log:processedBy srv:C .

txn:126 a log:Transaction ;
    log:processedAt "2015-10-16T10:22:25"^^xsd:dateTime ;
    log:processedBy srv:A .

txn:127 a log:Transaction ;
    log:processedAt "2015-10-16T10:22:25"^^xsd:dateTime ;
    log:processedBy srv:B .

txn:128 a log:Transaction ;
    log:processedAt "2015-10-16T10:22:26"^^xsd:dateTime ;
    log:processedBy srv:C .

txn:129 a log:Transaction ;
    log:processedAt "2015-10-16T10:22:28"^^xsd:dateTime ;
    log:proc

## Load sample transaction data file B
Only has log:statusCode information

In [73]:
filepath_b = 'data/sample-transaction-data-b.ttl'
graph_b = rdflib.Graph()
result = graph_b.parse(filepath_b, format='ttl')
# Ask for an explicit encoding so serialize() returns bytes on every rdflib
# version: since rdflib 6 it returns str when no encoding is given, and
# calling .decode() on that str would raise AttributeError.
print(graph_b.serialize(format="turtle", encoding="utf-8").decode("utf-8"))

@prefix log: <http://example.org/ont/transaction-log/> .
@prefix txn: <http://example.org/data/transaction/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

txn:123 a log:Transaction ;
    log:statusCode 200 .

txn:124 a log:Transaction ;
    log:statusCode 200 .

txn:125 a log:Transaction ;
    log:statusCode 200 .

txn:126 a log:Transaction ;
    log:statusCode 200 .

txn:127 a log:Transaction ;
    log:statusCode 200 .

txn:128 a log:Transaction ;
    log:statusCode 200 .

txn:129 a log:Transaction ;
    log:statusCode 500 .

txn:130 a log:Transaction ;
    log:statusCode 200 .

txn:131 a log:Transaction ;
    log:statusCode 200 .

txn:132 a log:Transaction ;
    log:statusCode 500 .

txn:133 a log:Transaction ;
    log:statusCode 200 .

txn:134 a log:Transaction ;
    log:statusCode 200 .

txn:135 a log:Transaction ;
    log:statusCode 401 .




In [106]:
class VerticalQueryClient():
    """
    Client for querying a vertically partitioned dataset.

    Every data source holds a different subset of the properties (columns)
    for the same subjects (rows). ``query`` fetches all triples from each
    source, turns each source into a table indexed by subject, and joins the
    tables side by side on that index.

    Makes some strong assumptions:
    * Data is tabular and consists of only 1 table
        (i.e. only one type of subject, and predicate-object pairs are properties)
    * The subject name itself is a unique identifier across data sources
    * Within one source each predicate has at most one object per subject
        (if there are several, only the last one returned is kept)

    NOTE: a predicate that occurs in more than one source (e.g. rdf:type)
    appears as duplicate columns in the combined table; they are not merged.
    """

    # Fetch every triple. Variables are listed in subject-predicate-object
    # order, matching the positional unpacking in ``_sparql_to_pandas``.
    _SELECT_ALL_TRIPLES = '''
        SELECT ?s ?p ?o
        WHERE {?s ?p ?o .}
    '''

    def __init__(self, graphs):
        """
        :param graphs: data sources to query; each one must offer a
            ``query(sparql)`` method whose result iterates as
            (subject, predicate, object) rows, as ``rdflib.Graph`` does.
        """
        self.graphs = graphs

    @staticmethod
    def _sparql_to_pandas(result: 'rdflib.plugins.sparql.processor.SPARQLResult'):
        """
        Convert sparql result to pandas. Group all properties (predicate-object pairs)
        for corresponding subject. Set subject as index (named 'subj').

        :param result: iterable of (subject, predicate, object) rows.
        :return: DataFrame with one row per subject and one column per predicate;
            an empty DataFrame (index still named 'subj') if ``result`` has no rows.
        """
        # The annotation above is a string so that defining this class does
        # not require ``rdflib.plugins.sparql`` to be imported already.
        subject2properties = defaultdict(dict)
        for subj, pred, obj in result:
            # Later objects for the same (subject, predicate) overwrite earlier ones.
            subject2properties[subj][pred] = obj

        if not subject2properties:
            # pd.DataFrame([]) has no 'subj' column, so set_index would raise KeyError.
            return pd.DataFrame(index=pd.Index([], name='subj'))

        records = [
            {**properties, 'subj': subj}
            for subj, properties in subject2properties.items()
        ]
        return pd.DataFrame(records).set_index('subj', drop=True)

    def query(self):
        """
        Query vertically partitioned data. Select all data from different data sources.
        Convert data to pandas DataFrame by grouping all properties (predicate-object pairs) for
        corresponding subjects even though they might come from different data sources.

        :return: the per-source tables concatenated column-wise (outer join on the
            subject index); an empty DataFrame if there are no data sources.
        """
        frames = [
            self._sparql_to_pandas(graph.query(self._SELECT_ALL_TRIPLES))
            for graph in self.graphs
        ]
        if not frames:
            # pd.concat raises ValueError when given nothing to concatenate.
            return pd.DataFrame(index=pd.Index([], name='subj'))
        return pd.concat(frames, axis=1)

In [107]:
# Join partition A (processedAt/processedBy) and partition B (statusCode)
# on the transaction subject; the final expression is the combined table.
client = VerticalQueryClient(graphs=[graph_a, graph_b])
client.query()

Unnamed: 0,http://example.org/ont/transaction-log/processedBy,http://www.w3.org/1999/02/22-rdf-syntax-ns#type,http://example.org/ont/transaction-log/processedAt,http://example.org/ont/transaction-log/statusCode,http://www.w3.org/1999/02/22-rdf-syntax-ns#type.1
http://example.org/data/transaction/123,http://example.org/data/server/A,http://example.org/ont/transaction-log/Transac...,2015-10-16T10:22:23,200,http://example.org/ont/transaction-log/Transac...
http://example.org/data/transaction/131,http://example.org/data/server/C,http://example.org/ont/transaction-log/Transac...,2015-10-16T10:22:31,200,http://example.org/ont/transaction-log/Transac...
http://example.org/data/transaction/127,http://example.org/data/server/B,http://example.org/ont/transaction-log/Transac...,2015-10-16T10:22:25,200,http://example.org/ont/transaction-log/Transac...
http://example.org/data/transaction/132,http://example.org/data/server/A,http://example.org/ont/transaction-log/Transac...,2015-10-16T10:22:32,500,http://example.org/ont/transaction-log/Transac...
http://example.org/data/transaction/128,http://example.org/data/server/C,http://example.org/ont/transaction-log/Transac...,2015-10-16T10:22:26,200,http://example.org/ont/transaction-log/Transac...
http://example.org/data/transaction/135,http://example.org/data/server/A,http://example.org/ont/transaction-log/Transac...,2015-10-16T10:22:35,401,http://example.org/ont/transaction-log/Transac...
http://example.org/data/transaction/124,http://example.org/data/server/B,http://example.org/ont/transaction-log/Transac...,2015-10-16T10:22:24,200,http://example.org/ont/transaction-log/Transac...
http://example.org/data/transaction/130,http://example.org/data/server/B,http://example.org/ont/transaction-log/Transac...,2015-10-16T10:22:31,200,http://example.org/ont/transaction-log/Transac...
http://example.org/data/transaction/134,http://example.org/data/server/C,http://example.org/ont/transaction-log/Transac...,2015-10-16T10:22:33,200,http://example.org/ont/transaction-log/Transac...
http://example.org/data/transaction/129,http://example.org/data/server/A,http://example.org/ont/transaction-log/Transac...,2015-10-16T10:22:28,500,http://example.org/ont/transaction-log/Transac...
