
add replay strategy
Shawn Feldman committed Mar 13, 2015
1 parent 4414838 commit 117c0f3
Showing 1 changed file with 42 additions and 52 deletions.
@@ -101,9 +101,46 @@ public void call( final CollectionIoEvent<MvccEntity> ioevent ) {

    final CollectionScope scope = ioevent.getEntityCollection();

+   final MutationBatch batch = keyspace.prepareMutationBatch();
+
+   //allocate our max size, worst case
+   final List<Field> uniqueFields = new ArrayList<>( entity.getFields().size() );
+
+   //
+   // Construct all the functions for verifying we're unique
+   //
+   for ( final Field field : entity.getFields() ) {
+
+       // if it's unique, create a function to validate it and add it to the list of
+       // concurrent validations
+       if ( field.isUnique() ) {
+
+           // use write-first then read strategy
+           final UniqueValue written = new UniqueValueImpl( field, mvccEntity.getId(), mvccEntity.getVersion() );
+
+           // use TTL in case something goes wrong before entity is finally committed
+           final MutationBatch mb = uniqueValueStrat.write( scope, written, serializationFig.getTimeout() );
+
+           batch.mergeShallow( mb );
+           uniqueFields.add( field );
+       }
+   }
+
+   //short circuit nothing to do
+   if ( uniqueFields.size() == 0 ) {
+       return;
+   }
+
+   //perform the write
+   try {
+       batch.execute();
+   }
+   catch ( ConnectionException ex ) {
+       throw new RuntimeException( "Unable to write to cassandra", ex );
+   }
+
+   // use simple thread pool to verify fields in parallel
-   ConsistentReplayCommand cmd = new ConsistentReplayCommand( uniqueValueStrat, keyspace, serializationFig, cassandraFig, scope, entity );
+   ConsistentReplayCommand cmd = new ConsistentReplayCommand( uniqueValueStrat, cassandraFig, scope, uniqueFields, entity );
    Map<String, Field> uniquenessViolations = cmd.execute();
    cmd.getFailedExecutionException();
    //We have violations, throw an exception
    if ( !uniquenessViolations.isEmpty() ) {
        throw new WriteUniqueVerifyException( mvccEntity, ioevent.getEntityCollection(), uniquenessViolations );
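Note: the block added above is a write-first-then-read uniqueness check. Every writer unconditionally claims its unique values up front, with a TTL so claims from crashed writers eventually expire, and only then reads the claims back to find out which writer actually won. A minimal, self-contained sketch of that flow follows; the in-memory UniqueStore and the string field keys are hypothetical stand-ins, not the Astyanax-backed unique-value table this commit uses.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical stand-in for the unique-value table: first claim wins.
    final class UniqueStore {
        private final ConcurrentMap<String, UUID> claims = new ConcurrentHashMap<>();

        // Write-first: record our claim without checking first. A real store
        // would attach the TTL here so an abandoned claim expires on its own.
        void write( String fieldKey, UUID entityId, long ttlSeconds ) {
            claims.putIfAbsent( fieldKey, entityId );
        }

        // Read-back: who owns this value now?
        UUID load( String fieldKey ) {
            return claims.get( fieldKey );
        }
    }

    final class WriteFirstSketch {
        public static void main( String[] args ) {
            UniqueStore store = new UniqueStore();
            UUID us = UUID.randomUUID();

            List<String> uniqueKeys = new ArrayList<>();
            uniqueKeys.add( "email:alice@example.com" );

            // Phase 1: claim every unique value in one batch-like pass.
            for ( String key : uniqueKeys ) {
                store.write( key, us, 30 );
            }

            // Phase 2: read back and collect violations (values owned by others).
            Map<String, UUID> violations = new HashMap<>();
            for ( String key : uniqueKeys ) {
                UUID winner = store.load( key );
                if ( winner != null && !winner.equals( us ) ) {
                    violations.put( key, winner );
                }
            }
            System.out.println( "violations: " + violations );
        }
    }

The TTL matters because a claim written in phase 1 is not yet backed by a committed entity; if the commit never completes, the claim must not block that value forever.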
@@ -113,19 +150,17 @@ public void call( final CollectionIoEvent<MvccEntity> ioevent ) {
private static class ConsistentReplayCommand extends HystrixCommand<Map<String, Field>> {

    private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
-   private final Keyspace keySpace;
-   private final SerializationFig serializationFig;
    private final CassandraConfig fig;
    private final CollectionScope scope;
+   private final List<Field> uniqueFields;
    private final Entity entity;

-   public ConsistentReplayCommand( UniqueValueSerializationStrategy uniqueValueSerializationStrategy, Keyspace keySpace, SerializationFig serializationFig, CassandraConfig fig, CollectionScope scope, Entity entity ) {
+   public ConsistentReplayCommand( UniqueValueSerializationStrategy uniqueValueSerializationStrategy, CassandraConfig fig, CollectionScope scope, List<Field> uniqueFields, Entity entity ) {
        super( REPLAY_GROUP );
        this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
-       this.keySpace = keySpace;
-       this.serializationFig = serializationFig;
        this.fig = fig;
        this.scope = scope;
+       this.uniqueFields = uniqueFields;
        this.entity = entity;
    }
@@ -142,23 +177,17 @@ protected Map<String, Field> getFallback() {
    public Map<String, Field> executeStrategy( ConsistencyLevel consistencyLevel ) {
        Collection<Field> entityFields = entity.getFields();
-       //allocate our max size, worst case
-       final List<Field> uniqueFields = new ArrayList<Field>( entityFields.size() );
        //now get the set of fields back
        final UniqueValueSet uniqueValues;
        //todo add consistencylevel and read back if fail using
-
        try {
-
-           uniqueValues = uniqueValueSerializationStrategy.load( scope, consistencyLevel, uniqueFields );
+           uniqueValues = uniqueValueSerializationStrategy.load( scope, consistencyLevel, entityFields );
        }
        catch ( ConnectionException e ) {
            throw new RuntimeException( "Unable to read from cassandra", e );
        }

-
        final Map<String, Field> uniquenessViolations = new HashMap<>( uniqueFields.size() );

-
        //loop through each field that was unique
        for ( final Field field : uniqueFields ) {

@@ -170,51 +199,12 @@ public Map<String, Field> executeStrategy(ConsistencyLevel consistencyLevel){
                    field.getName() ) );
            }

-
            final Id returnedEntityId = uniqueValue.getEntityId();

-
            if ( !entity.getId().equals( returnedEntityId ) ) {
                uniquenessViolations.put( field.getName(), field );
            }
        }
-       final MutationBatch batch = keySpace.prepareMutationBatch();
-       //
-       // Construct all the functions for verifying we're unique
-       //
-       for ( final Field field : entity.getFields() ) {
-
-           // if it's unique, create a function to validate it and add it to the list of
-           // concurrent validations
-           if ( field.isUnique() ) {
-
-               // use write-first then read strategy
-               final UniqueValue written = new UniqueValueImpl( field, entity.getId(), entity.getVersion() );
-
-               // use TTL in case something goes wrong before entity is finally committed
-               final MutationBatch mb = uniqueValueSerializationStrategy.write( scope, written, serializationFig.getTimeout() );
-
-               batch.mergeShallow( mb );
-               uniqueFields.add( field );
-           }
-       }
-
-       //short circuit nothing to do
-       if ( uniqueFields.size() == 0 ) {
-           return uniquenessViolations;
-       }
-
-       //perform the write
-       try {
-           batch.execute();
-       }
-       catch ( ConnectionException ex ) {
-           throw new RuntimeException( "Unable to write to cassandra", ex );
-       }

        return uniquenessViolations;
    }
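Stripped of the Hystrix and Cassandra machinery, the replay strategy this commit adds reduces to: attempt the read at the normal consistency level, and on failure replay it once at a stronger one. A generic sketch of that retry shape is below; the ConsistencyLevel enum and the simulated failure are illustrative, not this codebase's types.

    import java.util.function.Function;

    final class ReplaySketch {
        enum ConsistencyLevel { CL_ONE, CL_QUORUM }  // illustrative subset

        // Try the cheap read first; replay once at the stronger level on failure.
        static <T> T readWithReplay( Function<ConsistencyLevel, T> reader ) {
            try {
                return reader.apply( ConsistencyLevel.CL_ONE );
            }
            catch ( RuntimeException firstAttemptFailed ) {
                return reader.apply( ConsistencyLevel.CL_QUORUM );
            }
        }

        public static void main( String[] args ) {
            String result = readWithReplay( cl -> {
                if ( cl == ConsistencyLevel.CL_ONE ) {
                    // Simulate a flaky first read, as a replica timeout would cause.
                    throw new RuntimeException( "replica timeout" );
                }
                return "read ok at " + cl;
            } );
            System.out.println( result );  // read ok at CL_QUORUM
        }
    }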
