From d1ca419f3def8e92cefec8c84508185823d3d4ac Mon Sep 17 00:00:00 2001 From: Todd Nine Date: Thu, 9 Jul 2015 13:44:50 -0600 Subject: [PATCH] Fixes shard allocation. Keeps old allocation logic for deletion. --- .../cassandra/GeoIndexManager.java | 78 +++++++++++++---- .../cassandra/QueryExecutorServiceImpl.java | 25 +++++- .../cassandra/RelationManagerImpl.java | 86 +++++++++++++++---- .../ir/result/ConnectionShardFilter.java | 26 ++---- .../ir/result/SearchConnectionVisitor.java | 4 +- 5 files changed, 165 insertions(+), 54 deletions(-) diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java index ce5fab839c..109881f40b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java @@ -95,33 +95,34 @@ public static Mutator addLocationEntryInsertionToMutator( Mutator batchAddConnectionIndexEntries( Mutator m, - IndexBucketLocator locator, UUID appId, + IndexBucketLocator locator, UUID entityId, String propertyName, String geoCell, UUID[] index_keys, ByteBuffer columnName, ByteBuffer columnValue, long timestamp ) { + final String bucket = locator.getBucket( entityId ); + + // entity_id,prop_name Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL, geoCell, - locator.getBucket(index_keys[ConnectionRefImpl.ALL] ) ); + bucket ); // entity_id,entity_type,prop_name Object entity_type_prop_index_key = key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL, geoCell, - locator.getBucket( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE] ) ); + bucket ); // entity_id,connection_type,prop_name Object connection_type_prop_index_key = key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, propertyName, - DICTIONARY_GEOCELL, geoCell, locator.getBucket( - index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE] ) ); + DICTIONARY_GEOCELL, geoCell, bucket ); // entity_id,connection_type,entity_type,prop_name Object connection_type_and_entity_type_prop_index_key = key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, propertyName, - DICTIONARY_GEOCELL, geoCell, locator.getBucket( - index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE] ) ); + DICTIONARY_GEOCELL, geoCell, bucket ); // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp) addInsertToMutator( m, ENTITY_INDEX, property_index_key, columnName, columnValue, timestamp ); @@ -151,7 +152,7 @@ public static void batchStoreLocationInConnectionsIndex( Mutator m, ByteBuffer columnValue = location.getColumnValue().serialize(); long ts = location.getTimestampInMicros(); for ( String cell : cells ) { - batchAddConnectionIndexEntries( m, locator, appId, propertyName, cell, index_keys, columnName, columnValue, + batchAddConnectionIndexEntries( m, locator, location.getUuid(), propertyName, cell, index_keys, columnName, columnValue, ts ); } @@ -177,11 +178,15 @@ private static Mutator addLocationEntryDeletionToMutator( Mutator batchDeleteConnectionIndexEntries( Mutator m, - IndexBucketLocator locator, UUID appId, + IndexBucketLocator locator, UUID entityId, String propertyName, String geoCell, UUID[] index_keys, ByteBuffer columnName, long timestamp ) { + /** + * Legacy key scheme + */ + // entity_id,prop_name Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, propertyName, DICTIONARY_GEOCELL, geoCell, @@ -221,6 +226,49 @@ private static Mutator batchDeleteConnectionIndexEntries( Mutator m, for ( String cell : cells ) { - batchDeleteConnectionIndexEntries( m, locator, appId, propertyName, cell, index_keys, columnName, ts ); + batchDeleteConnectionIndexEntries( m, locator, location.getUuid(), propertyName, cell, index_keys, columnName, ts ); } logger.info( "Geocells to be saved for Point({} , {} ) are: {}", new Object[] { @@ -247,8 +295,7 @@ public static void batchDeleteLocationInConnectionsIndex( Mutator m, } - public static void batchStoreLocationInCollectionIndex( Mutator m, IndexBucketLocator locator, - UUID appId, Object key, UUID entityId, + public static void batchStoreLocationInCollectionIndex( Mutator m, IndexBucketLocator locator, Object key, UUID entityId, EntityLocationRef location ) { Point p = location.getPoint(); @@ -276,15 +323,14 @@ public void storeLocationInCollectionIndex( EntityRef owner, String collectionNa Keyspace ko = cass.getApplicationKeyspace( em.getApplicationId() ); Mutator m = CountingMutator.createFlushingMutator( ko, ByteBufferSerializer.get() ); - batchStoreLocationInCollectionIndex( m, em.getIndexBucketLocator(), em.getApplicationId(), + batchStoreLocationInCollectionIndex( m, em.getIndexBucketLocator(), key( owner.getUuid(), collectionName, propertyName ), owner.getUuid(), location ); batchExecute( m, CassandraService.RETRY_COUNT ); } - public static void batchRemoveLocationFromCollectionIndex( Mutator m, IndexBucketLocator locator, - UUID appId, Object key, EntityLocationRef location ) { + public static void batchRemoveLocationFromCollectionIndex( Mutator m, IndexBucketLocator locator, Object key, EntityLocationRef location ) { Point p = location.getPoint(); List cells = GeocellManager.generateGeoCell( p ); @@ -314,7 +360,7 @@ public void removeLocationFromCollectionIndex( EntityRef owner, String collectio Keyspace ko = cass.getApplicationKeyspace( em.getApplicationId() ); Mutator m = CountingMutator.createFlushingMutator( ko, ByteBufferSerializer.get() ); - batchRemoveLocationFromCollectionIndex( m, em.getIndexBucketLocator(), em.getApplicationId(), + batchRemoveLocationFromCollectionIndex( m, em.getIndexBucketLocator(), key( owner.getUuid(), collectionName, propertyName ), location ); batchExecute( m, CassandraService.RETRY_COUNT ); diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryExecutorServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryExecutorServiceImpl.java index 7376641498..3504a3fe88 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryExecutorServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryExecutorServiceImpl.java @@ -21,8 +21,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +72,7 @@ private synchronized ExecutorService getExecutorService(){ } - executorService = new ThreadPoolExecutor( threadCount, threadCount, 30, TimeUnit.SECONDS, new SynchronousQueue( ), new CallerRunsExecutionHandler() ); + executorService = new ThreadPoolExecutor( threadCount, threadCount, 30, TimeUnit.SECONDS, new SynchronousQueue( ), new QueryThreadFactory(), new CallerRunsExecutionHandler() ); return executorService; } @@ -92,4 +94,25 @@ public void rejectedExecution( final Runnable r, final ThreadPoolExecutor execut } } + /** + * Simple factory for labeling job worker threads for easier debugging + */ + private static final class QueryThreadFactory implements ThreadFactory { + + public static final QueryThreadFactory INSTANCE = new QueryThreadFactory(); + + private static final String NAME = "query-"; + private final AtomicLong counter = new AtomicLong(); + + + @Override + public Thread newThread( final Runnable r ) { + + Thread newThread = new Thread( r, NAME + counter.incrementAndGet() ); + newThread.setDaemon( true ); + + return newThread; + } + } + } diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java index 86e1690a2f..587f17dcb6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java @@ -250,7 +250,7 @@ public IndexUpdate batchUpdateCollectionIndex( IndexUpdate indexUpdate, EntityRe EntityLocationRef loc = new EntityLocationRef( indexUpdate.getEntity(), indexEntry.getTimestampUuid(), indexEntry.getValue().toString() ); - batchStoreLocationInCollectionIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId, + batchStoreLocationInCollectionIndex( indexUpdate.getBatch(), indexBucketLocator, index_name, indexedEntity.getUuid(), loc ); } @@ -555,6 +555,12 @@ public Mutator batchDeleteConnectionIndexEntries( IndexUpdate indexU ConnectionRefImpl connection, UUID[] index_keys ) throws Exception { + + /** + * Original bucket scheme. Incorrect and legacy, but we need to keep it b/c we're not sure if the original write was the + * incorrect legacy system + */ + // entity_id,prop_name Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(), indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.ALL] ) ); @@ -562,20 +568,19 @@ public Mutator batchDeleteConnectionIndexEntries( IndexUpdate indexU // entity_id,entity_type,prop_name Object entity_type_prop_index_key = key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(), - indexBucketLocator.getBucket( - index_keys[ConnectionRefImpl.BY_ENTITY_TYPE] ) ); + indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE] ) ); // entity_id,connection_type,prop_name Object connection_type_prop_index_key = key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(), - indexBucketLocator.getBucket( - index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE]) ); + indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE] ) ); // entity_id,connection_type,entity_type,prop_name Object connection_type_and_entity_type_prop_index_key = key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(), - indexBucketLocator.getBucket( - index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE]) ); + indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE] ) ); + + // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp) addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key, @@ -596,35 +601,85 @@ public Mutator batchDeleteConnectionIndexEntries( IndexUpdate indexU addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key, entry.getIndexComposite( connection.getConnectedEntityId() ), indexUpdate.getTimestamp() ); + + /** + * New bucket scheme for deletes + */ + + final UUID entityId = connection.getConnectedEntityId(); + final String bucket = indexBucketLocator.getBucket( entityId ); + + + // entity_id,prop_name + Object property_index_key_new = + key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(), bucket ); + + // entity_id,entity_type,prop_name + Object entity_type_prop_index_key_new = + key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(), bucket ); + + // entity_id,connection_type,prop_name + Object connection_type_prop_index_key_new = + key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(), bucket ); + + // entity_id,connection_type,entity_type,prop_name + Object connection_type_and_entity_type_prop_index_key_new = + key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(), + bucket ); + + + // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp) + addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key_new, + entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType(), + connection.getConnectedEntityType() ), indexUpdate.getTimestamp() ); + + // composite(property_value,connected_entity_id,connection_type,entry_timestamp) + addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key_new, + entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType() ), + indexUpdate.getTimestamp() ); + + // composite(property_value,connected_entity_id,entity_type,entry_timestamp) + addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key_new, + entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectedEntityType() ), + indexUpdate.getTimestamp() ); + + // composite(property_value,connected_entity_id,entry_timestamp) + addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key_new, + entry.getIndexComposite( connection.getConnectedEntityId() ), indexUpdate.getTimestamp() ); + + return indexUpdate.getBatch(); } + + @Metered(group = "core", name = "RelationManager_batchAddConnectionIndexEntries") public Mutator batchAddConnectionIndexEntries( IndexUpdate indexUpdate, IndexEntry entry, ConnectionRefImpl connection, UUID[] index_keys ) { + final UUID entityId = connection.getConnectedEntityId(); + + final String bucket = indexBucketLocator.getBucket( entityId ); + // entity_id,prop_name Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(), - indexBucketLocator.getBucket( index_keys[ConnectionRefImpl.ALL] ) ); + bucket ); // entity_id,entity_type,prop_name Object entity_type_prop_index_key = key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(), - indexBucketLocator.getBucket( - index_keys[ConnectionRefImpl.BY_ENTITY_TYPE]) ); + bucket ); // entity_id,connection_type,prop_name Object connection_type_prop_index_key = key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(), - indexBucketLocator.getBucket( - index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE] ) ); + bucket ); // entity_id,connection_type,entity_type,prop_name Object connection_type_and_entity_type_prop_index_key = key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(), - indexBucketLocator.getBucket( - index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE]) ); + bucket ); // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp) addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key, @@ -725,7 +780,8 @@ public IndexUpdate batchUpdateConnectionIndex( IndexUpdate indexUpdate, Connecti public Set getConnectionIndexes( ConnectionRefImpl connection ) throws Exception { List> results = cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES, - key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), Serializers.se, Serializers.se ); + key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), Serializers.se, + Serializers.se ); Set indexes = new TreeSet(); if ( results != null ) { for ( HColumn column : results ) { diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionShardFilter.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionShardFilter.java index f56dc2b702..2fec25092f 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionShardFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/ConnectionShardFilter.java @@ -21,7 +21,6 @@ import java.util.UUID; import org.apache.usergrid.persistence.IndexBucketLocator; -import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl; /** @@ -30,35 +29,22 @@ public final class ConnectionShardFilter implements ShardFilter { private final IndexBucketLocator indexBucketLocator; private final String expectedBucket; - private final ConnectionRefImpl searchConnection; - public ConnectionShardFilter( final IndexBucketLocator indexBucketLocator, final String expectedBucket, - final ConnectionRefImpl connection ) { + public ConnectionShardFilter( final IndexBucketLocator indexBucketLocator, final String expectedBucket ) { this.indexBucketLocator = indexBucketLocator; this.expectedBucket = expectedBucket; - this.searchConnection = connection; - - } - - - public boolean isInShard( final ScanColumn scanColumn ) { - //shard hashing is currently based on source. this is a placeholder for when this is fixed. -// UUID[] indexIds = searchConnection.getIndexIds(); -// -// final String shard = indexBucketLocator.getBucket(indexIds[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE] ); -// -// return expectedBucket.equals( shard ); - - return true; -// - } + final UUID entityId = scanColumn.getUUID(); + //not for our current processing shard, discard + final String shard = indexBucketLocator.getBucket( entityId ); + return expectedBucket.equals( shard ); + } } diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java index f518297712..909ae5de44 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/query/ir/result/SearchConnectionVisitor.java @@ -136,7 +136,7 @@ public void visit( WithinNode node ) throws Exception { final ConnectionShardFilter - validator = new ConnectionShardFilter(indexBucketLocator, bucket, connection ); + validator = new ConnectionShardFilter(indexBucketLocator, bucket ); this.results.push( new ShardFilterIterator( validator, itr, size ) ); @@ -213,7 +213,7 @@ public void visit( AllNode node ) throws Exception { //we have to create our wrapper so validate the data we read is correct for our shard - final ConnectionShardFilter connectionShardFilter = new ConnectionShardFilter( indexBucketLocator, bucket, connection); + final ConnectionShardFilter connectionShardFilter = new ConnectionShardFilter( indexBucketLocator, bucket); final SliceIterator sliceIterator = new SliceIterator( connectionScanner, connectionParser );