From 866d11bf3ecf909682f9fbc91195323f26a1e2ad Mon Sep 17 00:00:00 2001 From: Michael Russo Date: Sun, 8 May 2016 15:45:28 +0800 Subject: [PATCH] Unique value serialization completely converted to use CQL. Complete Astyanax removal still needs to be completed. --- .../EntityCollectionManagerFactoryImpl.java | 9 +- .../impl/EntityCollectionManagerImpl.java | 21 +- .../mvcc/stage/delete/UniqueCleanup.java | 21 +- .../mvcc/stage/write/RollbackAction.java | 44 +- .../mvcc/stage/write/WriteUniqueVerify.java | 15 +- .../UniqueValueSerializationStrategy.java | 19 +- .../impl/AllUniqueFieldsIterator.java | 29 ++ .../serialization/impl/EntityVersion.java | 12 +- .../UniqueValueSerializationStrategyImpl.java | 384 ++++++------------ ...ueValueSerializationStrategyProxyImpl.java | 12 +- ...niqueValueSerializationStrategyV1Impl.java | 125 +++--- ...niqueValueSerializationStrategyV2Impl.java | 109 +++-- .../impl/UniqueValueSetImpl.java | 6 +- .../write/WriteOptimisticVerifyTest.java | 17 +- ...queValueSerializationStrategyImplTest.java | 5 +- 15 files changed, 383 insertions(+), 445 deletions(-) create mode 100644 stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/AllUniqueFieldsIterator.java diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java index a52ee9c00c..71e56f5eaa 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java @@ -23,6 +23,7 @@ import java.util.concurrent.ExecutionException; +import com.datastax.driver.core.Session; import 
org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.cache.EntityCacheFig; @@ -75,6 +76,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy; private final Keyspace keyspace; + private final Session session; private final MetricsFactory metricsFactory; private final RxTaskScheduler rxTaskScheduler; @@ -89,7 +91,7 @@ public EntityCollectionManager load( ApplicationScope scope ) { entitySerializationStrategy, uniqueValueSerializationStrategy, mvccLogEntrySerializationStrategy, keyspace, metricsFactory, serializationFig, - rxTaskScheduler, scope ); + rxTaskScheduler, scope, session ); return target; } @@ -107,7 +109,9 @@ public EntityCollectionManagerFactoryImpl( final WriteStart writeStart, final Wr final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, final Keyspace keyspace, final EntityCacheFig entityCacheFig, - final MetricsFactory metricsFactory, @CollectionExecutorScheduler final RxTaskScheduler rxTaskScheduler ) { + final MetricsFactory metricsFactory, + @CollectionExecutorScheduler final RxTaskScheduler rxTaskScheduler, + final Session session ) { this.writeStart = writeStart; this.writeVerifyUnique = writeVerifyUnique; @@ -125,6 +129,7 @@ public EntityCollectionManagerFactoryImpl( final WriteStart writeStart, final Wr this.keyspace = keyspace; this.metricsFactory = metricsFactory; this.rxTaskScheduler = rxTaskScheduler; + this.session = session; } @Override public EntityCollectionManager createCollectionManager(ApplicationScope applicationScope) { diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java index e71e6bb730..6d42fa20e3 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.UUID; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.Session; import com.netflix.astyanax.model.ConsistencyLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,7 +66,6 @@ import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.field.Field; -import org.apache.usergrid.persistence.model.util.EntityUtils; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import com.codahale.metrics.Timer; @@ -72,7 +73,6 @@ import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import com.netflix.astyanax.Keyspace; -import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.connectionpool.OperationResult; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; import com.netflix.astyanax.model.ColumnFamily; @@ -81,7 +81,6 @@ import rx.Observable; import rx.Subscriber; -import rx.functions.Action0; /** @@ -114,6 +113,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { private final Keyspace keyspace; + private final Session session; private final Timer writeTimer; private final Timer deleteTimer; private final Timer fieldIdTimer; @@ -136,7 +136,8 @@ public EntityCollectionManagerImpl( final WriteStart writeStart, final 
WriteUniq final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, final Keyspace keyspace, final MetricsFactory metricsFactory, final SerializationFig serializationFig, final RxTaskScheduler rxTaskScheduler, - @Assisted final ApplicationScope applicationScope ) { + @Assisted final ApplicationScope applicationScope, + final Session session) { this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.entitySerializationStrategy = entitySerializationStrategy; this.uniqueCleanup = uniqueCleanup; @@ -157,6 +158,7 @@ public EntityCollectionManagerImpl( final WriteStart writeStart, final WriteUniq this.markCommit = markCommit; this.keyspace = keyspace; + this.session = session; this.applicationScope = applicationScope; @@ -347,8 +349,7 @@ public Observable
getEntitiesFromFields( final String type, final Coll //Load a entity for each entityId we retrieved. final EntitySet entitySet = entitySerializationStrategy.load( applicationScope, entityIds, startTime ); - //now loop through and ensure the entities are there. - final MutationBatch deleteBatch = keyspace.prepareMutationBatch(); + final BatchStatement uniqueDeleteBatch = new BatchStatement(); final MutableFieldSet response = new MutableFieldSet( fields1.size() ); @@ -357,9 +358,8 @@ public Observable
getEntitiesFromFields( final String type, final Coll //bad unique value, delete this, it's inconsistent if ( entity == null || !entity.getEntity().isPresent() ) { - final MutationBatch valueDelete = - uniqueValueSerializationStrategy.delete( applicationScope, expectedUnique ); - deleteBatch.mergeShallow( valueDelete ); + uniqueDeleteBatch.add( + uniqueValueSerializationStrategy.deleteCQL( applicationScope, expectedUnique )); continue; } @@ -371,8 +371,7 @@ public Observable
getEntitiesFromFields( final String type, final Coll } //TODO: explore making this an Async process - //We'll repair it again if we have to - deleteBatch.execute(); + session.execute(uniqueDeleteBatch); return response; } diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java index 8aa5cfc9ca..9f2b9942ea 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.UUID; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +65,7 @@ public class UniqueCleanup private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy; private final Keyspace keyspace; + private final Session session; private final SerializationFig serializationFig; @@ -70,12 +73,14 @@ public class UniqueCleanup @Inject public UniqueCleanup( final SerializationFig serializationFig, final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, - final Keyspace keyspace, final MetricsFactory metricsFactory ) { + final Keyspace keyspace, final MetricsFactory metricsFactory, + final Session session ) { this.serializationFig = serializationFig; this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy; this.keyspace = keyspace; this.uniqueCleanupTimer = metricsFactory.getTimer( UniqueCleanup.class, "uniquecleanup.base" ); + this.session = session; } @@ -127,22 +132,20 @@ protected Iterator getIterator() { //roll them up .doOnNext( uniqueValues -> { - final MutationBatch 
uniqueCleanupBatch = keyspace.prepareMutationBatch(); + + final BatchStatement uniqueCleanupBatch = new BatchStatement(); for ( UniqueValue value : uniqueValues ) { logger .debug( "Deleting value:{} from application scope: {} ", value, applicationScope ); uniqueCleanupBatch - .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) ); + .add( uniqueValueSerializationStrategy.deleteCQL( applicationScope, value ) ); } - try { - uniqueCleanupBatch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to execute batch mutation", e ); - } + + session.execute(uniqueCleanupBatch); + } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent ); return ObservableTimer.time( uniqueValueCleanup, uniqueCleanupTimer ); diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java index 23c6dfe961..e5c4c9642b 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java @@ -18,6 +18,8 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.write; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,14 +53,17 @@ public class RollbackAction implements Action1 { private final UniqueValueSerializationStrategy uniqueValueStrat; private final MvccLogEntrySerializationStrategy logEntryStrat; + private final Session session; @Inject - public RollbackAction(MvccLogEntrySerializationStrategy logEntryStrat, - UniqueValueSerializationStrategy uniqueValueStrat ) { + public 
RollbackAction( final MvccLogEntrySerializationStrategy logEntryStrat, + final UniqueValueSerializationStrategy uniqueValueStrat, + final Session session ) { this.uniqueValueStrat = uniqueValueStrat; this.logEntryStrat = logEntryStrat; + this.session = session; } @@ -72,6 +77,7 @@ public void call( final Throwable t ) { // one batch to handle rollback MutationBatch rollbackMb = null; + final BatchStatement uniqueDeleteBatch = new BatchStatement(); final Optional entity = mvccEntity.getEntity(); if ( entity.isPresent() ) { @@ -83,45 +89,17 @@ public void call( final Throwable t ) { UniqueValue toDelete = new UniqueValueImpl( field, entity.get().getId(), mvccEntity.getVersion() ); - MutationBatch deleteMb = uniqueValueStrat.delete(scope, toDelete ); + uniqueDeleteBatch.add(uniqueValueStrat.deleteCQL(scope, toDelete )); - if ( rollbackMb == null ) { - rollbackMb = deleteMb; - } - else { - rollbackMb.mergeShallow( deleteMb ); - } } } - - if ( rollbackMb != null ) { - try { - rollbackMb.execute(); - } - catch ( ConnectionException ex ) { - throw new RuntimeException( "Error rolling back changes", ex ); - } - } + // execute the batch statements for deleting unique field entries + session.execute(uniqueDeleteBatch); logEntryStrat.delete( scope, entity.get().getId(), mvccEntity.getVersion() ); } } } - - class FieldDeleteResult { - - private final String name; - - - public FieldDeleteResult( String name ) { - this.name = name; - } - - - public String getName() { - return this.name; - } - } } diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java index 8e0b20208f..501950ac93 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java +++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java @@ -25,6 +25,7 @@ import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.Session; +import com.netflix.hystrix.HystrixCommandProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +69,9 @@ public class WriteUniqueVerify implements Action1> private final UniqueValueSerializationStrategy uniqueValueStrat; - public static int uniqueVerifyPoolSize = 100; + private static int uniqueVerifyPoolSize = 100; + + private static int uniqueVerifyTimeoutMillis= 5000; protected final SerializationFig serializationFig; @@ -224,8 +227,10 @@ public Map executeStrategy(ConsistencyLevel consistencyLevel){ /** * Command group used for realtime user commands */ - public static final HystrixCommand.Setter - REPLAY_GROUP = HystrixCommand.Setter.withGroupKey( - HystrixCommandGroupKey.Factory.asKey( "uniqueVerify" ) ).andThreadPoolPropertiesDefaults( - HystrixThreadPoolProperties.Setter().withCoreSize( uniqueVerifyPoolSize ) ); + private static final HystrixCommand.Setter + REPLAY_GROUP = HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey( "uniqueVerify" ) ) + .andThreadPoolPropertiesDefaults( + HystrixThreadPoolProperties.Setter().withCoreSize( uniqueVerifyPoolSize ) ) + .andCommandPropertiesDefaults( + HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(uniqueVerifyTimeoutMillis)); } diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java index 56e8b87c73..bb6f5fe37c 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java +++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java @@ -41,23 +41,14 @@ public interface UniqueValueSerializationStrategy extends Migration, VersionedDa /** - * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds. - * - * @param applicationScope scope - * @param uniqueValue Object to be written - * - * @return MutatationBatch that encapsulates operation, caller may or may not execute. - */ - - /** - * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds. + * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds. -1 is the same as no ttl + * (lives forever) * * @param applicationScope scope * @param uniqueValue Object to be written * @param timeToLive How long object should live in seconds. -1 implies store forever - * @return MutatationBatch that encapsulates operation, caller may or may not execute. + * @return BatchStatement that encapsulates CQL statements, caller may or may not execute. */ - BatchStatement writeCQL(ApplicationScope applicationScope, UniqueValue uniqueValue, int timeToLive ); /** @@ -103,9 +94,9 @@ UniqueValueSet load( ApplicationScope applicationScope, ConsistencyLevel consist * * @param applicationScope The scope of the application * @param uniqueValue Object to be deleted. - * @return MutatationBatch that encapsulates operation, caller may or may not execute. + * @return BatchStatement that encapsulates the CQL statements, caller may or may not execute. 
*/ - MutationBatch delete( ApplicationScope applicationScope, UniqueValue uniqueValue ); + BatchStatement deleteCQL( ApplicationScope applicationScope, UniqueValue uniqueValue); } diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/AllUniqueFieldsIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/AllUniqueFieldsIterator.java new file mode 100644 index 0000000000..ed210e99f3 --- /dev/null +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/AllUniqueFieldsIterator.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.usergrid.persistence.collection.serialization.impl; + + +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import org.apache.usergrid.persistence.collection.serialization.UniqueValue; + +import java.util.Iterator; + + diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java index 274cf5d03d..d451adc26a 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityVersion.java @@ -42,7 +42,8 @@ public UUID getEntityVersion() { return entityVersion; } - public boolean equals( Object o ) { + @Override + public boolean equals( final Object o ) { if ( o == null || !(o instanceof EntityVersion) ) { return false; @@ -60,5 +61,12 @@ public boolean equals( Object o ) { return true; } - + + @Override + public int hashCode() { + int result = entityId.hashCode(); + result = 31 * result + entityVersion.hashCode(); + return result; + } + } diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java index 27a86095ef..e0a9035a7c 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java +++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java @@ -22,11 +22,10 @@ import java.util.*; import com.datastax.driver.core.*; +import com.datastax.driver.core.Row; import com.datastax.driver.core.querybuilder.Clause; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Using; -import com.netflix.astyanax.model.*; -import com.netflix.astyanax.util.RangeBuilder; import org.apache.usergrid.persistence.core.CassandraConfig; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.model.entity.SimpleId; @@ -41,8 +40,6 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; import org.apache.usergrid.persistence.core.CassandraFig; -import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator; -import org.apache.usergrid.persistence.core.astyanax.ColumnParser; import org.apache.usergrid.persistence.core.astyanax.ColumnTypes; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition; @@ -56,7 +53,6 @@ import com.netflix.astyanax.Keyspace; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; -import com.netflix.astyanax.query.RowQuery; /** @@ -126,73 +122,6 @@ public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final Cass } - - public MutationBatch write( final ApplicationScope collectionScope, UniqueValue value ) { - - - Preconditions.checkNotNull( value, "value is required" ); - - - final Id entityId = value.getEntityId(); - final UUID entityVersion = value.getEntityVersion(); - final Field field = value.getField(); - - ValidationUtils.verifyIdentity( 
entityId ); - ValidationUtils.verifyVersion( entityVersion ); - - - final EntityVersion ev = new EntityVersion( entityId, entityVersion ); - final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field ); - - return doWrite( collectionScope, value, new RowOp() { - - @Override - public void doLookup( final ColumnListMutation colMutation ) { - colMutation.putColumn( ev, COL_VALUE ); - } - - - @Override - public void doLog( final ColumnListMutation colMutation ) { - colMutation.putColumn( uniqueFieldEntry, COL_VALUE ); - } - } ); - } - - - public MutationBatch write( final ApplicationScope collectionScope, final UniqueValue value, - final int timeToLive ) { - - Preconditions.checkNotNull( value, "value is required" ); - Preconditions.checkArgument( timeToLive > 0, "timeToLive must be greater than 0 is required" ); - - final Id entityId = value.getEntityId(); - final UUID entityVersion = value.getEntityVersion(); - final Field field = value.getField(); - - ValidationUtils.verifyIdentity( entityId ); - ValidationUtils.verifyVersion( entityVersion ); - - final EntityVersion ev = new EntityVersion( entityId, entityVersion ); - final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field ); - - return doWrite( collectionScope, value, new RowOp() { - - @Override - public void doLookup( final ColumnListMutation colMutation ) { - colMutation.putColumn( ev, COL_VALUE, timeToLive ); - } - - - //we purposefully leave out TTL. 
Worst case we issue deletes against tombstoned columns - //best case, we clean up an invalid secondary index entry when the log is used - @Override - public void doLog( final ColumnListMutation colMutation ) { - colMutation.putColumn( uniqueFieldEntry, COL_VALUE ); - } - } ); - } - @Override public BatchStatement writeCQL( final ApplicationScope collectionScope, final UniqueValue value, final int timeToLive ){ @@ -259,26 +188,15 @@ public BatchStatement writeCQL( final ApplicationScope collectionScope, final Un return batch; - /** - * @Override - public void doLookup( final ColumnListMutation colMutation ) { - colMutation.putColumn( ev, COL_VALUE ); - } - - - @Override - public void doLog( final ColumnListMutation colMutation ) { - colMutation.putColumn( uniqueFieldEntry, COL_VALUE ); - } - */ } @Override - public MutationBatch delete( final ApplicationScope scope, UniqueValue value ) { + public BatchStatement deleteCQL( final ApplicationScope scope, UniqueValue value){ Preconditions.checkNotNull( value, "value is required" ); + final BatchStatement batch = new BatchStatement(); final Id entityId = value.getEntityId(); final UUID entityVersion = value.getEntityVersion(); @@ -291,52 +209,31 @@ public MutationBatch delete( final ApplicationScope scope, UniqueValue value ) { final EntityVersion ev = new EntityVersion( entityId, entityVersion ); final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field ); - return doWrite( scope, value, new RowOp() { - - @Override - public void doLookup( final ColumnListMutation colMutation ) { - colMutation.deleteColumn( ev ); - } - - - @Override - public void doLog( final ColumnListMutation colMutation ) { - colMutation.deleteColumn( uniqueFieldEntry ); - } - } ); - } + ByteBuffer partitionKey = getPartitionKey( scope.getApplication(), value.getEntityId().getType(), + value.getField().getTypeName().toString(), value.getField().getName(), value.getField().getValue()); - /** - * Do the column update or delete 
for the given column and row key - * - * @param applicationScope We need to use this when getting the keyspace - * @param uniqueValue The unique value to write - * @param op The operation to write - */ - private MutationBatch doWrite( ApplicationScope applicationScope, UniqueValue uniqueValue, RowOp op ) { - final MutationBatch batch = keyspace.prepareMutationBatch(); + ByteBuffer columnValue = serializeUniqueValueColumn(ev); - final Id applicationId = applicationScope.getApplication(); + final Clause uniqueEqKey = QueryBuilder.eq("key", partitionKey ); + final Clause uniqueEqColumn = QueryBuilder.eq("column1", columnValue ); + Statement uniqueDelete = QueryBuilder.delete().from(TABLE_UNIQUE_VALUES).where(uniqueEqKey).and(uniqueEqColumn); + batch.add(uniqueDelete); - final FieldKey fieldKey = createUniqueValueKey( applicationId, uniqueValue.getEntityId().getType(), uniqueValue.getField() ); - op.doLookup( batch.withRow( CF_UNIQUE_VALUES, ScopedRowKey.fromKey( applicationId, fieldKey ) ) ); + ByteBuffer logPartitionKey = getLogPartitionKey(scope.getApplication(), entityId); + ByteBuffer logColumnValue = serializeUniqueValueLogColumn(uniqueFieldEntry); - final EntityKey entityKey = createEntityUniqueLogKey( applicationId, uniqueValue.getEntityId() ); + final Clause uniqueLogEqKey = QueryBuilder.eq("key", logPartitionKey ); + final Clause uniqueLogEqColumn = QueryBuilder.eq("column1", logColumnValue ); - op.doLog( batch.withRow( CF_ENTITY_UNIQUE_VALUE_LOG, - ScopedRowKey.fromKey( applicationId, entityKey ) ) ); + Statement uniqueLogDelete = QueryBuilder.delete() + .from(TABLE_UNIQUE_VALUES_LOG).where(uniqueLogEqKey).and( uniqueLogEqColumn); + batch.add(uniqueLogDelete); - if ( log.isTraceEnabled() ) { - log.trace( "Writing unique value version={} name={} value={} ", - uniqueValue.getEntityVersion(), uniqueValue.getField().getName(), - uniqueValue.getField().getValue() - ); - } return batch; @@ -364,59 +261,6 @@ public UniqueValueSet load( final ApplicationScope 
appScope, final com.netflix.a } - private UniqueValueSet loadLegacy(final ApplicationScope appScope, - final String type, final Collection fields) throws ConnectionException { - final List> keys = new ArrayList<>( fields.size() ); - - final Id applicationId = appScope.getApplication(); - - for ( Field field : fields ) { - - final FieldKey key = createUniqueValueKey( applicationId, type, field ); - - - final ScopedRowKey rowKey = - ScopedRowKey.fromKey( applicationId, key ); - - keys.add( rowKey ); - } - - final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() ); - - Iterator, EntityVersion>> results = - keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel(com.netflix.astyanax.model.ConsistencyLevel.CL_LOCAL_QUORUM ).getKeySlice( keys ) - .withColumnRange( new RangeBuilder().setLimit( 1 ).build() ).execute().getResult().iterator(); - - - while ( results.hasNext() ) - - { - - final com.netflix.astyanax.model.Row, EntityVersion> unique = results.next(); - - - final Field field = parseRowKey( unique.getKey() ); - - final Iterator> columnList = unique.getColumns().iterator(); - - //sanity check, nothing to do, skip it - if ( !columnList.hasNext() ) { - continue; - } - - final EntityVersion entityVersion = columnList.next().getName(); - - - final UniqueValueImpl uniqueValue = - new UniqueValueImpl( field, entityVersion.getEntityId(), entityVersion.getEntityVersion() ); - - uniqueValueSet.addValue( uniqueValue ); - } - - return uniqueValueSet; - - } - private UniqueValueSet loadCQL( final ApplicationScope appScope, final com.datastax.driver.core.ConsistencyLevel consistencyLevel, final String type, final Collection fields ) throws ConnectionException { @@ -460,7 +304,6 @@ private UniqueValueSet loadCQL( final ApplicationScope appScope, final com.datas List keyContents = deserializePartitionKey(partitionKey); List columnContents = deserializeUniqueValueColumn(column); - Field field = null; FieldTypeName fieldType; String name; String 
value; @@ -478,29 +321,8 @@ private UniqueValueSet loadCQL( final ApplicationScope appScope, final com.datas } - switch ( fieldType ) { - case BOOLEAN: - field = new BooleanField( name, Boolean.parseBoolean( value ) ); - break; - case DOUBLE: - field = new DoubleField( name, Double.parseDouble( value ) ); - break; - case FLOAT: - field = new FloatField( name, Float.parseFloat( value ) ); - break; - case INTEGER: - field = new IntegerField( name, Integer.parseInt( value ) ); - break; - case LONG: - field = new LongField( name, Long.parseLong( value ) ); - break; - case STRING: - field = new StringField( name, value ); - break; - case UUID: - field = new UUIDField( name, UUID.fromString( value ) ); - break; - } + Field field = getField(name, value, fieldType); + final EntityVersion entityVersion = new EntityVersion( new SimpleId((UUID)columnContents.get(1), (String)columnContents.get(2)), (UUID)columnContents.get(0)); @@ -526,59 +348,17 @@ public Iterator getAllUniqueFields( final ApplicationScope collecti Preconditions.checkNotNull( entityId, "entity id is required" ); - final Id applicationId = collectionScope.getApplication(); - - final EntityKey entityKey = createEntityUniqueLogKey( applicationId, entityId ); - - - final ScopedRowKey rowKey = - ScopedRowKey.fromKey( applicationId, entityKey ); - - - RowQuery, UniqueFieldEntry> query = - keyspace.prepareQuery( CF_ENTITY_UNIQUE_VALUE_LOG ).getKey( rowKey ) - .withColumnRange( ( UniqueFieldEntry ) null, null, false, serializationFig.getBufferSize() ); - - return new ColumnNameIterator( query, new UniqueEntryParser( entityId ), false ); - } + Clause inKey = QueryBuilder.in("key", getLogPartitionKey(collectionScope.getApplication(), entityId)); + Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES_LOG) + .where(inKey); - /** - * Simple callback to perform puts and deletes with a common row setup code - */ - private interface RowOp { + return new AllUniqueFieldsIterator(session, statement, 
entityId); - /** - * Execute the mutation into the lookup CF_UNIQUE_VALUES row - */ - void doLookup( ColumnListMutation colMutation ); - /** - * Execute the mutation into the lCF_ENTITY_UNIQUE_VALUESLUE row - */ - void doLog( ColumnListMutation colMutation ); } - /** - * Converts raw columns to the expected output - */ - private static final class UniqueEntryParser implements ColumnParser { - - private final Id entityId; - - - private UniqueEntryParser( final Id entityId ) {this.entityId = entityId;} - - - @Override - public UniqueValue parseColumn( final Column column ) { - final UniqueFieldEntry entry = column.getName(); - - return new UniqueValueImpl( entry.getField(), entityId, entry.getVersion() ); - } - } - @Override public Collection getColumnFamilies() { @@ -621,27 +401,9 @@ public Collection getTables() { protected abstract TableDefinition getUniqueValuesTable(); - /** - * Generate a key that is compatible with the column family - * - * @param applicationId The applicationId - * @param type The type in the field - * @param field The field we're creating the key for - */ - protected abstract FieldKey createUniqueValueKey(final Id applicationId, final String type, final Field field ); - - /** - * Parse the row key into the field - * @param rowKey - * @return - */ - protected abstract Field parseRowKey(final ScopedRowKey rowKey); - - protected abstract List deserializePartitionKey(ByteBuffer bb); - - protected abstract Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry); + protected abstract ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry); protected abstract ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ); @@ -651,6 +413,8 @@ public Collection getTables() { protected abstract List deserializeUniqueValueColumn(ByteBuffer bb); + protected abstract List deserializeUniqueValueLogColumn(ByteBuffer bb); + @@ -672,4 +436,100 @@ public Collection getTables() { 
* @param uniqueValueId The uniqueValue */ protected abstract EntityKey createEntityUniqueLogKey(final Id applicationId, final Id uniqueValueId ); + + + public class AllUniqueFieldsIterator implements Iterable, Iterator { + + private final Session session; + private final Statement query; + private final Id entityId; + + private Iterator sourceIterator; + + + + public AllUniqueFieldsIterator( final Session session, final Statement query, final Id entityId){ + + this.session = session; + this.query = query; + this.entityId = entityId; + + } + + + @Override + public Iterator iterator() { + return this; + } + + @Override + public boolean hasNext() { + + if ( sourceIterator == null ) { + + advanceIterator(); + + return sourceIterator.hasNext(); + } + + return sourceIterator.hasNext(); + } + + @Override + public UniqueValue next() { + + com.datastax.driver.core.Row next = sourceIterator.next(); + + ByteBuffer column = next.getBytesUnsafe("column1"); + + List columnContents = deserializeUniqueValueLogColumn(column); + + UUID version = (UUID) columnContents.get(0); + String name = (String) columnContents.get(1); + String value = (String) columnContents.get(2); + FieldTypeName fieldType = FieldTypeName.valueOf((String) columnContents.get(3)); + + + return new UniqueValueImpl(getField(name, value, fieldType), entityId, version); + + } + + private void advanceIterator() { + + sourceIterator = session.execute(query).iterator(); + } + } + + private Field getField( String name, String value, FieldTypeName fieldType){ + + Field field = null; + + switch ( fieldType ) { + case BOOLEAN: + field = new BooleanField( name, Boolean.parseBoolean( value ) ); + break; + case DOUBLE: + field = new DoubleField( name, Double.parseDouble( value ) ); + break; + case FLOAT: + field = new FloatField( name, Float.parseFloat( value ) ); + break; + case INTEGER: + field = new IntegerField( name, Integer.parseInt( value ) ); + break; + case LONG: + field = new LongField( name, Long.parseLong( value ) 
); + break; + case STRING: + field = new StringField( name, value ); + break; + case UUID: + field = new UUIDField( name, UUID.fromString( value ) ); + break; + } + + return field; + + } + } diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java index bbfaa2dd37..dc5b48f11d 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java @@ -125,19 +125,19 @@ public Iterator getAllUniqueFields( final ApplicationScope applicat @Override - public MutationBatch delete( final ApplicationScope applicationScope, final UniqueValue uniqueValue ) { + public BatchStatement deleteCQL( final ApplicationScope applicationScope, final UniqueValue uniqueValue ) { final MigrationRelationship migration = getMigrationRelationShip(); if ( migration.needsMigration() ) { - final MutationBatch aggregateBatch = keyspace.prepareMutationBatch(); + final BatchStatement batch = new BatchStatement(); - aggregateBatch.mergeShallow( migration.from.delete( applicationScope, uniqueValue ) ); - aggregateBatch.mergeShallow( migration.to.delete( applicationScope, uniqueValue ) ); + batch.add(migration.from.deleteCQL( applicationScope, uniqueValue ) ); + batch.add(migration.to.deleteCQL( applicationScope, uniqueValue ) ); - return aggregateBatch; + return batch; } - return migration.to.delete( applicationScope, uniqueValue ); + return migration.to.deleteCQL( applicationScope, uniqueValue ); } diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java index 75666faafb..cbd8a3e8b0 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java @@ -175,26 +175,6 @@ protected TableDefinition getEntityUniqueLogTable(){ } - @Override - protected CollectionPrefixedKey createUniqueValueKey( final Id applicationId, - final String type, final Field field) { - - - final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( type ); - - - final CollectionPrefixedKey uniquePrefixedKey = - new CollectionPrefixedKey<>( collectionName, applicationId, field ); - - return uniquePrefixedKey; - } - - - @Override - protected Field parseRowKey( final ScopedRowKey> rowKey ) { - return rowKey.getKey().getSubKey(); - } - @Override protected List deserializePartitionKey(ByteBuffer bb){ @@ -230,23 +210,23 @@ protected List deserializePartitionKey(ByteBuffer bb){ } @Override - protected Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){ + protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){ /** - * final UUID version = value.getVersion(); - final Field field = value.getField(); + * final UUID version = value.getVersion(); + final Field field = value.getField(); - final FieldTypeName fieldType = field.getTypeName(); - final String fieldValue = field.getValue().toString().toLowerCase(); + final FieldTypeName fieldType = field.getTypeName(); + final String fieldValue = field.getValue().toString().toLowerCase(); - 
DynamicComposite composite = new DynamicComposite( ); + DynamicComposite composite = new DynamicComposite( ); - //we want to sort ascending to descending by version - composite.addComponent( version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED); - composite.addComponent( field.getName(), STRING_SERIALIZER ); - composite.addComponent( fieldValue, STRING_SERIALIZER ); - composite.addComponent( fieldType.name() , STRING_SERIALIZER); + //we want to sort ascending to descending by version + composite.addComponent( version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED); + composite.addComponent( field.getName(), STRING_SERIALIZER ); + composite.addComponent( fieldValue, STRING_SERIALIZER ); + composite.addComponent( fieldType.name() , STRING_SERIALIZER); */ // values are serialized as strings, not sure why, and always lower cased @@ -337,15 +317,15 @@ protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueV protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){ /** - * final Id entityId = ev.getEntityId(); - final UUID entityUuid = entityId.getUuid(); - final String entityType = entityId.getType(); + * final Id entityId = ev.getEntityId(); + final UUID entityUuid = entityId.getUuid(); + final String entityType = entityId.getType(); - CompositeBuilder builder = Composites.newDynamicCompositeBuilder(); + CompositeBuilder builder = Composites.newDynamicCompositeBuilder(); - builder.addUUID( entityVersion ); - builder.addUUID( entityUuid ); - builder.addString(entityType ); + builder.addUUID( entityVersion ); + builder.addUUID( entityUuid ); + builder.addString(entityType ); */ String comparator = "UTF8Type"; @@ -418,7 +398,49 @@ protected List deserializeUniqueValueColumn(ByteBuffer bb){ }else if(count ==1){ stuff.add(new UUID(data.getLong(), data.getLong())); }else{ - stuff.add(DataType.text().deserialize(data.duplicate(), ProtocolVersion.NEWEST_SUPPORTED)); + stuff.add(DataType.text().deserialize(data.slice(), 
ProtocolVersion.NEWEST_SUPPORTED)); + } + + byte equality = bb.get(); // we don't use this but take the equality byte off the buffer + + count++; + } + + return stuff; + + } + + @Override + protected List deserializeUniqueValueLogColumn(ByteBuffer bb){ + + + /** + * List keys = new ArrayList<>(4); + keys.add(fieldEntry.getVersion()); + keys.add(fieldEntry.getField().getName()); + keys.add(fieldValueString); + keys.add(fieldEntry.getField().getTypeName().name()); + */ + + List stuff = new ArrayList<>(); + int count = 0; + while(bb.hasRemaining()){ + + // the comparator info is different for the UUID reversed type vs. UTF8 type + if(count ==0){ + bb.getShort(); // take the reversed comparator byte off + }else { + ByteBuffer comparator = CQLUtils.getWithShortLength(bb); + } + + ByteBuffer data = CQLUtils.getWithShortLength(bb); + + + // first composite is a UUID, rest are strings + if(count == 0) { + stuff.add(new UUID(data.getLong(), data.getLong())); + }else{ + stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); } byte equality = bb.get(); // we don't use this but take the equality byte off the buffer @@ -465,15 +487,15 @@ private ByteBuffer serializeKey( UUID appUUID, final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType ); -// final CollectionPrefixedKey uniquePrefixedKey = -// new CollectionPrefixedKey<>( collectionName, applicationId, field ); - -// //read back the id -// final Id orgId = ID_SER.fromComposite( parser ); -// final Id scopeId = ID_SER.fromComposite( parser ); -// final String scopeName = parser.readString(); -// final K value = keySerializer.fromComposite( parser ); + /** + final CollectionPrefixedKey uniquePrefixedKey = + new CollectionPrefixedKey<>( collectionName, applicationId, field ); + final Id orgId = ID_SER.fromComposite( parser ); + final Id scopeId = ID_SER.fromComposite( parser ); + final String scopeName = parser.readString(); + final K value = 
keySerializer.fromComposite( parser ); + **/ // values are serialized as strings, not sure why, and always lower cased String fieldValueString = fieldValue.toString().toLowerCase(); @@ -521,10 +543,11 @@ private ByteBuffer serializeLogKey(UUID appUUID, String applicationType, UUID en final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType ); -// -// -// final CollectionPrefixedKey collectionPrefixedEntityKey = -// new CollectionPrefixedKey<>( collectionName, applicationId, uniqueValueId ); + + /** + final CollectionPrefixedKey collectionPrefixedEntityKey = + new CollectionPrefixedKey<>( collectionName, applicationId, uniqueValueId ); + **/ List keys = new ArrayList<>(4); keys.add(appUUID); diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java index 4177c37d90..3e4932aaf5 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java @@ -40,7 +40,6 @@ import org.apache.usergrid.persistence.core.datastax.CQLUtils; import org.apache.usergrid.persistence.core.datastax.TableDefinition; import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.field.Field; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -171,30 +170,18 @@ protected TableDefinition getEntityUniqueLogTable(){ } - - @Override - protected TypeField createUniqueValueKey( final Id applicationId, final String type, final Field field) { - return new TypeField(type,field); - } - - - 
@Override - protected Field parseRowKey( final ScopedRowKey rowKey ) { - return rowKey.getKey().getField(); - } - @Override protected List deserializePartitionKey(ByteBuffer bb){ /** - * List keys = new ArrayList<>(6); - keys.add(0, appUUID); // UUID - keys.add(1, applicationType); // String - keys.add(2, entityType); // String - keys.add(3, fieldType); // String - keys.add(4, fieldName); // String - keys.add(5, fieldValueString); // String + * List keys = new ArrayList<>(6); + keys.add(0, appUUID); // UUID + keys.add(1, applicationType); // String + keys.add(2, entityType); // String + keys.add(3, fieldType); // String + keys.add(4, fieldName); // String + keys.add(5, fieldValueString); // String */ @@ -215,23 +202,23 @@ protected List deserializePartitionKey(ByteBuffer bb){ } @Override - protected Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){ + protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){ /** - * final UUID version = value.getVersion(); - final Field field = value.getField(); + * final UUID version = value.getVersion(); + final Field field = value.getField(); - final FieldTypeName fieldType = field.getTypeName(); - final String fieldValue = field.getValue().toString().toLowerCase(); + final FieldTypeName fieldType = field.getTypeName(); + final String fieldValue = field.getValue().toString().toLowerCase(); - DynamicComposite composite = new DynamicComposite( ); + DynamicComposite composite = new DynamicComposite( ); - //we want to sort ascending to descending by version - composite.addComponent( version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED); - composite.addComponent( field.getName(), STRING_SERIALIZER ); - composite.addComponent( fieldValue, STRING_SERIALIZER ); - composite.addComponent( fieldType.name() , STRING_SERIALIZER); + //we want to sort ascending to descending by version + composite.addComponent( version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED); + composite.addComponent( 
field.getName(), STRING_SERIALIZER ); + composite.addComponent( fieldValue, STRING_SERIALIZER ); + composite.addComponent( fieldType.name() , STRING_SERIALIZER); */ // values are serialized as strings, not sure why, and always lower cased @@ -250,7 +237,7 @@ protected Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){ fieldEntry.getField().getTypeName().name().length(); // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality - size += keys.size()*65; + size += keys.size()*5; // uuid type comparator is longest, ensure we allocate buffer using the max size to avoid overflow size += keys.size()*comparator.length(); @@ -322,15 +309,15 @@ protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueV protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){ /** - * final Id entityId = ev.getEntityId(); - final UUID entityUuid = entityId.getUuid(); - final String entityType = entityId.getType(); + * final Id entityId = ev.getEntityId(); + final UUID entityUuid = entityId.getUuid(); + final String entityType = entityId.getType(); - CompositeBuilder builder = Composites.newDynamicCompositeBuilder(); + CompositeBuilder builder = Composites.newDynamicCompositeBuilder(); - builder.addUUID( entityVersion ); - builder.addUUID( entityUuid ); - builder.addString(entityType ); + builder.addUUID( entityVersion ); + builder.addUUID( entityUuid ); + builder.addString(entityType ); */ String comparator = "UTF8Type"; @@ -403,7 +390,49 @@ protected List deserializeUniqueValueColumn(ByteBuffer bb){ }else if(count ==1){ stuff.add(new UUID(data.getLong(), data.getLong())); }else{ - stuff.add(DataType.text().deserialize(data.duplicate(), ProtocolVersion.NEWEST_SUPPORTED)); + stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); + } + + byte equality = bb.get(); // we don't use this but take the equality byte off the buffer + + count++; + } + + return 
stuff; + + } + + @Override + protected List deserializeUniqueValueLogColumn(ByteBuffer bb){ + + + /** + * List keys = new ArrayList<>(4); + keys.add(fieldEntry.getVersion()); + keys.add(fieldEntry.getField().getName()); + keys.add(fieldValueString); + keys.add(fieldEntry.getField().getTypeName().name()); + */ + + List stuff = new ArrayList<>(); + int count = 0; + while(bb.hasRemaining()){ + + // the comparator info is different for the UUID reversed type vs. UTF8 type + if(count ==0){ + bb.getShort(); // take the reversed comparator byte off + }else { + ByteBuffer comparator = CQLUtils.getWithShortLength(bb); + } + + ByteBuffer data = CQLUtils.getWithShortLength(bb); + + + // first composite is a UUID, rest are strings + if(count == 0) { + stuff.add(new UUID(data.getLong(), data.getLong())); + }else{ + stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED)); } byte equality = bb.get(); // we don't use this but take the equality byte off the buffer diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java index 8dd9528bc2..853913b4c6 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSetImpl.java @@ -37,7 +37,11 @@ public UniqueValueSetImpl(final int expectedMaxSize) { public void addValue(UniqueValue value){ - values.put( value.getField().getName(), value ); + values.putIfAbsent( value.getField().getName(), value ); + // ^^ putIfAbsent important here as CQL returns column values differently than Astyanax/thrift due to CQL not + // having a 'column range' for each row slice and all columns are 
returned. We don't want to overwrite the + // first column values retrieved + } @Override diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java index 6a705e4e1d..148cc0955a 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteOptimisticVerifyTest.java @@ -21,7 +21,12 @@ import java.util.ArrayList; import java.util.List; +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.Session; +import com.google.inject.Inject; +import org.apache.usergrid.persistence.core.test.ITRunner; import org.junit.Test; +import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +58,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; - - @UseModules( TestCollectionModule.class ) public class WriteOptimisticVerifyTest extends AbstractMvccEntityStageTest { @@ -110,6 +113,7 @@ public void testConflict() throws Exception { when( scope.getApplication() ) .thenReturn( new SimpleId( UUIDGenerator.newTimeUUID(), "organization" ) ); + final Session session = mock(Session.class); // there is an entity final Entity entity = generateEntity(); @@ -135,16 +139,13 @@ public void testConflict() throws Exception { UniqueValueSerializationStrategy uvstrat = mock( UniqueValueSerializationStrategy.class); UniqueValue uv1 = new UniqueValueImpl(entity.getField("name"), entity.getId(), entity.getVersion()); UniqueValue uv2 = new UniqueValueImpl( entity.getField("identifier"), entity.getId(), entity.getVersion()); - 
MutationBatch mb = mock( MutationBatch.class ); - when( uvstrat.delete(scope, uv1) ).thenReturn(mb); - when( uvstrat.delete(scope, uv2) ).thenReturn(mb); // Run the stage, conflict should be detected final MvccEntity mvccEntity = fromEntity( entity ); boolean conflictDetected = false; WriteOptimisticVerify newStage = new WriteOptimisticVerify( mvccLog ); - RollbackAction rollbackAction = new RollbackAction( mvccLog, uvstrat ); + RollbackAction rollbackAction = new RollbackAction( mvccLog, uvstrat, session ); try { newStage.call( new CollectionIoEvent<>(scope, mvccEntity)); @@ -157,8 +158,8 @@ public void testConflict() throws Exception { assertTrue( conflictDetected ); // check that unique values were deleted - verify( uvstrat, times(1) ).delete(scope, uv1 ); - verify( uvstrat, times(1) ).delete(scope, uv2 ); + verify( uvstrat, times(1) ).deleteCQL(scope, uv1 ); + verify( uvstrat, times(1) ).deleteCQL(scope, uv2 ); } } diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java index 3ffdb6581e..185cfb74d9 100644 --- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java +++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java @@ -192,7 +192,10 @@ public void testDelete() throws ConnectionException { BatchStatement batch = strategy.writeCQL( scope, stored, -1); session.execute(batch); - strategy.delete( scope, stored ).execute(); + + //strategy.delete( scope, stored ).execute(); + BatchStatement deleteBatch = strategy.deleteCQL(scope, stored); + session.execute(deleteBatch); UniqueValueSet fields = 
strategy.load( scope, entityId.getType(), Collections.singleton( field ) );