Skip to content

Commit

Permalink
OGM-1111 Implements foreach method in Neo4j remote dialects
Browse files Browse the repository at this point in the history
  • Loading branch information
DavideD committed Dec 11, 2016
1 parent ec2cb8e commit c6c4299
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 19 deletions.
Expand Up @@ -31,23 +31,26 @@
import org.hibernate.ogm.datastore.neo4j.remote.bolt.dialect.impl.BoltNeo4jTupleSnapshot;
import org.hibernate.ogm.datastore.neo4j.remote.bolt.dialect.impl.BoltNeo4jTypeConverter;
import org.hibernate.ogm.datastore.neo4j.remote.bolt.dialect.impl.NodeWithEmbeddedNodes;
import org.hibernate.ogm.datastore.neo4j.remote.bolt.impl.BoltNeo4jClient;
import org.hibernate.ogm.datastore.neo4j.remote.bolt.impl.BoltNeo4jDatastoreProvider;
import org.hibernate.ogm.datastore.neo4j.remote.common.dialect.impl.RemoteNeo4jAssociationPropertiesRow;
import org.hibernate.ogm.datastore.neo4j.remote.common.dialect.impl.RemoteNeo4jAssociationSnapshot;
import org.hibernate.ogm.datastore.neo4j.remote.common.util.impl.RemoteNeo4jHelper;
import org.hibernate.ogm.datastore.spi.DatastoreProvider;
import org.hibernate.ogm.dialect.query.spi.BackendQuery;
import org.hibernate.ogm.dialect.query.spi.ClosableIterator;
import org.hibernate.ogm.dialect.query.spi.QueryParameters;
import org.hibernate.ogm.dialect.spi.AssociationContext;
import org.hibernate.ogm.dialect.spi.ModelConsumer;
import org.hibernate.ogm.dialect.spi.ModelConsumerWithSupplier;
import org.hibernate.ogm.dialect.spi.NextValueRequest;
import org.hibernate.ogm.dialect.spi.OperationContext;
import org.hibernate.ogm.dialect.spi.TransactionContext;
import org.hibernate.ogm.dialect.spi.TupleAlreadyExistsException;
import org.hibernate.ogm.dialect.spi.TupleContext;
import org.hibernate.ogm.dialect.spi.TupleSupplier;
import org.hibernate.ogm.dialect.spi.TupleTypeContext;
import org.hibernate.ogm.entityentry.impl.TuplePointer;
import org.hibernate.ogm.exception.NotSupportedException;
import org.hibernate.ogm.model.key.spi.AssociatedEntityKeyMetadata;
import org.hibernate.ogm.model.key.spi.AssociationKey;
import org.hibernate.ogm.model.key.spi.AssociationKeyMetadata;
Expand All @@ -65,6 +68,7 @@
import org.hibernate.persister.collection.CollectionPersister;
import org.hibernate.persister.entity.EntityPersister;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.Transaction;
Expand Down Expand Up @@ -599,6 +603,44 @@ public Number nextValue(NextValueRequest request) {

@Override
public void forEachTuple(ModelConsumer consumer, TupleTypeContext tupleTypeContext, EntityKeyMetadata entityKeyMetadata) {
throw new NotSupportedException( "OGM-1111", "This is not supported yet for Neo4j remote" );
DatastoreProvider datastoreProvider = getServiceRegistry().getService( DatastoreProvider.class );
BoltNeo4jDatastoreProvider neo4jProvider = (BoltNeo4jDatastoreProvider) datastoreProvider;
BoltNeo4jClient client = neo4jProvider.getClient();
BoltTupleSupplier tupleSupplier = new BoltTupleSupplier( entitiesQueries.get( entityKeyMetadata ), entityKeyMetadata, tupleTypeContext, client );
( (ModelConsumerWithSupplier) consumer ).consume( tupleSupplier );
}

private static class BoltTupleSupplier implements TupleSupplier {

private final BoltNeo4jEntityQueries entityQueries;
private final EntityKeyMetadata entityKeyMetadata;
private final TupleTypeContext tupleTypeContext;
private final BoltNeo4jClient boltClient;

public BoltTupleSupplier(
BoltNeo4jEntityQueries entityQueries,
EntityKeyMetadata entityKeyMetadata,
TupleTypeContext tupleTypeContext,
BoltNeo4jClient boltClient) {
this.entityQueries = entityQueries;
this.entityKeyMetadata = entityKeyMetadata;
this.tupleTypeContext = tupleTypeContext;
this.boltClient = boltClient;
}

public ClosableIterator<Tuple> get(TransactionContext transactionContext) {
boolean shouldCloseTransaction = transactionContext == null;
Transaction tx = transaction( transactionContext );
ClosableIterator<NodeWithEmbeddedNodes> entities = entityQueries.findEntitiesWithEmbedded( tx );
return new BoltNeo4jNodesTupleIterator( tx, entityQueries, entityKeyMetadata, tupleTypeContext, entities, shouldCloseTransaction );
}

private Transaction transaction(TransactionContext transactionContext) {
if ( transactionContext == null ) {
Session session = boltClient.getDriver().session();
return session.beginTransaction();
}
return (Transaction) transactionContext.getTransactionId();
}
}
}
Expand Up @@ -48,11 +48,13 @@
import org.hibernate.ogm.dialect.query.spi.QueryParameters;
import org.hibernate.ogm.dialect.spi.AssociationContext;
import org.hibernate.ogm.dialect.spi.ModelConsumer;
import org.hibernate.ogm.dialect.spi.ModelConsumerWithSupplier;
import org.hibernate.ogm.dialect.spi.NextValueRequest;
import org.hibernate.ogm.dialect.spi.OperationContext;
import org.hibernate.ogm.dialect.spi.TransactionContext;
import org.hibernate.ogm.dialect.spi.TupleAlreadyExistsException;
import org.hibernate.ogm.dialect.spi.TupleContext;
import org.hibernate.ogm.dialect.spi.TupleSupplier;
import org.hibernate.ogm.dialect.spi.TupleTypeContext;
import org.hibernate.ogm.entityentry.impl.TuplePointer;
import org.hibernate.ogm.model.key.spi.AssociatedEntityKeyMetadata;
Expand Down Expand Up @@ -513,18 +515,31 @@ private void putOneToOneAssociation(EntityKey ownerKey, Tuple tuple, Map<String,

@Override
public void forEachTuple(ModelConsumer consumer, TupleTypeContext tupleTypeContext, EntityKeyMetadata entityKeyMetadata) {
// TODO OGM-1111 we don't have a transaction context here as we are not in a session yet.
// This is now clear thanks to the new TupleTypeContext contract.
// Long txId = transactionId( tupleContext.getTransactionContext() );
Long txId = null;
HttpNeo4jEntityQueries queries = entityQueries.get( entityKeyMetadata );
ClosableIterator<NodeWithEmbeddedNodes> queryNodes = entityQueries.get( entityKeyMetadata ).findEntitiesWithEmbedded( client, txId );
while ( queryNodes.hasNext() ) {
NodeWithEmbeddedNodes next = queryNodes.next();
Map<String, Node> associatedEntities = HttpNeo4jAssociatedNodesHelper.findAssociatedNodes( client, txId, next, entityKeyMetadata, tupleTypeContext,
queries );
Tuple tuple = new Tuple( new HttpNeo4jTupleSnapshot( next, entityKeyMetadata, associatedEntities, tupleTypeContext ), SnapshotType.UPDATE );
consumer.consume( tuple );
HttpTupleSupplier tupleSupplier = new HttpTupleSupplier( entityQueries.get( entityKeyMetadata ), entityKeyMetadata, tupleTypeContext, client );
( (ModelConsumerWithSupplier) consumer ).consume( tupleSupplier );
}

private static class HttpTupleSupplier implements TupleSupplier {

private final HttpNeo4jEntityQueries entityQueries;
private final EntityKeyMetadata entityKeyMetadata;
private final TupleTypeContext tupleTypeContext;
private final HttpNeo4jClient httpClient;

public HttpTupleSupplier(HttpNeo4jEntityQueries entityQueries,
EntityKeyMetadata entityKeyMetadata,
TupleTypeContext tupleTypeContext,
HttpNeo4jClient httpClient) {
this.entityQueries = entityQueries;
this.entityKeyMetadata = entityKeyMetadata;
this.tupleTypeContext = tupleTypeContext;
this.httpClient = httpClient;
}

public ClosableIterator<Tuple> get(TransactionContext transactionContext) {
Long txId = transactionContext == null ? null : (Long) transactionContext.getTransactionId();
ClosableIterator<NodeWithEmbeddedNodes> entities = entityQueries.findEntitiesWithEmbedded( httpClient, txId );
return new HttpNeo4jNodesTupleIterator( httpClient, txId, entityQueries, entityKeyMetadata, tupleTypeContext, entities );
}
}

Expand Down Expand Up @@ -553,7 +568,7 @@ public ClosableIterator<Tuple> executeBackendQuery(BackendQuery<String> backendQ
keys[i] = new EntityKey( entityKeyMetadata, values );
}
ClosableIterator<NodeWithEmbeddedNodes> entities = entityQueries.get( entityKeyMetadata ).findEntities( client, keys, txId );
return new HttpNeo4jNodesTupleIterator( client, txId, queries, response, entityKeyMetadata, tupleContext.getTupleTypeContext(), entities );
return new HttpNeo4jNodesTupleIterator( client, txId, queries, entityKeyMetadata, tupleContext.getTupleTypeContext(), entities );
}
else {
statement.setResultDataContents( Arrays.asList( Statement.AS_ROW ) );
Expand Down
Expand Up @@ -73,6 +73,11 @@ private void close(ClosableIterator<?> closableIterator) {
}
}

public ClosableIterator<NodeWithEmbeddedNodes> findEntitiesWithEmbedded(Transaction tx) {
StatementResult results = tx.run( getFindEntitiesQuery() );
return closableIterator( results );
}

public ClosableIterator<NodeWithEmbeddedNodes> findEntities(EntityKey[] keys, Transaction tx) {
if ( singlePropertyKey ) {
return singlePropertyIdFindEntities( keys, tx );
Expand Down
Expand Up @@ -29,18 +29,30 @@ public class BoltNeo4jNodesTupleIterator implements ClosableIterator<Tuple> {
private final TupleTypeContext tupleTypeContext;
private final ClosableIterator<NodeWithEmbeddedNodes> entities;
private final Transaction tx;
private final boolean closeTransaction;

public BoltNeo4jNodesTupleIterator(
Transaction tx,
BoltNeo4jEntityQueries entityQueries,
EntityKeyMetadata entityKeyMetadata,
TupleTypeContext tupleTypeContext,
ClosableIterator<NodeWithEmbeddedNodes> entities) {
this( tx, entityQueries, entityKeyMetadata, tupleTypeContext, entities, false );
}

public BoltNeo4jNodesTupleIterator(
Transaction tx,
BoltNeo4jEntityQueries entityQueries,
EntityKeyMetadata entityKeyMetadata,
TupleTypeContext tupleTypeContext,
ClosableIterator<NodeWithEmbeddedNodes> entities,
boolean closeTransaction) {
this.tx = tx;
this.entityQueries = entityQueries;
this.entityKeyMetadata = entityKeyMetadata;
this.tupleTypeContext = tupleTypeContext;
this.entities = entities;
this.closeTransaction = closeTransaction;
}

private Tuple createTuple(NodeWithEmbeddedNodes node) {
Expand All @@ -66,6 +78,16 @@ public void remove() {

@Override
public void close() {
entities.close();
try {
entities.close();
if ( closeTransaction ) {
tx.success();
}
}
finally {
if ( closeTransaction ) {
tx.close();
}
}
}
}
Expand Up @@ -314,7 +314,9 @@ private List<StatementResult> executeQuery(HttpNeo4jClient executionEngine, Long
}

private List<StatementResult> executeQuery(HttpNeo4jClient executionEngine, Long txId, Statements statements) {
StatementsResponse statementsResponse = executionEngine.executeQueriesInOpenTransaction( txId, statements );
StatementsResponse statementsResponse = txId == null
? executionEngine.executeQueriesInNewTransaction( statements )
: executionEngine.executeQueriesInOpenTransaction( txId, statements );
validate( statementsResponse );
List<StatementResult> results = statementsResponse.getResults();
if ( results == null || results.isEmpty() ) {
Expand Down
Expand Up @@ -10,7 +10,6 @@

import org.hibernate.ogm.datastore.neo4j.remote.http.impl.HttpNeo4jClient;
import org.hibernate.ogm.datastore.neo4j.remote.http.json.impl.Graph.Node;
import org.hibernate.ogm.datastore.neo4j.remote.http.json.impl.StatementsResponse;
import org.hibernate.ogm.dialect.query.spi.ClosableIterator;
import org.hibernate.ogm.dialect.spi.TupleTypeContext;
import org.hibernate.ogm.model.key.spi.EntityKeyMetadata;
Expand All @@ -36,7 +35,6 @@ public HttpNeo4jNodesTupleIterator(
HttpNeo4jClient client,
Long txId,
HttpNeo4jEntityQueries entityQueries,
StatementsResponse response,
EntityKeyMetadata entityKeyMetadata,
TupleTypeContext tupleTypeContext,
ClosableIterator<NodeWithEmbeddedNodes> entities) {
Expand Down

0 comments on commit c6c4299

Please sign in to comment.