Skip to content

Commit

Permalink
Coordinator balancer move then drop fix (apache#5528)
Browse files Browse the repository at this point in the history
* apache#5521 part 1

* formatting

* oops

* less magic tests
  • Loading branch information
clintropolis authored and jon-wei committed Apr 4, 2018
1 parent d51306c commit dc8ee08
Show file tree
Hide file tree
Showing 6 changed files with 674 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.curator.inventory.CuratorInventoryManager;
import io.druid.curator.inventory.CuratorInventoryManagerStrategy;
import io.druid.curator.inventory.InventoryManagerConfig;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -157,14 +156,7 @@ public void inventoryInitialized()
{
log.info("Inventory Initialized");
runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentViewInitialized();
}
}
input -> input.segmentViewInitialized()
);
}
}
Expand Down Expand Up @@ -233,15 +225,10 @@ protected void runSegmentCallbacks(
{
for (final Map.Entry<SegmentCallback, Executor> entry : segmentCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
@Override
public void run()
{
if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
segmentCallbackRemoved(entry.getKey());
segmentCallbacks.remove(entry.getKey());
}
() -> {
if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
segmentCallbackRemoved(entry.getKey());
segmentCallbacks.remove(entry.getKey());
}
}
);
Expand All @@ -252,14 +239,9 @@ private void runServerRemovedCallbacks(final DruidServer server)
{
for (final Map.Entry<ServerRemovedCallback, Executor> entry : serverRemovedCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
@Override
public void run()
{
if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
serverRemovedCallbacks.remove(entry.getKey());
}
() -> {
if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
serverRemovedCallbacks.remove(entry.getKey());
}
}
);
Expand All @@ -286,14 +268,7 @@ protected void addSingleInventory(
container.addDataSegment(inventory);

runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentAdded(container.getMetadata(), inventory);
}
}
input -> input.segmentAdded(container.getMetadata(), inventory)
);
}

Expand All @@ -315,26 +290,16 @@ protected void removeSingleInventory(final DruidServer container, String invento
container.removeDataSegment(inventoryKey);

runSegmentCallbacks(
new Function<SegmentCallback, CallbackAction>()
{
@Override
public CallbackAction apply(SegmentCallback input)
{
return input.segmentRemoved(container.getMetadata(), segment);
}
}
input -> input.segmentRemoved(container.getMetadata(), segment)
);
}

@Override
public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
{
try {
String toServedSegPath = ZKPaths.makePath(
ZKPaths.makePath(getInventoryManagerConfig().getInventoryPath(), serverKey),
segment.getIdentifier()
);
return curator.checkExists().forPath(toServedSegPath) != null;
DruidServer server = getInventoryValue(serverKey);
return server != null && server.getSegment(segment.getIdentifier()) != null;
}
catch (Exception ex) {
throw Throwables.propagate(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,12 @@
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -249,7 +247,7 @@ private void processSegmentChangeRequest()
if (currentlyProcessing == null) {
if (!stopped) {
log.makeAlert("Crazy race condition! server[%s]", basePath)
.emit();
.emit();
}
actionCompleted();
return;
Expand All @@ -261,38 +259,28 @@ private void processSegmentChangeRequest()
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);

processingExecutor.schedule(
new Runnable()
{
@Override
public void run()
{
try {
if (curator.checkExists().forPath(path) != null) {
failAssign(new ISE("%s was never removed! Failing this operation!", path));
}
}
catch (Exception e) {
failAssign(e);
() -> {
try {
if (curator.checkExists().forPath(path) != null) {
failAssign(new ISE("%s was never removed! Failing this operation!", path));
}
}
catch (Exception e) {
failAssign(e);
}
},
config.getLoadTimeoutDelay().getMillis(),
TimeUnit.MILLISECONDS
);

final Stat stat = curator.checkExists().usingWatcher(
new CuratorWatcher()
{
@Override
public void process(WatchedEvent watchedEvent) throws Exception
{
switch (watchedEvent.getType()) {
case NodeDeleted:
entryRemoved(watchedEvent.getPath());
break;
default:
// do nothing
}
(CuratorWatcher) watchedEvent -> {
switch (watchedEvent.getType()) {
case NodeDeleted:
entryRemoved(watchedEvent.getPath());
break;
default:
// do nothing
}
}
).forPath(path);
Expand Down Expand Up @@ -341,14 +329,7 @@ private void actionCompleted()
final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
currentlyProcessing = null;
callBackExecutor.execute(
new Runnable()
{
@Override
public void run()
{
executeCallbacks(callbacks);
}
}
() -> executeCallbacks(callbacks)
);
}
}
Expand All @@ -360,18 +341,13 @@ public void start()
processingExecutor,
config.getLoadQueuePeonRepeatDelay(),
config.getLoadQueuePeonRepeatDelay(),
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call()
{
processSegmentChangeRequest();

if (stopped) {
return ScheduledExecutors.Signal.STOP;
} else {
return ScheduledExecutors.Signal.REPEAT;
}
() -> {
processSegmentChangeRequest();

if (stopped) {
return ScheduledExecutors.Signal.STOP;
} else {
return ScheduledExecutors.Signal.REPEAT;
}
}
);
Expand Down

0 comments on commit dc8ee08

Please sign in to comment.