
Commit 866d11b

Unique value serialization completely converted to use CQL. Complete Astyanax removal still needs to be completed.
Michael Russo committed May 8, 2016
1 parent 0c60987 commit 866d11b
Showing 15 changed files with 383 additions and 445 deletions.
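The change repeated across these files is the same swap: where the old code built an Astyanax MutationBatch from the Keyspace, merged per-value mutations with mergeShallow() and called execute() (which throws a checked ConnectionException), the new code collects driver statements in a BatchStatement and runs it through an injected CQL Session. A minimal sketch of the new shape, using an illustrative table and column name rather than the real unique-values schema:

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;

public class CqlBatchSketch {

    // Before (Astyanax): MutationBatch mb = keyspace.prepareMutationBatch();
    //                    mb.mergeShallow( other ); mb.execute();   // checked ConnectionException
    // After (CQL), as below: collect statements in a BatchStatement, execute on the Session.
    static void deleteKeys( Session session, Iterable<String> keys ) {
        BatchStatement batch = new BatchStatement();   // LOGGED batch by default
        for ( String key : keys ) {
            // "unique_values" and "key" are illustrative names, not the real schema
            batch.add( new SimpleStatement( "DELETE FROM unique_values WHERE key = ?", key ) );
        }
        session.execute( batch );                      // failures surface as unchecked DriverExceptions
    }
}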
@@ -23,6 +23,7 @@
 import java.util.concurrent.ExecutionException;

+import com.datastax.driver.core.Session;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
@@ -75,6 +76,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
     private final Keyspace keyspace;
+    private final Session session;
     private final MetricsFactory metricsFactory;
     private final RxTaskScheduler rxTaskScheduler;
@@ -89,7 +91,7 @@ public EntityCollectionManager load( ApplicationScope scope ) {
             entitySerializationStrategy, uniqueValueSerializationStrategy,
             mvccLogEntrySerializationStrategy, keyspace,
             metricsFactory, serializationFig,
-            rxTaskScheduler, scope );
+            rxTaskScheduler, scope, session );

         return target;
     }
@@ -107,7 +109,9 @@ public EntityCollectionManagerFactoryImpl( final WriteStart writeStart, final Wr
                 final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                 final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
                 final Keyspace keyspace, final EntityCacheFig entityCacheFig,
-                final MetricsFactory metricsFactory, @CollectionExecutorScheduler final RxTaskScheduler rxTaskScheduler ) {
+                final MetricsFactory metricsFactory,
+                @CollectionExecutorScheduler final RxTaskScheduler rxTaskScheduler,
+                final Session session ) {

         this.writeStart = writeStart;
         this.writeVerifyUnique = writeVerifyUnique;
@@ -125,6 +129,7 @@ public EntityCollectionManagerFactoryImpl( final WriteStart writeStart, final Wr
         this.keyspace = keyspace;
         this.metricsFactory = metricsFactory;
         this.rxTaskScheduler = rxTaskScheduler;
+        this.session = session;
     }

     @Override
     public EntityCollectionManager createCollectionManager(ApplicationScope applicationScope) {
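The factory now receives a com.datastax.driver.core.Session by constructor injection and passes it on to each EntityCollectionManagerImpl it builds. The binding that supplies that Session is not part of this diff; a plausible Guice provider, shown here purely as an assumption, could look like this (contact point and keyspace name are placeholders):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;

// Hypothetical module: the real binding lives elsewhere in Usergrid's core modules.
public class CqlSessionModule extends AbstractModule {

    @Override
    protected void configure() {
        // nothing bound directly; the Session comes from the provider method below
    }

    @Provides
    @Singleton
    Session provideSession() {
        Cluster cluster = Cluster.builder()
                                 .addContactPoint( "127.0.0.1" )   // placeholder contact point
                                 .build();
        return cluster.connect( "usergrid" );                      // placeholder keyspace name
    }
}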
@@ -26,6 +26,8 @@
 import java.util.List;
 import java.util.UUID;

+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
 import com.netflix.astyanax.model.ConsistencyLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,15 +66,13 @@
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
-import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;

 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.OperationResult;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.ColumnFamily;
@@ -81,7 +81,6 @@

 import rx.Observable;
 import rx.Subscriber;
-import rx.functions.Action0;


 /**
@@ -114,6 +113,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {


     private final Keyspace keyspace;
+    private final Session session;
     private final Timer writeTimer;
     private final Timer deleteTimer;
     private final Timer fieldIdTimer;
@@ -136,7 +136,8 @@ public EntityCollectionManagerImpl( final WriteStart writeStart, final WriteUniq
                                         final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
                                         final Keyspace keyspace, final MetricsFactory metricsFactory,
                                         final SerializationFig serializationFig, final RxTaskScheduler rxTaskScheduler,
-                                        @Assisted final ApplicationScope applicationScope ) {
+                                        @Assisted final ApplicationScope applicationScope,
+                                        final Session session ) {
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
         this.uniqueCleanup = uniqueCleanup;
@@ -157,6 +158,7 @@ public EntityCollectionManagerImpl( final WriteStart writeStart, final WriteUniq
         this.markCommit = markCommit;

         this.keyspace = keyspace;
+        this.session = session;

         this.applicationScope = applicationScope;
@@ -347,8 +349,7 @@ public Observable<FieldSet> getEntitiesFromFields( final String type, final Coll
                 //Load a entity for each entityId we retrieved.
                 final EntitySet entitySet = entitySerializationStrategy.load( applicationScope, entityIds, startTime );

-                //now loop through and ensure the entities are there.
-                final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
+                final BatchStatement uniqueDeleteBatch = new BatchStatement();

                 final MutableFieldSet response = new MutableFieldSet( fields1.size() );
@@ -357,9 +358,8 @@ public Observable<FieldSet> getEntitiesFromFields( final String type, final Coll

                     //bad unique value, delete this, it's inconsistent
                     if ( entity == null || !entity.getEntity().isPresent() ) {
-                        final MutationBatch valueDelete =
-                            uniqueValueSerializationStrategy.delete( applicationScope, expectedUnique );
-                        deleteBatch.mergeShallow( valueDelete );
+                        uniqueDeleteBatch.add(
+                            uniqueValueSerializationStrategy.deleteCQL( applicationScope, expectedUnique ));
                         continue;
                     }
@@ -371,8 +371,7 @@ public Observable<FieldSet> getEntitiesFromFields( final String type, final Coll
                 }

                 //TODO: explore making this an Async process
-                //We'll repair it again if we have to
-                deleteBatch.execute();
+                session.execute(uniqueDeleteBatch);

                 return response;
             }
@@ -26,6 +26,8 @@
 import java.util.List;
 import java.util.UUID;

+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,19 +65,22 @@ public class UniqueCleanup

     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final Keyspace keyspace;
+    private final Session session;

     private final SerializationFig serializationFig;


     @Inject
     public UniqueCleanup( final SerializationFig serializationFig,
                           final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
-                          final Keyspace keyspace, final MetricsFactory metricsFactory ) {
+                          final Keyspace keyspace, final MetricsFactory metricsFactory,
+                          final Session session ) {

         this.serializationFig = serializationFig;
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.keyspace = keyspace;
         this.uniqueCleanupTimer = metricsFactory.getTimer( UniqueCleanup.class, "uniquecleanup.base" );
+        this.session = session;
     }
@@ -127,22 +132,20 @@ protected Iterator<UniqueValue> getIterator() {
                 //roll them up

                 .doOnNext( uniqueValues -> {
-                    final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+                    final BatchStatement uniqueCleanupBatch = new BatchStatement();


                     for ( UniqueValue value : uniqueValues ) {
                         logger
                             .debug( "Deleting value:{} from application scope: {} ", value, applicationScope );
                         uniqueCleanupBatch
-                            .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) );
+                            .add( uniqueValueSerializationStrategy.deleteCQL( applicationScope, value ) );
                     }

-                    try {
-                        uniqueCleanupBatch.execute();
-                    }
-                    catch ( ConnectionException e ) {
-                        throw new RuntimeException( "Unable to execute batch mutation", e );
-                    }
+                    session.execute(uniqueCleanupBatch);
                 } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );

         return ObservableTimer.time( uniqueValueCleanup, uniqueCleanupTimer );
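UniqueCleanup keeps its RxJava pipeline and only changes how each buffered chunk of stale unique values is deleted: one logged BatchStatement per chunk, executed synchronously on the Session inside doOnNext. A simplified sketch of that shape, using an illustrative table and key column rather than the real schema:

import java.util.List;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import rx.Observable;

public class BufferedCqlDeleteSketch {

    // Sketch: buffer incoming keys and delete each buffer with a single logged batch,
    // mirroring the doOnNext(...) block in the hunk above.
    static Observable<List<String>> deleteInBatches( Session session, Observable<String> keys, int bufferSize ) {
        return keys
            .buffer( bufferSize )                       // roll values up into chunks
            .doOnNext( chunk -> {
                BatchStatement batch = new BatchStatement();
                for ( String key : chunk ) {
                    // "unique_values" and "key" are illustrative names only
                    batch.add( QueryBuilder.delete().from( "unique_values" )
                                           .where( QueryBuilder.eq( "key", key ) ) );
                }
                session.execute( batch );               // synchronous, like the stage above
            } );
    }
}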
@@ -18,6 +18,8 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;


+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,14 +53,17 @@ public class RollbackAction implements Action1<Throwable> {

     private final UniqueValueSerializationStrategy uniqueValueStrat;
     private final MvccLogEntrySerializationStrategy logEntryStrat;
+    private final Session session;


     @Inject
-    public RollbackAction(MvccLogEntrySerializationStrategy logEntryStrat,
-                          UniqueValueSerializationStrategy uniqueValueStrat ) {
+    public RollbackAction( final MvccLogEntrySerializationStrategy logEntryStrat,
+                           final UniqueValueSerializationStrategy uniqueValueStrat,
+                           final Session session ) {

         this.uniqueValueStrat = uniqueValueStrat;
         this.logEntryStrat = logEntryStrat;
+        this.session = session;
     }
@@ -72,6 +77,7 @@ public void call( final Throwable t ) {

         // one batch to handle rollback
         MutationBatch rollbackMb = null;
+        final BatchStatement uniqueDeleteBatch = new BatchStatement();
         final Optional<Entity> entity = mvccEntity.getEntity();

         if ( entity.isPresent() ) {
@@ -83,45 +89,17 @@ public void call( final Throwable t ) {
                     UniqueValue toDelete =
                         new UniqueValueImpl( field, entity.get().getId(), mvccEntity.getVersion() );

-                    MutationBatch deleteMb = uniqueValueStrat.delete(scope, toDelete );
-
-                    if ( rollbackMb == null ) {
-                        rollbackMb = deleteMb;
-                    }
-                    else {
-                        rollbackMb.mergeShallow( deleteMb );
-                    }
+                    uniqueDeleteBatch.add(uniqueValueStrat.deleteCQL(scope, toDelete ));
                 }
             }

-            if ( rollbackMb != null ) {
-                try {
-                    rollbackMb.execute();
-                }
-                catch ( ConnectionException ex ) {
-                    throw new RuntimeException( "Error rolling back changes", ex );
-                }
-            }
+            // execute the batch statements for deleting unique field entries
+            session.execute(uniqueDeleteBatch);

             logEntryStrat.delete( scope, entity.get().getId(), mvccEntity.getVersion() );
         }
     }

-    class FieldDeleteResult {
-
-        private final String name;
-
-        public FieldDeleteResult( String name ) {
-            this.name = name;
-        }
-
-        public String getName() {
-            return this.name;
-        }
-    }
 }
@@ -25,6 +25,7 @@

 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.Session;
+import com.netflix.hystrix.HystrixCommandProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,7 +69,9 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>

     private final UniqueValueSerializationStrategy uniqueValueStrat;

-    public static int uniqueVerifyPoolSize = 100;
+    private static int uniqueVerifyPoolSize = 100;
+
+    private static int uniqueVerifyTimeoutMillis= 5000;

     protected final SerializationFig serializationFig;
@@ -224,8 +227,10 @@ public Map<String, Field> executeStrategy(ConsistencyLevel consistencyLevel){
     /**
      * Command group used for realtime user commands
      */
-    public static final HystrixCommand.Setter
-        REPLAY_GROUP = HystrixCommand.Setter.withGroupKey(
-            HystrixCommandGroupKey.Factory.asKey( "uniqueVerify" ) ).andThreadPoolPropertiesDefaults(
-                HystrixThreadPoolProperties.Setter().withCoreSize( uniqueVerifyPoolSize ) );
+    private static final HystrixCommand.Setter
+        REPLAY_GROUP = HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey( "uniqueVerify" ) )
+            .andThreadPoolPropertiesDefaults(
+                HystrixThreadPoolProperties.Setter().withCoreSize( uniqueVerifyPoolSize ) )
+            .andCommandPropertiesDefaults(
+                HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(uniqueVerifyTimeoutMillis));
 }
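Besides narrowing visibility, the commit adds andCommandPropertiesDefaults(...) so the verification commands get an explicit 5000 ms execution timeout on top of the existing 100-thread pool. A command built from a Setter configured this way behaves roughly as sketched below (hypothetical subclass, not code from the commit):

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;

// Hypothetical command, included only to illustrate what the Setter configures.
public class UniqueVerifySketchCommand extends HystrixCommand<Boolean> {

    private static final HystrixCommand.Setter SETTER = HystrixCommand.Setter
            .withGroupKey( HystrixCommandGroupKey.Factory.asKey( "uniqueVerify" ) )
            .andThreadPoolPropertiesDefaults( HystrixThreadPoolProperties.Setter().withCoreSize( 100 ) )
            .andCommandPropertiesDefaults(
                    HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds( 5000 ) );

    public UniqueVerifySketchCommand() {
        super( SETTER );
    }

    @Override
    protected Boolean run() {
        // the unique-value read against Cassandra would run here on one of the
        // 100 pool threads; if it takes longer than 5000 ms Hystrix times it out
        return Boolean.TRUE;
    }
}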
@@ -41,23 +41,14 @@ public interface UniqueValueSerializationStrategy extends Migration, VersionedDa


     /**
-     * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
-     *
-     * @param applicationScope scope
-     * @param uniqueValue Object to be written
-     *
-     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
-     */
-
-    /**
-     * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
+     * Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds. -1 is the same as no ttl
+     * (lives forever)
      *
      * @param applicationScope scope
      * @param uniqueValue Object to be written
      * @param timeToLive How long object should live in seconds. -1 implies store forever
-     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+     * @return BatchStatement that encapsulates CQL statements, caller may or may not execute.
      */

     BatchStatement writeCQL(ApplicationScope applicationScope, UniqueValue uniqueValue, int timeToLive );


     /**
@@ -103,9 +94,9 @@ UniqueValueSet load( ApplicationScope applicationScope, ConsistencyLevel consist
      *
      * @param applicationScope The scope of the application
      * @param uniqueValue Object to be deleted.
-     * @return MutatationBatch that encapsulates operation, caller may or may not execute.
+     * @return BatchStatement that encapsulates the CQL statements, caller may or may not execute.
      */
-    MutationBatch delete( ApplicationScope applicationScope, UniqueValue uniqueValue );
+    BatchStatement deleteCQL( ApplicationScope applicationScope, UniqueValue uniqueValue);


 }
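With writeCQL and deleteCQL both returning a BatchStatement, the strategy only assembles CQL; the caller composes the statements and decides when (and whether) to execute them on the Session. A usage sketch under that contract (Usergrid package names are assumed from the imports shown elsewhere in this diff):

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Session;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;

public class UniqueValueCqlUsageSketch {

    // The strategy builds statements; the caller batches and executes them in one place.
    static void replaceUniqueValue( UniqueValueSerializationStrategy strategy, Session session,
                                    ApplicationScope scope, UniqueValue stale, UniqueValue fresh ) {

        BatchStatement batch = new BatchStatement();
        batch.add( strategy.deleteCQL( scope, stale ) );       // drop the stale mapping
        batch.add( strategy.writeCQL( scope, fresh, -1 ) );    // -1 means no TTL, lives forever
        session.execute( batch );                              // one logged batch
    }
}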
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.usergrid.persistence.collection.serialization.impl;


import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;

import java.util.Iterator;
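The new file above is cut off after its imports, but Row, Statement, Iterator and UniqueValue suggest a class that walks a CQL result set and maps each row to a UniqueValue. A heavily hedged sketch of that shape follows; the class name and the parse step are invented, and the real column decoding depends on the unique-values table layout, which this excerpt does not show:

import java.util.Iterator;

import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;

// Hypothetical shape only; parse() stands in for whatever column decoding the real class does.
public abstract class RowIteratorSketch<T> implements Iterator<T> {

    private final Iterator<Row> rows;

    protected RowIteratorSketch( Session session, Statement query ) {
        // the driver pages transparently as this iterator is consumed,
        // fetching more rows according to the statement's fetch size
        this.rows = session.execute( query ).iterator();
    }

    @Override
    public boolean hasNext() {
        return rows.hasNext();
    }

    @Override
    public T next() {
        return parse( rows.next() );
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException( "remove is not supported" );
    }

    protected abstract T parse( Row row );
}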

