From dc8ee08ef40ab42751bdcee8bc76daeabf68894b Mon Sep 17 00:00:00 2001
From: Clint Wylie
Date: Thu, 29 Mar 2018 10:30:12 -0700
Subject: [PATCH] Coordinator balancer move then drop fix (#5528)

* #5521 part 1

* formatting

* oops

* less magic tests
---
 .../AbstractCuratorServerInventoryView.java        |  63 +--
 .../coordinator/CuratorLoadQueuePeon.java          |  70 +--
 .../server/coordinator/DruidCoordinator.java       | 128 ++--
 .../helper/DruidCoordinatorRuleRunner.java         |   6 +-
 .../io/druid/curator/CuratorTestBase.java          |  51 ++
 .../CuratorDruidCoordinatorTest.java               | 534 ++++++++++++++++++
 6 files changed, 674 insertions(+), 178 deletions(-)
 create mode 100644 server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java

diff --git a/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java b/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java
index d6c731cdc58f..3c7159c0c2a8 100644
--- a/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java
+++ b/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java
@@ -23,17 +23,16 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.common.concurrent.Execs;
 import io.druid.curator.inventory.CuratorInventoryManager;
 import io.druid.curator.inventory.CuratorInventoryManagerStrategy;
 import io.druid.curator.inventory.InventoryManagerConfig;
 import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.lifecycle.LifecycleStart;
 import io.druid.java.util.common.lifecycle.LifecycleStop;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.timeline.DataSegment;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ZKPaths;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -157,14 +156,7 @@ public void inventoryInitialized()
           {
             log.info("Inventory Initialized");
             runSegmentCallbacks(
-                new Function<SegmentCallback, CallbackAction>()
-                {
-                  @Override
-                  public CallbackAction apply(SegmentCallback input)
-                  {
-                    return input.segmentViewInitialized();
-                  }
-                }
+                input -> input.segmentViewInitialized()
             );
           }
         }
@@ -233,15 +225,10 @@ protected void runSegmentCallbacks(
   {
     for (final Map.Entry<SegmentCallback, Executor> entry : segmentCallbacks.entrySet()) {
       entry.getValue().execute(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
-                segmentCallbackRemoved(entry.getKey());
-                segmentCallbacks.remove(entry.getKey());
-              }
+          () -> {
+            if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
+              segmentCallbackRemoved(entry.getKey());
+              segmentCallbacks.remove(entry.getKey());
             }
           }
       );
@@ -252,14 +239,9 @@ private void runServerRemovedCallbacks(final DruidServer server)
   {
     for (final Map.Entry<ServerRemovedCallback, Executor> entry : serverRemovedCallbacks.entrySet()) {
       entry.getValue().execute(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
-                serverRemovedCallbacks.remove(entry.getKey());
-              }
+          () -> {
+            if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
+              serverRemovedCallbacks.remove(entry.getKey());
             }
           }
       );
@@ -286,14 +268,7 @@ protected void addSingleInventory(
     container.addDataSegment(inventory);
 
     runSegmentCallbacks(
-        new Function<SegmentCallback, CallbackAction>()
-        {
-          @Override
-          public CallbackAction apply(SegmentCallback input)
-          {
-            return input.segmentAdded(container.getMetadata(), inventory);
-          }
-        }
+        input -> input.segmentAdded(container.getMetadata(), inventory)
     );
   }
 
@@ -315,14 +290,7 @@ protected void removeSingleInventory(final DruidServer container, String inventoryKey)
     container.removeDataSegment(inventoryKey);
 
     runSegmentCallbacks(
-        new Function<SegmentCallback, CallbackAction>()
-        {
-          @Override
-          public CallbackAction apply(SegmentCallback input)
-          {
-            return input.segmentRemoved(container.getMetadata(), segment);
-          }
-        }
+        input -> input.segmentRemoved(container.getMetadata(), segment)
     );
   }
 
@@ -330,11 +298,8 @@ public CallbackAction apply(SegmentCallback input)
   public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
   {
     try {
-      String toServedSegPath = ZKPaths.makePath(
-          ZKPaths.makePath(getInventoryManagerConfig().getInventoryPath(), serverKey),
-          segment.getIdentifier()
-      );
-      return curator.checkExists().forPath(toServedSegPath) != null;
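+      // look the segment up in the in-memory inventory view rather than checking for a
+      // per-segment znode, which is never created when a server announces segments in batches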
+      DruidServer server = getInventoryValue(serverKey);
+      return server != null && server.getSegment(segment.getIdentifier()) != null;
     }
     catch (Exception ex) {
       throw Throwables.propagate(ex);
diff --git a/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java
index 597495493b64..9db66d462290 100644
--- a/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java
+++ b/server/src/main/java/io/druid/server/coordinator/CuratorLoadQueuePeon.java
@@ -34,14 +34,12 @@
 import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.Stat;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
@@ -249,7 +247,7 @@ private void processSegmentChangeRequest()
       if (currentlyProcessing == null) {
         if (!stopped) {
           log.makeAlert("Crazy race condition! server[%s]", basePath)
-              .emit();
+             .emit();
         }
         actionCompleted();
         return;
@@ -261,38 +259,28 @@ private void processSegmentChangeRequest()
       curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
 
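+      // watchdog: if the request znode still exists after the load timeout, the historical
+      // never acted on it and the assignment is failed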
       processingExecutor.schedule(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              try {
-                if (curator.checkExists().forPath(path) != null) {
-                  failAssign(new ISE("%s was never removed! Failing this operation!", path));
-                }
-              }
-              catch (Exception e) {
-                failAssign(e);
+          () -> {
+            try {
+              if (curator.checkExists().forPath(path) != null) {
+                failAssign(new ISE("%s was never removed! Failing this operation!", path));
               }
             }
+            catch (Exception e) {
+              failAssign(e);
+            }
           },
           config.getLoadTimeoutDelay().getMillis(),
           TimeUnit.MILLISECONDS
       );
 
       final Stat stat = curator.checkExists().usingWatcher(
-          new CuratorWatcher()
-          {
-            @Override
-            public void process(WatchedEvent watchedEvent) throws Exception
-            {
-              switch (watchedEvent.getType()) {
-                case NodeDeleted:
-                  entryRemoved(watchedEvent.getPath());
-                  break;
-                default:
-                  // do nothing
-              }
+          (CuratorWatcher) watchedEvent -> {
+            switch (watchedEvent.getType()) {
+              case NodeDeleted:
+                entryRemoved(watchedEvent.getPath());
+                break;
+              default:
+                // do nothing
             }
           }
       ).forPath(path);
@@ -341,14 +329,7 @@ private void actionCompleted()
       final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
       currentlyProcessing = null;
       callBackExecutor.execute(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              executeCallbacks(callbacks);
-            }
-          }
+          () -> executeCallbacks(callbacks)
       );
     }
   }
@@ -360,18 +341,13 @@ public void start()
         processingExecutor,
         config.getLoadQueuePeonRepeatDelay(),
         config.getLoadQueuePeonRepeatDelay(),
-        new Callable<ScheduledExecutors.Signal>()
-        {
-          @Override
-          public ScheduledExecutors.Signal call()
-          {
-            processSegmentChangeRequest();
-
-            if (stopped) {
-              return ScheduledExecutors.Signal.STOP;
-            } else {
-              return ScheduledExecutors.Signal.REPEAT;
-            }
+        () -> {
+          processSegmentChangeRequest();
+
+          if (stopped) {
+            return ScheduledExecutors.Signal.STOP;
+          } else {
+            return ScheduledExecutors.Signal.REPEAT;
           }
         }
     );
diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
index 1be52f43cac9..e12ac867584d 100644
--- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
@@ -20,7 +20,6 @@
 package io.druid.server.coordinator;
 
 import com.google.common.base.Function;
-import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -30,8 +29,6 @@
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.client.DruidDataSource;
 import io.druid.client.DruidServer;
 import io.druid.client.ImmutableDruidDataSource;
@@ -56,6 +53,8 @@
 import io.druid.java.util.common.guava.FunctionalIterable;
 import io.druid.java.util.common.lifecycle.LifecycleStart;
 import io.druid.java.util.common.lifecycle.LifecycleStop;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.metadata.MetadataRuleManager;
 import io.druid.metadata.MetadataSegmentManager;
 import io.druid.server.DruidNode;
@@ -97,14 +96,8 @@ public class DruidCoordinator
   public static Comparator<DataSegment> SEGMENT_COMPARATOR = Ordering.from(Comparators.intervalsByEndThenStart())
       .onResultOf(
-          new Function<DataSegment, Interval>()
-          {
-            @Override
-            public Interval apply(DataSegment segment)
-            {
-              return segment.getInterval();
-            }
-          })
+          (Function<DataSegment, Interval>) segment -> segment
+              .getInterval())
       .compound(Ordering.natural())
       .reverse();
 
@@ -553,7 +546,8 @@ public ScheduledExecutors.Signal call()
               if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) {
                 theRunnable.run();
               }
-              if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { // (We might no longer be leader)
+              if (coordLeaderSelector.isLeader()
+                  && startingLeaderCounter == coordLeaderSelector.localTerm()) { // (We might no longer be leader)
                 return ScheduledExecutors.Signal.REPEAT;
               } else {
                 return ScheduledExecutors.Signal.STOP;
@@ -673,82 +667,58 @@ public CoordinatorHistoricalManagerRunnable(final int startingLeaderCounter)
       super(
           ImmutableList.of(
               new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this),
-              new DruidCoordinatorHelper()
-              {
-                @Override
-                public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
-                {
-                  // Display info about all historical servers
-                  Iterable<ImmutableDruidServer> servers = FunctionalIterable
-                      .create(serverInventoryView.getInventory())
-                      .filter(
-                          new Predicate<DruidServer>()
-                          {
-                            @Override
-                            public boolean apply(
-                                DruidServer input
-                            )
-                            {
-                              return input.segmentReplicatable();
-                            }
-                          }
-                      ).transform(
-                          new Function<DruidServer, ImmutableDruidServer>()
-                          {
-                            @Override
-                            public ImmutableDruidServer apply(DruidServer input)
-                            {
-                              return input.toImmutableDruidServer();
-                            }
-                          }
-                      );
-
-                  if (log.isDebugEnabled()) {
-                    log.debug("Servers");
-                    for (ImmutableDruidServer druidServer : servers) {
-                      log.debug("  %s", druidServer);
-                      log.debug("  -- DataSources");
-                      for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
-                        log.debug("    %s", druidDataSource);
-                      }
+              params -> {
+                // Display info about all historical servers
+                Iterable<ImmutableDruidServer> servers = FunctionalIterable
+                    .create(serverInventoryView.getInventory())
+                    .filter(DruidServer::segmentReplicatable)
+                    .transform(DruidServer::toImmutableDruidServer);
+
+                if (log.isDebugEnabled()) {
+                  log.debug("Servers");
+                  for (ImmutableDruidServer druidServer : servers) {
+                    log.debug("  %s", druidServer);
+                    log.debug("  -- DataSources");
+                    for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
+                      log.debug("    %s", druidDataSource);
                     }
                   }
+                }
 
-                  // Find all historical servers, group them by subType and sort by ascending usage
-                  final DruidCluster cluster = new DruidCluster();
-                  for (ImmutableDruidServer server : servers) {
-                    if (!loadManagementPeons.containsKey(server.getName())) {
-                      LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server);
-                      loadQueuePeon.start();
-                      log.info("Created LoadQueuePeon for server[%s].", server.getName());
+                // Find all historical servers, group them by subType and sort by ascending usage
+                final DruidCluster cluster = new DruidCluster();
+                for (ImmutableDruidServer server : servers) {
+                  if (!loadManagementPeons.containsKey(server.getName())) {
+                    LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server);
+                    loadQueuePeon.start();
+                    log.info("Created LoadQueuePeon for server[%s].", server.getName());
 
-                      loadManagementPeons.put(server.getName(), loadQueuePeon);
-                    }
-
-                    cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName())));
+                    loadManagementPeons.put(server.getName(), loadQueuePeon);
                   }
 
-                  segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
+                  cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName())));
+                }
 
-                  // Stop peons for servers that aren't there anymore.
-                  final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet());
-                  for (ImmutableDruidServer server : servers) {
-                    disappeared.remove(server.getName());
-                  }
-                  for (String name : disappeared) {
-                    log.info("Removing listener for server[%s] which is no longer there.", name);
-                    LoadQueuePeon peon = loadManagementPeons.remove(name);
-                    peon.stop();
-                  }
+                segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
 
-                  return params.buildFromExisting()
-                               .withDruidCluster(cluster)
-                               .withDatabaseRuleManager(metadataRuleManager)
-                               .withLoadManagementPeons(loadManagementPeons)
-                               .withSegmentReplicantLookup(segmentReplicantLookup)
-                               .withBalancerReferenceTimestamp(DateTimes.nowUtc())
-                               .build();
+                // Stop peons for servers that aren't there anymore.
+                final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet());
+                for (ImmutableDruidServer server : servers) {
+                  disappeared.remove(server.getName());
+                }
+                for (String name : disappeared) {
+                  log.info("Removing listener for server[%s] which is no longer there.", name);
+                  LoadQueuePeon peon = loadManagementPeons.remove(name);
+                  peon.stop();
                 }
+
+                return params.buildFromExisting()
+                             .withDruidCluster(cluster)
+                             .withDatabaseRuleManager(metadataRuleManager)
+                             .withLoadManagementPeons(loadManagementPeons)
+                             .withSegmentReplicantLookup(segmentReplicantLookup)
+                             .withBalancerReferenceTimestamp(DateTimes.nowUtc())
+                             .build();
               },
               new DruidCoordinatorRuleRunner(DruidCoordinator.this),
               new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
index 5fa0ff17b220..47191606c3b1 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
@@ -20,9 +20,9 @@
 package io.druid.server.coordinator.helper;
 
 import com.google.common.collect.Lists;
-import io.druid.java.util.common.guava.Comparators;
-import io.druid.java.util.emitter.EmittingLogger;
+import com.google.common.collect.Ordering;
 import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.metadata.MetadataRuleManager;
 import io.druid.server.coordinator.CoordinatorStats;
 import io.druid.server.coordinator.DruidCluster;
@@ -92,7 +92,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
     for (DataSegment segment : params.getAvailableSegments()) {
       VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(segment.getDataSource());
       if (timeline == null) {
-        timeline = new VersionedIntervalTimeline<>(Comparators.comparable());
+        timeline = new VersionedIntervalTimeline<>(Ordering.natural());
         timelines.put(segment.getDataSource(), timeline);
       }
 
diff --git a/server/src/test/java/io/druid/curator/CuratorTestBase.java b/server/src/test/java/io/druid/curator/CuratorTestBase.java
index 6ad32306172b..502d4c83a2e4 100644
--- a/server/src/test/java/io/druid/curator/CuratorTestBase.java
+++ b/server/src/test/java/io/druid/curator/CuratorTestBase.java
@@ -23,6 +23,8 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableSet;
 import io.druid.client.DruidServer;
+import io.druid.common.utils.UUIDUtils;
+import io.druid.java.util.common.DateTimes;
 import io.druid.server.initialization.ZkPathsConfig;
 import io.druid.timeline.DataSegment;
 import org.apache.curator.framework.CuratorFramework;
@@ -44,6 +46,8 @@ public class CuratorTestBase
   protected Timing timing;
   protected CuratorFramework curator;
 
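+  // counter appended to each batch announcement path so successive batches get distinct znodes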
+  private int batchCtr = 0;
+
   protected void setupServerAndCurator() throws Exception
   {
     server = new TestingServer();
@@ -127,6 +131,47 @@ protected void announceSegmentForServer(
     }
   }
 
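+  /**
+   * Announces the given segments in a single compressed batch znode, the way batch segment
+   * announcement does, and returns the created path so tests can later remove it.
+   */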
+  protected String announceBatchSegmentsForServer(
+      DruidServer druidServer,
+      ImmutableSet<DataSegment> segments,
+      ZkPathsConfig zkPathsConfig,
+      ObjectMapper jsonMapper
+  )
+  {
+    final String segmentAnnouncementPath = ZKPaths.makePath(
+        ZKPaths.makePath(
+            zkPathsConfig.getLiveSegmentsPath(),
+            druidServer.getHost()
+        ),
+        UUIDUtils.generateUuid(
+            druidServer.getHost(),
+            druidServer.getType().toString(),
+            druidServer.getTier(),
+            DateTimes.nowUtc().toString()
+        ) + String.valueOf(batchCtr++)
+    );
+
+    try {
+      curator.create()
+             .compressed()
+             .withMode(CreateMode.EPHEMERAL)
+             .forPath(segmentAnnouncementPath, jsonMapper.writeValueAsBytes(segments));
+    }
+    catch (KeeperException.NodeExistsException e) {
+      try {
+        curator.setData()
+               .forPath(segmentAnnouncementPath, jsonMapper.writeValueAsBytes(segments));
+      }
+      catch (Exception e1) {
+        Throwables.propagate(e1);
+      }
+    }
+    catch (Exception e) {
+      Throwables.propagate(e);
+    }
+
+    return segmentAnnouncementPath;
+  }
+
   protected void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment, ZkPathsConfig zkPathsConfig)
       throws Exception
   {
@@ -138,6 +183,12 @@ protected void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment, ZkPathsConfig zkPathsConfig)
     );
   }
 
+  protected void unannounceSegmentFromBatchForServer(DruidServer druidServer, DataSegment segment, String batchPath, ZkPathsConfig zkPathsConfig)
+      throws Exception
+  {
+    curator.delete().guaranteed().forPath(batchPath);
+  }
+
   protected void tearDownServerAndCurator()
   {
     try {
diff --git a/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java
new file mode 100644
index 000000000000..19e9c3a1bcf4
--- /dev/null
+++ b/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.coordinator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import io.druid.client.BatchServerInventoryView;
+import io.druid.client.CoordinatorServerView;
+import io.druid.client.DruidServer;
+import io.druid.client.ImmutableDruidDataSource;
+import io.druid.common.config.JacksonConfigManager;
+import io.druid.curator.CuratorTestBase;
+import io.druid.curator.discovery.NoopServiceAnnouncer;
+import io.druid.discovery.DruidLeaderSelector;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.java.util.common.Intervals;
+import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import io.druid.metadata.MetadataRuleManager;
+import io.druid.metadata.MetadataSegmentManager;
+import io.druid.segment.TestHelper;
+import io.druid.server.DruidNode;
+import io.druid.server.coordination.DruidServerMetadata;
+import io.druid.server.coordination.ServerType;
+import io.druid.server.initialization.ZkPathsConfig;
+import io.druid.server.lookup.cache.LookupCoordinatorManager;
+import io.druid.server.metrics.NoopServiceEmitter;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.utils.ZKPaths;
+import org.easymock.EasyMock;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * This tests ZooKeeper-specific coordinator/load queue/historical interactions, such as the
+ * balancer moving segments.
+ */
+public class CuratorDruidCoordinatorTest extends CuratorTestBase
+{
+  private DruidCoordinator coordinator;
+  private MetadataSegmentManager databaseSegmentManager;
+  private ScheduledExecutorFactory scheduledExecutorFactory;
+  private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
+  private LoadQueuePeon sourceLoadQueuePeon;
+  private LoadQueuePeon destinationLoadQueuePeon;
+  private MetadataRuleManager metadataRuleManager;
+  private CountDownLatch leaderAnnouncerLatch;
+  private CountDownLatch leaderUnannouncerLatch;
+  private PathChildrenCache sourceLoadQueueChildrenCache;
+  private PathChildrenCache destinationLoadQueueChildrenCache;
+  private DruidCoordinatorConfig druidCoordinatorConfig;
+  private ObjectMapper objectMapper;
+  private JacksonConfigManager configManager;
+  private DruidNode druidNode;
+  private static final String SEGPATH = "/druid/segments";
+  private static final String SOURCE_LOAD_PATH = "/druid/loadQueue/localhost:1";
+  private static final String DESTINATION_LOAD_PATH = "/druid/loadQueue/localhost:2";
+  private static final long COORDINATOR_START_DELAY = 1;
+  private static final long COORDINATOR_PERIOD = 100;
+
+  private BatchServerInventoryView baseView;
+  private CoordinatorServerView serverView;
+
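+  // these latches are counted down by the segment callbacks registered in setupView(), letting
+  // the test block until the segment view has seen the corresponding events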
+  private CountDownLatch segmentViewInitLatch;
+  private CountDownLatch segmentAddedLatch;
+  private CountDownLatch segmentRemovedLatch;
+  private final ObjectMapper jsonMapper;
+  private final ZkPathsConfig zkPathsConfig;
+
+  public CuratorDruidCoordinatorTest()
+  {
+    jsonMapper = TestHelper.makeJsonMapper();
+    zkPathsConfig = new ZkPathsConfig();
+  }
+
+  @Before
+  public void setUp() throws Exception
+  {
+    databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class);
+    metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
+    configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
+    EasyMock.expect(
+        configManager.watch(
+            EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
+            EasyMock.anyObject(Class.class),
+            EasyMock.anyObject()
+        )
+    ).andReturn(new AtomicReference<>(CoordinatorDynamicConfig.builder().build())).anyTimes();
+    EasyMock.expect(
+        configManager.watch(
+            EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+            EasyMock.anyObject(Class.class),
+            EasyMock.anyObject()
+        )
+    ).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes();
+    EasyMock.replay(configManager);
+
+    setupServerAndCurator();
+    curator.start();
+    curator.blockUntilConnected();
+    curator.create().creatingParentsIfNeeded().forPath(SEGPATH);
+    curator.create().creatingParentsIfNeeded().forPath(SOURCE_LOAD_PATH);
+    curator.create().creatingParentsIfNeeded().forPath(DESTINATION_LOAD_PATH);
+
+    objectMapper = new DefaultObjectMapper();
+    druidCoordinatorConfig = new TestDruidCoordinatorConfig(
+        new Duration(COORDINATOR_START_DELAY),
+        new Duration(COORDINATOR_PERIOD),
+        null,
+        null,
+        new Duration(COORDINATOR_PERIOD),
+        null,
+        10,
+        null,
+        false,
+        false,
+        new Duration("PT0s")
+    );
+    sourceLoadQueueChildrenCache = new PathChildrenCache(
+        curator,
+        SOURCE_LOAD_PATH,
+        true,
+        true,
+        Execs.singleThreaded("coordinator_test_path_children_cache_src-%d")
+    );
+    destinationLoadQueueChildrenCache = new PathChildrenCache(
+        curator,
+        DESTINATION_LOAD_PATH,
+        true,
+        true,
+        Execs.singleThreaded("coordinator_test_path_children_cache_dest-%d")
+    );
+    sourceLoadQueuePeon = new CuratorLoadQueuePeon(
+        curator,
+        SOURCE_LOAD_PATH,
+        objectMapper,
+        Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_src_scheduled-%d"),
+        Execs.singleThreaded("coordinator_test_load_queue_peon_src-%d"),
+        druidCoordinatorConfig
+    );
+    destinationLoadQueuePeon = new CuratorLoadQueuePeon(
+        curator,
+        DESTINATION_LOAD_PATH,
+        objectMapper,
+        Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_dest_scheduled-%d"),
+        Execs.singleThreaded("coordinator_test_load_queue_peon_dest-%d"),
+        druidCoordinatorConfig
+    );
+    druidNode = new DruidNode("hey", "what", 1234, null, true, false);
+    loadManagementPeons = new ConcurrentHashMap<>();
+    scheduledExecutorFactory = (corePoolSize, nameFormat) -> Executors.newSingleThreadScheduledExecutor();
+    leaderAnnouncerLatch = new CountDownLatch(1);
+    leaderUnannouncerLatch = new CountDownLatch(1);
+    coordinator = new DruidCoordinator(
+        druidCoordinatorConfig,
+        new ZkPathsConfig()
+        {
+
+          @Override
+          public String getBase()
+          {
+            return "druid";
+          }
+        },
+        configManager,
+        databaseSegmentManager,
+        baseView,
+        metadataRuleManager,
+        curator,
+        new NoopServiceEmitter(),
+        scheduledExecutorFactory,
+        null,
+        null,
+        new NoopServiceAnnouncer()
+        {
+          @Override
+          public void announce(DruidNode node)
+          {
+            // count down when this coordinator becomes the leader
+            leaderAnnouncerLatch.countDown();
+          }
+
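+          // count down when this coordinator loses leadership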
+          @Override
+          public void unannounce(DruidNode node)
+          {
+            leaderUnannouncerLatch.countDown();
+          }
+        },
+        druidNode,
+        loadManagementPeons,
+        null,
+        new CostBalancerStrategyFactory(),
+        EasyMock.createNiceMock(LookupCoordinatorManager.class),
+        new TestDruidLeaderSelector()
+    );
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    baseView.stop();
+    sourceLoadQueuePeon.stop();
+    sourceLoadQueueChildrenCache.close();
+    destinationLoadQueueChildrenCache.close();
+    tearDownServerAndCurator();
+  }
+
+  @Test(timeout = 5_000)
+  public void testMoveSegment() throws Exception
+  {
+    segmentViewInitLatch = new CountDownLatch(1);
+    segmentAddedLatch = new CountDownLatch(4);
+
+    segmentRemovedLatch = new CountDownLatch(0);
+
+    CountDownLatch destCountdown = new CountDownLatch(1);
+    CountDownLatch srcCountdown = new CountDownLatch(1);
+    setupView();
+
+    DruidServer source = new DruidServer(
+        "localhost:1",
+        "localhost:1",
+        null,
+        10000000L,
+        ServerType.HISTORICAL,
+        "default_tier",
+        0
+    );
+
+    DruidServer dest = new DruidServer(
+        "localhost:2",
+        "localhost:2",
+        null,
+        10000000L,
+        ServerType.HISTORICAL,
+        "default_tier",
+        0
+    );
+
+    setupZNodeForServer(source, zkPathsConfig, jsonMapper);
+    setupZNodeForServer(dest, zkPathsConfig, jsonMapper);
+
+    final List<DataSegment> sourceSegments = Lists.transform(
+        ImmutableList.of(
+            Pair.of("2011-04-01/2011-04-03", "v1"),
+            Pair.of("2011-04-03/2011-04-06", "v1"),
+            Pair.of("2011-04-06/2011-04-09", "v1")
+        ),
+        input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs)
+    );
+
+    final List<DataSegment> destinationSegments = Lists.transform(
+        ImmutableList.of(
+            Pair.of("2011-03-31/2011-04-01", "v1")
+        ),
+        input -> dataSegmentWithIntervalAndVersion(input.lhs, input.rhs)
+    );
+
+    DataSegment segmentToMove = sourceSegments.get(2);
+
+    List<String> sourceSegKeys = Lists.newArrayList();
+    List<String> destSegKeys = Lists.newArrayList();
+
+    for (DataSegment segment : sourceSegments) {
+      sourceSegKeys.add(announceBatchSegmentsForServer(source, ImmutableSet.of(segment), zkPathsConfig, jsonMapper));
+    }
+
+    for (DataSegment segment : destinationSegments) {
+      destSegKeys.add(announceBatchSegmentsForServer(dest, ImmutableSet.of(segment), zkPathsConfig, jsonMapper));
+    }
+
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
+
+    // these child watchers simulate the actions of historicals: announcing a segment on noticing
+    // a load request in the destination's load queue, and unannouncing from the source server on
+    // noticing a drop request
+
+    sourceLoadQueueChildrenCache.getListenable().addListener(
+        (curatorFramework, pathChildrenCacheEvent) -> {
+          if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
+            srcCountdown.countDown();
+          } else if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
+            // simulate the source server dropping the segment
+            unannounceSegmentFromBatchForServer(source, segmentToMove, sourceSegKeys.get(2), zkPathsConfig);
+          }
+        }
+    );
+
+    destinationLoadQueueChildrenCache.getListenable().addListener(
+        (curatorFramework, pathChildrenCacheEvent) -> {
+          if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
+            destCountdown.countDown();
+          } else if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
+            // simulate the destination server loading the segment
+            announceBatchSegmentsForServer(dest, ImmutableSet.of(segmentToMove), zkPathsConfig, jsonMapper);
+          }
+        }
+    );
+
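+    // POST_INITIALIZED_EVENT mode delivers an INITIALIZED event to the listeners above once the
+    // existing children have been loaded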
+    sourceLoadQueueChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+    destinationLoadQueueChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+
+    Assert.assertTrue(timing.forWaiting().awaitLatch(srcCountdown));
+    Assert.assertTrue(timing.forWaiting().awaitLatch(destCountdown));
+
+    loadManagementPeons.put("localhost:1", sourceLoadQueuePeon);
+    loadManagementPeons.put("localhost:2", destinationLoadQueuePeon);
+
+    segmentRemovedLatch = new CountDownLatch(1);
+    segmentAddedLatch = new CountDownLatch(1);
+
+    ImmutableDruidDataSource druidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
+    EasyMock.expect(druidDataSource.getSegment(EasyMock.anyString())).andReturn(sourceSegments.get(2));
+    EasyMock.replay(druidDataSource);
+    EasyMock.expect(databaseSegmentManager.getInventoryValue(EasyMock.anyString())).andReturn(druidDataSource);
+    EasyMock.replay(databaseSegmentManager);
+
+    coordinator.moveSegment(
+        source.toImmutableDruidServer(),
+        dest.toImmutableDruidServer(),
+        sourceSegments.get(2),
+        null
+    );
+
+    // wait for destination server to load segment
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
+
+    // remove load queue key from destination server to trigger adding drop to load queue
+    curator.delete().guaranteed().forPath(ZKPaths.makePath(DESTINATION_LOAD_PATH, segmentToMove.getIdentifier()));
+
+    // wait for drop
+    Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
+
+    // clean up drop from load queue
+    curator.delete().guaranteed().forPath(ZKPaths.makePath(SOURCE_LOAD_PATH, segmentToMove.getIdentifier()));
+
+    List<DruidServer> servers = serverView.getInventory().stream().collect(Collectors.toList());
+
+    Assert.assertEquals(2, servers.get(0).getSegments().size());
+    Assert.assertEquals(2, servers.get(1).getSegments().size());
+  }
+
+  private static class TestDruidLeaderSelector implements DruidLeaderSelector
+  {
+    private volatile Listener listener;
+    private volatile String leader;
+
+    @Override
+    public String getCurrentLeader()
+    {
+      return leader;
+    }
+
+    @Override
+    public boolean isLeader()
+    {
+      return leader != null;
+    }
+
+    @Override
+    public int localTerm()
+    {
+      return 0;
+    }
+
+    @Override
+    public void registerListener(Listener listener)
+    {
+      this.listener = listener;
+      leader = "what:1234";
+      listener.becomeLeader();
+    }
+
+    @Override
+    public void unregisterListener()
+    {
+      leader = null;
+      listener.stopBeingLeader();
+    }
+  }
+
+  private void setupView() throws Exception
+  {
+    baseView = new BatchServerInventoryView(
+        zkPathsConfig,
+        curator,
+        jsonMapper,
+        Predicates.alwaysTrue()
+    )
+    {
+      @Override
+      public void registerSegmentCallback(Executor exec, final SegmentCallback callback)
+      {
+        super.registerSegmentCallback(
+            exec,
+            new SegmentCallback()
+            {
+              @Override
+              public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
+              {
+                CallbackAction res = callback.segmentAdded(server, segment);
+                segmentAddedLatch.countDown();
+                return res;
+              }
+
+              @Override
+              public CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
+              {
+                CallbackAction res = callback.segmentRemoved(server, segment);
+                segmentRemovedLatch.countDown();
+                return res;
+              }
+
+              @Override
+              public CallbackAction segmentViewInitialized()
+              {
+                CallbackAction res = callback.segmentViewInitialized();
+                segmentViewInitLatch.countDown();
+                return res;
+              }
+            }
+        );
+      }
+    };
+
+    serverView = new CoordinatorServerView(baseView);
+
+    baseView.start();
+
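+    // start the peons so load/drop requests queued by the coordinator are written to ZooKeeper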
+    sourceLoadQueuePeon.start();
+    destinationLoadQueuePeon.start();
+
+    coordinator = new DruidCoordinator(
+        druidCoordinatorConfig,
+        new ZkPathsConfig()
+        {
+
+          @Override
+          public String getBase()
+          {
+            return "druid";
+          }
+        },
+        configManager,
+        databaseSegmentManager,
+        baseView,
+        metadataRuleManager,
+        curator,
+        new NoopServiceEmitter(),
+        scheduledExecutorFactory,
+        null,
+        null,
+        new NoopServiceAnnouncer()
+        {
+          @Override
+          public void announce(DruidNode node)
+          {
+            // count down when this coordinator becomes the leader
+            leaderAnnouncerLatch.countDown();
+          }
+
+          @Override
+          public void unannounce(DruidNode node)
+          {
+            leaderUnannouncerLatch.countDown();
+          }
+        },
+        druidNode,
+        loadManagementPeons,
+        null,
+        new CostBalancerStrategyFactory(),
+        EasyMock.createNiceMock(LookupCoordinatorManager.class),
+        new TestDruidLeaderSelector()
+    );
+  }
+
+  private DataSegment dataSegmentWithIntervalAndVersion(String intervalStr, String version)
+  {
+    return DataSegment.builder()
+                      .dataSource("test_curator_druid_coordinator")
+                      .interval(Intervals.of(intervalStr))
+                      .loadSpec(
+                          ImmutableMap.<String, Object>of(
+                              "type",
+                              "local",
+                              "path",
+                              "somewhere"
+                          )
+                      )
+                      .version(version)
+                      .dimensions(ImmutableList.of())
+                      .metrics(ImmutableList.of())
+                      .shardSpec(NoneShardSpec.instance())
+                      .binaryVersion(9)
+                      .size(0)
+                      .build();
+  }
+}