In [11]:
import numpy as np
import pandas as pd

In [16]:
from kgforge.core.archetypes.store import Store
from kgforge.core.resource import Resource

from typing import Optional, Dict, Any, Union

from neo4j import GraphDatabase

- Store attributes:

    - `endpoint` bolt address
    - `bucket` nothing (or name of the graph if we work with named graphs)
    - `token` Neo4j doesn't have token authentication (login/pwd)
    - `versioned_id_template` How to manage that? we can add deprecation and versioning mechanism, but is it a good practice? Neo4j generates it's own id's, do we use them or generate ours?
    - `file_resource_mapping` don't think you can upload files
    - `model_context` we can provide, but i don't think it's used much in PGs. should we store props like 'schema:about:@id'?

- Store methods:

    - `register` given a resource how to represent it within a PG (create a node, create an edge, add a property?)
    - `retrieve` given an id do I query nodes/edges/properties or all of them.
    
Separation between nodes/edges/properties is very strong in PG, while in RDF everything is the 'same-level-citizen'

In [18]:
class DemoStore(Store):
    """An example to show how to implement a Store and to demonstrate how it is used."""

    def __init__(self, endpoint: Optional[str] = None, bucket: Optional[str] = None,
                 token: Optional[str] = None, versioned_id_template: Optional[str] = None,
                 file_resource_mapping: Optional[str] = None,
                 model_context: Optional[Context] = None) -> None:
        super().__init__(endpoint, bucket, token, versioned_id_template, file_resource_mapping,
                         model_context)


    # [C]RUD.

    def _register_one(self, resource: Resource, schema_id: str) -> None:
        data = as_json(resource, expanded=False, store_metadata=False, model_context=None,
                       metadata_context=None, context_resolver=None)
        try:
            record = self.service.create(data)
        except StoreLibrary.RecordExists:
            raise RegistrationError("resource already exists")
        else:
            resource.id = record["data"]["id"]
            resource._store_metadata = wrap_dict(record["metadata"])

    # C[R]UD.

    def retrieve(self, id: str, version: Optional[Union[int, str]],
                 cross_bucket: bool) -> Resource:
        if cross_bucket:
            not_supported(("cross_bucket", True))
        try:
            record = self.service.read(id, version)
        except StoreLibrary.RecordMissing:
            raise RetrievalError("resource not found")
        else:
            return _to_resource(record)

    # CR[U]D.

    def _update_one(self, resource: Resource) -> None:
        data = as_json(resource, expanded=False, store_metadata=False, model_context=None,
                       metadata_context=None, context_resolver=None)
        try:
            record = self.service.update(data)
        except StoreLibrary.RecordMissing:
            raise UpdatingError("resource not found")
        except StoreLibrary.RecordDeprecated:
            raise UpdatingError("resource is deprecated")
        else:
            resource._store_metadata = wrap_dict(record["metadata"])

    def _tag_one(self, resource: Resource, value: str) -> None:
        # Chosen case: tagging does not modify the resource.
        rid = resource.id
        version = resource._store_metadata.version
        try:
            self.service.tag(rid, version, value)
        except StoreLibrary.TagExists:
            raise TaggingError("resource version already tagged")
        except StoreLibrary.RecordMissing:
            raise TaggingError("resource not found")

    # CRU[D].

    def _deprecate_one(self, resource: Resource) -> None:
        rid = resource.id
        try:
            record = self.service.deprecate(rid)
        except StoreLibrary.RecordMissing:
            raise DeprecationError("resource not found")
        except StoreLibrary.RecordDeprecated:
            raise DeprecationError("resource already deprecated")
        else:
            resource._store_metadata = wrap_dict(record["metadata"])

    # Querying.

    def search(self, resolvers: Optional[List[Resolver]], *filters, **params) -> List[Resource]:

        cross_bucket = params.get("cross_bucket", None)
        if cross_bucket is not None:
            not_supported(("cross_bucket", True))
        # TODO DKE-145.
        print("<info> DemoStore does not support handling of errors with QueryingError for now.")
        # TODO DKE-145.
        print("<info> DemoStore does not support traversing lists for now.")
        if params:
            # TODO DKE-145.
            print("DemoStore does not support 'resolving' and 'lookup' parameters for now.")
        conditions = [f"x.{'.'.join(x.path)}.{x.operator}({x.value!r})" for x in filters]
        records = self.service.find(conditions)
        return [_to_resource(x) for x in records]

    # Utils.

    def _initialize_service(self, endpoint: Optional[str], bucket: Optional[str],
                            token: Optional[str], searchendpoints:Optional[Dict]):
        return StoreLibrary()


def _to_resource(record: Dict) -> Resource:
    # TODO This operation might be abstracted in core when other stores will be implemented.
    resource = from_json(record["data"], None)
    resource._store_metadata = wrap_dict(record["metadata"])
    resource._synchronized = True
    return resource



NameError: name 'Context' is not defined

In [21]:
uri = "bolt://localhost:7687"
user = "neo4j"
pwd = "admin"

In [25]:
class Neo4jService:
    """Simulate a third-party library handling interactions with the database used by the store."""

    def __init__(self, driver: Optional[GraphDatabase] = None,
                 uri: Optional[str] = None, user: Optional[str] = None,
                 pwd: Optiona[str] = None):
        
        self.driver: GraphDatabase = None
    
        if driver is not None:
            self.driver = driver
        else:
            self.driver = GraphDatabase.driver(uri=uri, auth=(user, pwd))
            
    def add_node():
        pass
    
    def add_nodes():
        pass
    
    def add_edge():
        pass
    
    def add_edges():
        pass
    
    def set_node_property():
        pass
    
    def set_edge_property():
        pass

    def get_node_properties():
        pass
    
    def get_edge_properties():
        pass
    
    def create(self, data: Dict) -> Dict:
        record = self._record(data, 1, False)
        rid = record["data"]["id"]
        if rid in self.records.keys():
            raise self.RecordExists
        self.records[rid] = record
        return record

    def read(self, rid: str, version: Optional[Union[int, str]]) -> Dict:
        try:
            if version is not None:
                if isinstance(version, str):
                    tkey = self._tag_id(rid, version)
                    version = self.tags[tkey]
                akey = self._archive_id(rid, version)
                record = self.archives[akey]
            else:
                record = self.records[rid]
        except KeyError:
            raise self.RecordMissing
        else:
            return record

    def update(self, data: Dict) -> Dict:
        rid = data.get("id", None)
        try:
            record = self.records[rid]
        except KeyError:
            raise self.RecordMissing
        else:
            metadata = record["metadata"]
            if metadata["deprecated"]:
                raise self.RecordDeprecated
            version = metadata["version"]
            key = self._archive_id(rid, version)
            self.archives[key] = record
            new_record = self._record(data, version + 1, False)
            self.records[rid] = new_record
            return new_record

    def deprecate(self, rid: str) -> Dict:
        try:
            record = self.records[rid]
        except KeyError:
            raise self.RecordMissing
        else:
            metadata = record["metadata"]
            if metadata["deprecated"]:
                raise self.RecordDeprecated
            version = metadata["version"]
            key = self._archive_id(rid, version)
            self.archives[key] = record
            data = record["data"]
            new_record = self._record(data, version + 1, True)
            self.records[rid] = new_record
            return new_record

    def tag(self, rid: str, version: int, value: str) -> None:
        if rid in self.records:
            key = self._tag_id(rid, value)
            if key in self.tags:
                raise self.TagExists
            else:
                self.tags[key] = version
        else:
            raise self.RecordMissing

    def find(self, conditions: List[str]) -> List[Dict]:
        return [r for r in self.records.values()
                if all(eval(c, {}, {"x": wrap_dict(r["data"])}) for c in conditions)]

    def _record(self, data: Dict, version: int, deprecated: bool) -> Dict:
        copy = deepcopy(data)
        if "id" not in copy:
            copy["id"] = self._new_id()
        return {
            "data": copy,
            "metadata": {
                "version": version,
                "deprecated": deprecated,
            },
        }

    @staticmethod
    def _new_id() -> str:
        return str(uuid4())

    @staticmethod
    def _archive_id(rid: str, version: int) -> str:
        return f"{rid}_version={version}"

    @staticmethod
    def _tag_id(rid: str, tag: str) -> str:
        return f"{rid}_tag={tag}"

    class RecordExists(Exception):
        pass

    class RecordMissing(Exception):
        pass

    class RecordDeprecated(Exception):
        pass

    class TagExists(Exception):
        pass

IndentationError: expected an indented block (<ipython-input-25-235039013d04>, line 38)

In [17]:
GraphDatabase

In [15]:
class Neo4jStore(Store):
    """Implementation of the Neo4j DB store."""
    
    def __init__(self, endpoint: Optional[str] = None, bucket: Optional[str] = None,
                 token: Optional[str] = None, versioned_id_template: Optional[str] = None,
                 file_resource_mapping: Optional[str] = None,
                 model_context: Optional[Context] = None, searchendpoints:Optional[Dict] = None) -> None:
    
    def _initialize_service(self,
                            endpoint: Optional[str],
                            bucket: Optional[str],
                            token: Optional[str],
                            searchendpoints:Optional[Dict] = None) -> Any:
        # POLICY Should initialize the access to the store according to its configuration.
        return Service(endpoint, self.organisation, self.project, token, self.model_context, 200,searchendpoints)

    def retrieve(self, id: str, version: Optional[Union[int, str]],
                 cross_bucket: bool) -> Resource:
        pass

    def _register_one(self, resource: Resource, schema_id: str) -> None:
        pass
    
    def _update_one(self, resource: Resource) -> None:
        # POLICY Should notify of failures with exception UpdatingError including a message.
        # POLICY Resource _store_metadata should be set using wrappers.dict.wrap_dict().
        # TODO This operation might be abstracted here when other stores will be implemented.
        pass

In [None]:

except ValueError:
    raise ValueError("malformed bucket parameter, expecting 'organization/project' like")
else:
    return Service(endpoint, self.organisation, self.project, token, self.model_context, 200,searchendpoints)
