Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
Fixes shard allocation. Keeps old allocation logic for deletion.
Browse files Browse the repository at this point in the history
  • Loading branch information
Todd Nine committed Jul 9, 2015
1 parent a22c996 commit d1ca419
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 54 deletions.
Expand Up @@ -95,33 +95,34 @@ public static Mutator<ByteBuffer> addLocationEntryInsertionToMutator( Mutator<By


private static Mutator<ByteBuffer> batchAddConnectionIndexEntries( Mutator<ByteBuffer> m,
IndexBucketLocator locator, UUID appId,
IndexBucketLocator locator, UUID entityId,
String propertyName, String geoCell,
UUID[] index_keys, ByteBuffer columnName,
ByteBuffer columnValue, long timestamp ) {

final String bucket = locator.getBucket( entityId );


// entity_id,prop_name
Object property_index_key =
key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL, geoCell,
locator.getBucket(index_keys[ConnectionRefImpl.ALL] ) );
bucket );

// entity_id,entity_type,prop_name
Object entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL,
geoCell,
locator.getBucket( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE] ) );
bucket );

// entity_id,connection_type,prop_name
Object connection_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, propertyName,
DICTIONARY_GEOCELL, geoCell, locator.getBucket(
index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE] ) );
DICTIONARY_GEOCELL, geoCell, bucket );

// entity_id,connection_type,entity_type,prop_name
Object connection_type_and_entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName,
DICTIONARY_GEOCELL, geoCell, locator.getBucket(
index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE] ) );
DICTIONARY_GEOCELL, geoCell, bucket );

// composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
addInsertToMutator( m, ENTITY_INDEX, property_index_key, columnName, columnValue, timestamp );
Expand Down Expand Up @@ -151,7 +152,7 @@ public static void batchStoreLocationInConnectionsIndex( Mutator<ByteBuffer> m,
ByteBuffer columnValue = location.getColumnValue().serialize();
long ts = location.getTimestampInMicros();
for ( String cell : cells ) {
batchAddConnectionIndexEntries( m, locator, appId, propertyName, cell, index_keys, columnName, columnValue,
batchAddConnectionIndexEntries( m, locator, location.getUuid(), propertyName, cell, index_keys, columnName, columnValue,
ts );
}

Expand All @@ -177,11 +178,15 @@ private static Mutator<ByteBuffer> addLocationEntryDeletionToMutator( Mutator<By


private static Mutator<ByteBuffer> batchDeleteConnectionIndexEntries( Mutator<ByteBuffer> m,
IndexBucketLocator locator, UUID appId,
IndexBucketLocator locator, UUID entityId,
String propertyName, String geoCell,
UUID[] index_keys, ByteBuffer columnName,
long timestamp ) {

/**
* Legacy key scheme
*/

// entity_id,prop_name
Object property_index_key =
key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL, geoCell,
Expand Down Expand Up @@ -221,6 +226,49 @@ private static Mutator<ByteBuffer> batchDeleteConnectionIndexEntries( Mutator<By
m.addDeletion( bytebuffer( connection_type_and_entity_type_prop_index_key ), ENTITY_INDEX.toString(),
columnName, ByteBufferSerializer.get(), timestamp );


/**
* New key scheme
*/

final String bucket = locator.getBucket( entityId );

// entity_id,prop_name
Object property_index_key_new =
key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL, geoCell,
bucket );

// entity_id,entity_type,prop_name
Object entity_type_prop_index_key_new =
key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL,
geoCell, bucket );

// entity_id,connection_type,prop_name
Object connection_type_prop_index_key_new =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, propertyName,
DICTIONARY_GEOCELL, geoCell, bucket );

// entity_id,connection_type,entity_type,prop_name
Object connection_type_and_entity_type_prop_index_key_new =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName,
DICTIONARY_GEOCELL, geoCell, bucket );

// composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
m.addDeletion( bytebuffer( property_index_key_new ), ENTITY_INDEX.toString(), columnName,
ByteBufferSerializer.get(), timestamp );

// composite(property_value,connected_entity_id,connection_type,entry_timestamp)
m.addDeletion( bytebuffer( entity_type_prop_index_key_new ), ENTITY_INDEX.toString(), columnName,
ByteBufferSerializer.get(), timestamp );

// composite(property_value,connected_entity_id,entity_type,entry_timestamp)
m.addDeletion( bytebuffer( connection_type_prop_index_key_new ), ENTITY_INDEX.toString(), columnName,
ByteBufferSerializer.get(), timestamp );

// composite(property_value,connected_entity_id,entry_timestamp)
m.addDeletion( bytebuffer( connection_type_and_entity_type_prop_index_key_new ), ENTITY_INDEX.toString(),
columnName, ByteBufferSerializer.get(), timestamp );

return m;
}

Expand All @@ -238,7 +286,7 @@ public static void batchDeleteLocationInConnectionsIndex( Mutator<ByteBuffer> m,

for ( String cell : cells ) {

batchDeleteConnectionIndexEntries( m, locator, appId, propertyName, cell, index_keys, columnName, ts );
batchDeleteConnectionIndexEntries( m, locator, location.getUuid(), propertyName, cell, index_keys, columnName, ts );
}

logger.info( "Geocells to be saved for Point({} , {} ) are: {}", new Object[] {
Expand All @@ -247,8 +295,7 @@ public static void batchDeleteLocationInConnectionsIndex( Mutator<ByteBuffer> m,
}


public static void batchStoreLocationInCollectionIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator,
UUID appId, Object key, UUID entityId,
public static void batchStoreLocationInCollectionIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator, Object key, UUID entityId,
EntityLocationRef location ) {

Point p = location.getPoint();
Expand Down Expand Up @@ -276,15 +323,14 @@ public void storeLocationInCollectionIndex( EntityRef owner, String collectionNa
Keyspace ko = cass.getApplicationKeyspace( em.getApplicationId() );
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, ByteBufferSerializer.get() );

batchStoreLocationInCollectionIndex( m, em.getIndexBucketLocator(), em.getApplicationId(),
batchStoreLocationInCollectionIndex( m, em.getIndexBucketLocator(),
key( owner.getUuid(), collectionName, propertyName ), owner.getUuid(), location );

batchExecute( m, CassandraService.RETRY_COUNT );
}


public static void batchRemoveLocationFromCollectionIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator,
UUID appId, Object key, EntityLocationRef location ) {
public static void batchRemoveLocationFromCollectionIndex( Mutator<ByteBuffer> m, IndexBucketLocator locator, Object key, EntityLocationRef location ) {

Point p = location.getPoint();
List<String> cells = GeocellManager.generateGeoCell( p );
Expand Down Expand Up @@ -314,7 +360,7 @@ public void removeLocationFromCollectionIndex( EntityRef owner, String collectio
Keyspace ko = cass.getApplicationKeyspace( em.getApplicationId() );
Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, ByteBufferSerializer.get() );

batchRemoveLocationFromCollectionIndex( m, em.getIndexBucketLocator(), em.getApplicationId(),
batchRemoveLocationFromCollectionIndex( m, em.getIndexBucketLocator(),
key( owner.getUuid(), collectionName, propertyName ), location );

batchExecute( m, CassandraService.RETRY_COUNT );
Expand Down
Expand Up @@ -21,8 +21,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -70,7 +72,7 @@ private synchronized ExecutorService getExecutorService(){
}


executorService = new ThreadPoolExecutor( threadCount, threadCount, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>( ), new CallerRunsExecutionHandler() );
executorService = new ThreadPoolExecutor( threadCount, threadCount, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>( ), new QueryThreadFactory(), new CallerRunsExecutionHandler() );
return executorService;
}

Expand All @@ -92,4 +94,25 @@ public void rejectedExecution( final Runnable r, final ThreadPoolExecutor execut
}
}

/**
* Simple factory for labeling job worker threads for easier debugging
*/
private static final class QueryThreadFactory implements ThreadFactory {

public static final QueryThreadFactory INSTANCE = new QueryThreadFactory();

private static final String NAME = "query-";
private final AtomicLong counter = new AtomicLong();


@Override
public Thread newThread( final Runnable r ) {

Thread newThread = new Thread( r, NAME + counter.incrementAndGet() );
newThread.setDaemon( true );

return newThread;
}
}

}
Expand Up @@ -250,7 +250,7 @@ public IndexUpdate batchUpdateCollectionIndex( IndexUpdate indexUpdate, EntityRe
EntityLocationRef loc =
new EntityLocationRef( indexUpdate.getEntity(), indexEntry.getTimestampUuid(),
indexEntry.getValue().toString() );
batchStoreLocationInCollectionIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
batchStoreLocationInCollectionIndex( indexUpdate.getBatch(), indexBucketLocator,
index_name, indexedEntity.getUuid(), loc );
}

Expand Down Expand Up @@ -555,27 +555,32 @@ public Mutator<ByteBuffer> batchDeleteConnectionIndexEntries( IndexUpdate indexU
ConnectionRefImpl connection, UUID[] index_keys )
throws Exception {


/**
* Original bucket scheme. Incorrect and legacy, but we need to keep it b/c we're not sure if the original write was the
* incorrect legacy system
*/

// entity_id,prop_name
Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.ALL] ) );

// entity_id,entity_type,prop_name
Object entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket(
index_keys[ConnectionRefImpl.BY_ENTITY_TYPE] ) );
indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE] ) );

// entity_id,connection_type,prop_name
Object connection_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket(
index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE]) );
indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE] ) );

// entity_id,connection_type,entity_type,prop_name
Object connection_type_and_entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket(
index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE]) );
indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE] ) );



// composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
Expand All @@ -596,35 +601,85 @@ public Mutator<ByteBuffer> batchDeleteConnectionIndexEntries( IndexUpdate indexU
addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key,
entry.getIndexComposite( connection.getConnectedEntityId() ), indexUpdate.getTimestamp() );


/**
* New bucket scheme for deletes
*/

final UUID entityId = connection.getConnectedEntityId();
final String bucket = indexBucketLocator.getBucket( entityId );


// entity_id,prop_name
Object property_index_key_new =
key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(), bucket );

// entity_id,entity_type,prop_name
Object entity_type_prop_index_key_new =
key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(), bucket );

// entity_id,connection_type,prop_name
Object connection_type_prop_index_key_new =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(), bucket );

// entity_id,connection_type,entity_type,prop_name
Object connection_type_and_entity_type_prop_index_key_new =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
bucket );


// composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key_new,
entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType(),
connection.getConnectedEntityType() ), indexUpdate.getTimestamp() );

// composite(property_value,connected_entity_id,connection_type,entry_timestamp)
addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key_new,
entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType() ),
indexUpdate.getTimestamp() );

// composite(property_value,connected_entity_id,entity_type,entry_timestamp)
addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key_new,
entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectedEntityType() ),
indexUpdate.getTimestamp() );

// composite(property_value,connected_entity_id,entry_timestamp)
addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key_new,
entry.getIndexComposite( connection.getConnectedEntityId() ), indexUpdate.getTimestamp() );


return indexUpdate.getBatch();
}




@Metered(group = "core", name = "RelationManager_batchAddConnectionIndexEntries")
public Mutator<ByteBuffer> batchAddConnectionIndexEntries( IndexUpdate indexUpdate, IndexEntry entry,
ConnectionRefImpl connection, UUID[] index_keys ) {

final UUID entityId = connection.getConnectedEntityId();

final String bucket = indexBucketLocator.getBucket( entityId );

// entity_id,prop_name
Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.ALL] ) );
bucket );

// entity_id,entity_type,prop_name
Object entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket(
index_keys[ConnectionRefImpl.BY_ENTITY_TYPE]) );
bucket );

// entity_id,connection_type,prop_name
Object connection_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket(
index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE] ) );
bucket );

// entity_id,connection_type,entity_type,prop_name
Object connection_type_and_entity_type_prop_index_key =
key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
indexBucketLocator.getBucket(
index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE]) );
bucket );

// composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
Expand Down Expand Up @@ -725,7 +780,8 @@ public IndexUpdate batchUpdateConnectionIndex( IndexUpdate indexUpdate, Connecti
public Set<String> getConnectionIndexes( ConnectionRefImpl connection ) throws Exception {
List<HColumn<String, String>> results =
cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), Serializers.se, Serializers.se );
key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), Serializers.se,
Serializers.se );
Set<String> indexes = new TreeSet<String>();
if ( results != null ) {
for ( HColumn<String, String> column : results ) {
Expand Down

0 comments on commit d1ca419

Please sign in to comment.