OGM-1285 Enable grouping on Infinispan remote
DavideD authored and gsmet committed Jul 7, 2017
1 parent 3c5e490 commit 16bc018
Showing 1 changed file with 188 additions and 109 deletions.
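For context: "grouping" here means the dialect no longer extends BaseGridDialect and writes each tuple or association change to the remote cache as it arrives; it now extends AbstractGroupingByEntityDialect, which collects the pending operations for an entity and hands them to executeGroupedChangesToEntity(), where they are merged in memory and flushed with a single write per entity. Below is a minimal, self-contained sketch of that dispatch-and-flush pattern; every type in it (GroupedChanges, UpdateColumn, RemoveColumn, the Map-based cache) is a simplified stand-in invented for illustration, not the real OGM SPI.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupingSketch {

	// One pending change against a single entity (stand-in for the batch SPI Operation types).
	interface Operation { }

	static final class UpdateColumn implements Operation {
		final String column;
		final Object value;
		UpdateColumn(String column, Object value) { this.column = column; this.value = value; }
	}

	static final class RemoveColumn implements Operation {
		final String column;
		RemoveColumn(String column) { this.column = column; }
	}

	// Collects every operation targeting the same entity key, loosely mirroring what
	// AbstractGroupingByEntityDialect hands to executeGroupedChangesToEntity().
	static final class GroupedChanges {
		final String entityKey;
		final List<Operation> operations = new ArrayList<>();
		GroupedChanges(String entityKey) { this.entityKey = entityKey; }
	}

	static void executeGroupedChanges(GroupedChanges group, Map<String, Map<String, Object>> cache) {
		// Build the owning entity in memory first, without touching the "datastore".
		Map<String, Object> owningEntity = new HashMap<>( cache.getOrDefault( group.entityKey, new HashMap<>() ) );
		for ( Operation op : group.operations ) {
			if ( op instanceof UpdateColumn ) {
				UpdateColumn u = (UpdateColumn) op;
				owningEntity.put( u.column, u.value );
			}
			else if ( op instanceof RemoveColumn ) {
				owningEntity.remove( ( (RemoveColumn) op ).column );
			}
			else {
				throw new IllegalStateException( op.getClass().getSimpleName() + " not supported here" );
			}
		}
		// One write per entity instead of one write per queued operation.
		cache.put( group.entityKey, owningEntity );
	}

	public static void main(String[] args) {
		Map<String, Map<String, Object>> cache = new HashMap<>();
		GroupedChanges group = new GroupedChanges( "Author:1" );
		group.operations.add( new UpdateColumn( "name", "Davide" ) );
		group.operations.add( new UpdateColumn( "email", "d@example.org" ) );
		group.operations.add( new RemoveColumn( "email" ) );
		executeGroupedChanges( group, cache );
		System.out.println( cache ); // prints {Author:1={name=Davide}}
	}
}

In the actual diff below, the equivalent merge happens in OwningEntity.applyOperations() and flushOperations(), which also choose between putIfAbsent (INSERT, to detect duplicate keys) and put (UPDATE) when writing to the remote cache.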
@@ -26,20 +26,27 @@
import org.hibernate.ogm.datastore.infinispanremote.logging.impl.Log;
import org.hibernate.ogm.datastore.infinispanremote.logging.impl.LoggerFactory;
import org.hibernate.ogm.datastore.map.impl.MapAssociationSnapshot;
import org.hibernate.ogm.datastore.map.impl.MapHelpers;
import org.hibernate.ogm.datastore.map.impl.MapTupleSnapshot;
import org.hibernate.ogm.dialect.batch.spi.GroupedChangesToEntityOperation;
import org.hibernate.ogm.dialect.batch.spi.InsertOrUpdateAssociationOperation;
import org.hibernate.ogm.dialect.batch.spi.InsertOrUpdateTupleOperation;
import org.hibernate.ogm.dialect.batch.spi.Operation;
import org.hibernate.ogm.dialect.batch.spi.RemoveAssociationOperation;
import org.hibernate.ogm.dialect.impl.AbstractGroupingByEntityDialect;
import org.hibernate.ogm.dialect.multiget.spi.MultigetGridDialect;
import org.hibernate.ogm.dialect.query.spi.ClosableIterator;
import org.hibernate.ogm.dialect.spi.AssociationContext;
import org.hibernate.ogm.dialect.spi.AssociationTypeContext;
import org.hibernate.ogm.dialect.spi.BaseGridDialect;
import org.hibernate.ogm.dialect.spi.DuplicateInsertPreventionStrategy;
import org.hibernate.ogm.dialect.spi.ModelConsumer;
import org.hibernate.ogm.dialect.spi.NextValueRequest;
import org.hibernate.ogm.dialect.spi.OperationContext;
import org.hibernate.ogm.dialect.spi.TransactionContext;
import org.hibernate.ogm.dialect.spi.TupleAlreadyExistsException;
import org.hibernate.ogm.dialect.spi.TupleContext;
import org.hibernate.ogm.dialect.spi.TuplesSupplier;
import org.hibernate.ogm.dialect.spi.TupleTypeContext;
import org.hibernate.ogm.dialect.spi.TuplesSupplier;
import org.hibernate.ogm.entityentry.impl.TuplePointer;
import org.hibernate.ogm.model.key.spi.AssociationKey;
import org.hibernate.ogm.model.key.spi.AssociationKeyMetadata;
@@ -82,7 +89,7 @@
*
* @author Sanne Grinovero
*/
public class InfinispanRemoteDialect<EK,AK,ISK> extends BaseGridDialect implements MultigetGridDialect {
public class InfinispanRemoteDialect<EK,AK,ISK> extends AbstractGroupingByEntityDialect implements MultigetGridDialect {

private static final Log log = LoggerFactory.getLogger();

@@ -94,7 +101,11 @@ public InfinispanRemoteDialect(InfinispanRemoteDatastoreProvider provider) {

@Override
public Tuple getTuple(EntityKey key, OperationContext operationContext) {
final String cacheName = key.getTable();
return getTuple( provider, key );
}

private static Tuple getTuple(InfinispanRemoteDatastoreProvider provider, EntityKey key) {
final String cacheName = cacheName( key );
ProtoStreamMappingAdapter mapper = provider.getDataMapperForCache( cacheName );
ProtostreamId idBuffer = mapper.createIdPayload( key.getColumnNames(), key.getColumnValues() );
VersionedValue<ProtostreamPayload> v = mapper.withinCacheEncodingContext( c -> c.getVersioned( idBuffer ) );
@@ -117,42 +128,190 @@ public Tuple createTuple(EntityKey key, OperationContext operationContext) {
}

@Override
public void insertOrUpdateTuple(EntityKey key, TuplePointer tuplePointer, TupleContext tupleContext) {
VersionedTuple versionedTuple = (VersionedTuple) tuplePointer.getTuple();
final String cacheName = key.getTable();
log.debugf( "insertOrUpdateTuple for key '%s' on cache '%s'", key, cacheName );
ProtoStreamMappingAdapter mapper = provider.getDataMapperForCache( cacheName );
ProtostreamPayload valuePayload = mapper.createValuePayload( versionedTuple );
ProtostreamId idBuffer = mapper.createIdPayload( key.getColumnNames(), key.getColumnValues() );
boolean optimisticLockFailed;
if ( versionedTuple.getSnapshotType() == SnapshotType.INSERT ) {
optimisticLockFailed = null != mapper.withinCacheEncodingContext( c -> c.putIfAbsent( idBuffer, valuePayload ) );
if ( optimisticLockFailed ) {
throw new TupleAlreadyExistsException( key );
protected void executeGroupedChangesToEntity(GroupedChangesToEntityOperation groupedOperation) {
final EntityKey entityKey = groupedOperation.getEntityKey();
final String cacheName = cacheName( entityKey );
final OwningEntity owningEntity = new OwningEntity( provider, entityKey );

for ( Operation operation : groupedOperation.getOperations() ) {
if ( operation instanceof InsertOrUpdateTupleOperation ) {
InsertOrUpdateTupleOperation insertOrUpdateTupleOperation = (InsertOrUpdateTupleOperation) operation;
Tuple tuple = insertOrUpdateTupleOperation.getTuplePointer().getTuple();
owningEntity.applyOperations( tuple );
}
else if ( operation instanceof InsertOrUpdateAssociationOperation ) {
insertOrUpdateAssociation( (InsertOrUpdateAssociationOperation) operation );
}
else if ( operation instanceof RemoveAssociationOperation ) {
log.debugf( "removeAssociation for key '%s' on cache '%s'", entityKey, cacheName );
RemoveAssociationOperation removeAssociationOperation = (RemoveAssociationOperation) operation;
owningEntity.removeAssociation( removeAssociationOperation );
}
else {
throw new IllegalStateException( operation.getClass().getSimpleName() + " not supported here" );
}
}
else {

owningEntity.flushOperations();
}

/**
* Applies the grouped operations to the selected entity.
*/
private static class OwningEntity {

private final InfinispanRemoteDatastoreProvider provider;

// Keep track of the associations to remove that are not contained in the entity
private final List<AssociationKey> associationsToRemove = new ArrayList<>();

private final EntityKey ownerEntityKey;

// A representation of the entity that we want to insert or update
private Map<String, Object> owningEntity;

// Whether the entity already exists in the datastore or not
private SnapshotType operationType = SnapshotType.UPDATE;

public OwningEntity(InfinispanRemoteDatastoreProvider provider, EntityKey entityKey) {
this.provider = provider;
this.ownerEntityKey = entityKey;
}

public void flushOperations() {
if ( !associationsToRemove.isEmpty() ) {
for ( AssociationKey key : associationsToRemove ) {
removeAssociationFromBridgeTable( provider, key );
}
}

if ( owningEntity != null ) {
flushEntity();
}
}

private void flushEntity() {
Tuple versionedTuple = new Tuple( new MapTupleSnapshot( owningEntity ), operationType );
ProtoStreamMappingAdapter mapper = provider.getDataMapperForCache( cacheName( ownerEntityKey ) );
ProtostreamId idBuffer = mapper.createIdPayload( ownerEntityKey.getColumnNames(), ownerEntityKey.getColumnValues() );
ProtostreamPayload valuePayload = mapper.createValuePayload( versionedTuple );

if ( operationType == SnapshotType.INSERT ) {
insertEntity( mapper, idBuffer, valuePayload );
}
else {
updateEntity( mapper, idBuffer, valuePayload );
}
}

private void updateEntity(ProtoStreamMappingAdapter mapper, ProtostreamId idBuffer, ProtostreamPayload valuePayload) {
mapper.withinCacheEncodingContext( c -> c.put( idBuffer, valuePayload ) );
}
versionedTuple.setSnapshotType( SnapshotType.UPDATE );

private void insertEntity(ProtoStreamMappingAdapter mapper, ProtostreamId idBuffer, ProtostreamPayload valuePayload) {
boolean optimisticLockError;
ProtostreamPayload result = mapper.withinCacheEncodingContext( c -> c.putIfAbsent( idBuffer, valuePayload ) );
optimisticLockError = null != result;
if ( optimisticLockError ) {
throw new TupleAlreadyExistsException( ownerEntityKey );
}
}

public void removeAssociation(RemoveAssociationOperation removeAssociationOperation) {
AssociationKey associationKey = removeAssociationOperation.getAssociationKey();
AssociationContext associationContext = removeAssociationOperation.getContext();
// N.B. 'key' might match multiple entries
if ( associationStoredWithinEntityEntry( associationKey, associationContext ) ) {
// The entity contains the association
if ( owningEntity == null ) {
TuplePointer entityTuplePointer = getEmbeddingEntityTuplePointer( provider, associationKey, associationContext );
applyOperations( entityTuplePointer.getTuple() );
}
}
else {
// The association is mapped with a bridge "table"
associationsToRemove.add( associationKey );
}
}

/**
* Applies the operations in the tuple to the entity.
* <p>
* It does not touch the datastore.
*/
public void applyOperations(Tuple tuple) {
if ( owningEntity == null ) {
owningEntity = getEntityFromTuple( owningEntity, tuple );
}
if ( tuple.getSnapshotType() == SnapshotType.INSERT ) {
operationType = SnapshotType.INSERT;
}
MapHelpers.applyTupleOpsOnMap( tuple, owningEntity );
}

private Map<String, Object> getEntityFromTuple(Map<String, Object> owningEntity, Tuple tuple) {
if ( tuple != null ) {
if ( owningEntity == null ) {
owningEntity = new HashMap<>();
}
for ( String column : tuple.getColumnNames() ) {
owningEntity.put( column, tuple.get( column ) );
}
}
return owningEntity;
}
}

private void insertOrUpdateAssociation(InsertOrUpdateAssociationOperation insertOrUpdateAssociationOperation) {
AssociationKey associationKey = insertOrUpdateAssociationOperation.getAssociationKey();
org.hibernate.ogm.model.spi.Association association = insertOrUpdateAssociationOperation.getAssociation();
AssociationContext associationContext = insertOrUpdateAssociationOperation.getContext();

if ( !associationStoredWithinEntityEntry( associationKey, associationContext ) ) {
insertOrUpdateAssociationMappedAsDedicatedEntries( associationKey, association, associationContext );
}
}

private static TuplePointer getEmbeddingEntityTuplePointer(InfinispanRemoteDatastoreProvider provider, AssociationKey key, AssociationContext associationContext) {
TuplePointer tuplePointer = associationContext.getEntityTuplePointer();

if ( tuplePointer.getTuple() == null ) {
tuplePointer.setTuple( getTuple( provider, key.getEntityKey() ) );
}

return tuplePointer;
}

@Override
public void removeTuple(EntityKey key, TupleContext tupleContext) {
final String cacheName = key.getTable();
final String cacheName = cacheName( key );
log.debugf( "removeTuple for key '%s' on cache '%s'", key, cacheName );
ProtoStreamMappingAdapter mapper = provider.getDataMapperForCache( cacheName );
ProtostreamId idBuffer = mapper.createIdPayload( key.getColumnNames(), key.getColumnValues() );
mapper.withinCacheEncodingContext( c -> c.remove( idBuffer ) );
}

private static String cacheName(EntityKey key) {
final String cacheName = key.getTable();
return cacheName;
}

private static String cacheName(AssociationKey key) {
return key.getTable();
}

@Override
public Association getAssociation(AssociationKey key, AssociationContext associationContext) {
Map<RowKey,Map<String, Object>> results = loadRowKeysByQuery( key );
Map<RowKey, Map<String, Object>> results = loadRowKeysByQuery( provider, key );
if ( results.isEmpty() ) {
// For consistency with other dialects,
// it makes it easier to test which operations the dialect executes
return null;
}
return new Association( new MapAssociationSnapshot( results ) );
}

private Map<RowKey, Map<String, Object>> loadRowKeysByQuery(AssociationKey key) {
private static Map<RowKey, Map<String, Object>> loadRowKeysByQuery(InfinispanRemoteDatastoreProvider provider, AssociationKey key) {
final String cacheName = key.getTable();
ProtostreamAssociationMappingAdapter mapper = provider.getCollectionsDataMapper( cacheName );
return mapper.withinCacheEncodingContext( c -> {
@@ -192,18 +351,8 @@ public Association createAssociation(AssociationKey key, AssociationContext asso
return new Association( new MapAssociationSnapshot( associationMap ) );
}

@Override
public void insertOrUpdateAssociation(AssociationKey key, Association association, AssociationContext associationContext) {
if ( associationStoredWithinEntityEntry( key, associationContext ) ) {
insertOrUpdateAssociationEmbeddedInEntity( key, association, associationContext );
}
else {
insertOrUpdateAssociationMappedAsDedicatedEntries( key, association, associationContext );
}
}

private void insertOrUpdateAssociationMappedAsDedicatedEntries(AssociationKey key, Association association, AssociationContext associationContext) {
final String cacheName = key.getTable();
final String cacheName = cacheName( key );
final ProtoStreamMappingAdapter mapper = provider.getDataMapperForCache( cacheName );
log.debugf( "insertOrUpdateAssociation for key '%s' on cache '%s', mapped as dedicated entries in ad-hoc table", key, cacheName );
final List<AssociationOperation> operations = association.getOperations();
@@ -223,90 +372,20 @@ private void insertOrUpdateAssociationMappedAsDedicatedEntries(AssociationKey ke
throw new AssertionFailure( "Request for CLEAR operation on an association mapped to dedicated entries. Makes no sense?" );
}
}
}

private void insertOrUpdateAssociationEmbeddedInEntity(AssociationKey key, Association association, AssociationContext associationContext) {
final String cacheName = key.getTable();
final ProtoStreamMappingAdapter mapper = provider.getDataMapperForCache( cacheName );
log.debugf( "insertOrUpdateAssociation for key '%s' on cache '%s', mapped as in-entity foreign keys", key, cacheName );
final List<AssociationOperation> operations = association.getOperations();
for ( AssociationOperation ao : operations ) {
AssociationOperationType type = ao.getType();
RowKey rowKey = ao.getKey();
Tuple sourceTuple = ao.getValue();
ProtostreamId idBuffer = mapper.createIdPayload( rowKey.getColumnNames(), rowKey.getColumnValues() );
Tuple targetTuple;
ProtostreamPayload existingPayload = mapper.withinCacheEncodingContext( c -> c.get( idBuffer ) );
if ( existingPayload == null ) {
targetTuple = new Tuple();
targetTuple.setSnapshotType( SnapshotType.INSERT );
}
else {
targetTuple = existingPayload.toTuple( SnapshotType.UPDATE );
}
switch ( type ) {
case PUT:
for ( String columnName : rowKey.getColumnNames() ) {
targetTuple.put( columnName, sourceTuple.get( columnName ) );
}
ProtostreamPayload valuePayloadForPut = mapper.createValuePayload( targetTuple );
mapper.withinCacheEncodingContext( c -> c.put( idBuffer, valuePayloadForPut ) );
break;
case REMOVE:
for ( String columnName : key.getColumnNames() ) {
targetTuple.remove( columnName );
}
ProtostreamPayload valuePayloadForRemove = mapper.createValuePayload( targetTuple );
mapper.withinCacheEncodingContext( c -> c.put( idBuffer, valuePayloadForRemove ) );
break;
case CLEAR:
throw new AssertionFailure( "Request for CLEAR operation on an association mapped as foreign key embedded an an entity. Makes no sense?" );
}
}
}

@Override
public void removeAssociation(AssociationKey key, AssociationContext associationContext) {
// N.B. 'key' might match multiple entries
final String cacheName = key.getTable();
log.debugf( "removeAssociation for key '%s' on cache '%s'", key, cacheName );
final ProtoStreamMappingAdapter mapper = provider.getDataMapperForCache( cacheName );
if ( associationStoredWithinEntityEntry( key, associationContext ) ) {
removeAssociationFromEntity( mapper, key );
}
else {
removeAssociationFromBridgeTable( mapper, key );
}
}

private void removeAssociationFromBridgeTable(ProtoStreamMappingAdapter mapper, AssociationKey key) {
Map<RowKey, Map<String, Object>> rowsMap = loadRowKeysByQuery( key );
for ( RowKey rowKey : rowsMap.keySet() ) {
String[] columnNames = rowKey.getColumnNames();
Object[] columnValues = rowKey.getColumnValues();
ProtostreamId idBuffer = mapper.createIdPayload( columnNames, columnValues );
mapper.withinCacheEncodingContext( c -> c.remove( idBuffer ) ) ;
}
// the snapshot has been updated so we have to clear the various operations added to the Association
association.reset();
}

private void removeAssociationFromEntity(ProtoStreamMappingAdapter mapper, AssociationKey key) {
Map<RowKey, Map<String, Object>> rowsMap = loadRowKeysByQuery( key );
private static void removeAssociationFromBridgeTable(InfinispanRemoteDatastoreProvider provider, AssociationKey key) {
final String bridgeTable = cacheName( key );
final ProtoStreamMappingAdapter mapper = provider.getDataMapperForCache( bridgeTable );
Map<RowKey, Map<String, Object>> rowsMap = loadRowKeysByQuery( provider, key );
for ( RowKey rowKey : rowsMap.keySet() ) {
String[] columnNames = rowKey.getColumnNames();
Object[] columnValues = rowKey.getColumnValues();
for ( String keyColumn : key.getColumnNames() ) {
int indexOf = org.hibernate.ogm.util.impl.ArrayHelper.indexOf( columnNames, keyColumn );
if ( indexOf != -1 ) {
columnValues[indexOf] = null;
}
}
Tuple updatedPayload = new Tuple();
for ( int i = 0; i < columnNames.length; i++ ) {
updatedPayload.put( columnNames[i], columnValues[i] );
}
ProtostreamId idBuffer = mapper.createIdPayload( columnNames, columnValues );
ProtostreamPayload valuePayload = mapper.createValuePayload( updatedPayload );
mapper.withinCacheEncodingContext( c -> c.put( idBuffer, valuePayload ) );
mapper.withinCacheEncodingContext( c -> c.remove( idBuffer ) );
}
}
