Skip to content

Commit

Permalink
Ensure tryLock attempts are rejected during network partitions.
Browse files Browse the repository at this point in the history
- Use client-side timers for tryLock with timeout
- Ensure lock sessions are properly cleaned up when timed out by clients
  • Loading branch information
kuujo committed Apr 19, 2018
1 parent 7143c50 commit 11c2fb8
Show file tree
Hide file tree
Showing 9 changed files with 324 additions and 72 deletions.
15 changes: 13 additions & 2 deletions core/src/main/java/io/atomix/core/Atomix.java
Expand Up @@ -78,6 +78,7 @@
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.SingleThreadContext;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.concurrent.Threads;
import io.atomix.utils.net.Address;
import io.atomix.utils.net.MalformedAddressException;
import org.slf4j.Logger;
Expand All @@ -89,6 +90,8 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -423,6 +426,7 @@ private synchronized CompletableFuture<Void> doStop() {
.thenComposeAsync(v -> context.messagingService.stop(), threadContext)
.exceptionally(e -> null)
.thenRunAsync(() -> {
context.executorService.shutdownNow();
threadContext.close();
started.set(false);
LOGGER.info("Stopped");
Expand Down Expand Up @@ -474,6 +478,7 @@ private static AtomixConfig loadConfig(File config) {
* Builds a context from the given configuration.
*/
private static Context buildContext(AtomixConfig config) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), Threads.namedThreads("atomix-primitive-%d", LOGGER));
ManagedMessagingService messagingService = buildMessagingService(config);
ManagedBroadcastService broadcastService = buildBroadcastService(config);
ManagedBootstrapMetadataService bootstrapMetadataService = buildBootstrapMetadataService(config);
Expand All @@ -484,9 +489,10 @@ private static Context buildContext(AtomixConfig config) {
ManagedPartitionGroup systemPartitionGroup = buildSystemPartitionGroup(config);
ManagedPartitionService partitions = buildPartitionService(config);
ManagedPrimitivesService primitives = new CorePrimitivesService(
clusterService, clusterMessagingService, clusterEventingService, partitions, systemPartitionGroup, config);
executorService, clusterService, clusterMessagingService, clusterEventingService, partitions, systemPartitionGroup, config);
PrimitiveTypeRegistry primitiveTypes = new PrimitiveTypeRegistry(config.getPrimitiveTypes());
return new Context(
executorService,
messagingService,
broadcastService,
bootstrapMetadataService,
Expand Down Expand Up @@ -618,6 +624,7 @@ private static ManagedPartitionService buildPartitionService(AtomixConfig config
* Atomix instance context.
*/
private static class Context {
private final ScheduledExecutorService executorService;
private final ManagedMessagingService messagingService;
private final ManagedBroadcastService broadcastService;
private final ManagedBootstrapMetadataService bootstrapMetadataService;
Expand All @@ -632,6 +639,7 @@ private static class Context {
private final boolean enableShutdownHook;

public Context(
ScheduledExecutorService executorService,
ManagedMessagingService messagingService,
ManagedBroadcastService broadcastService,
ManagedBootstrapMetadataService bootstrapMetadataService,
Expand All @@ -644,6 +652,7 @@ public Context(
ManagedPrimitivesService primitives,
PrimitiveTypeRegistry primitiveTypes,
boolean enableShutdownHook) {
this.executorService = executorService;
this.messagingService = messagingService;
this.broadcastService = broadcastService;
this.bootstrapMetadataService = bootstrapMetadataService;
Expand Down Expand Up @@ -872,6 +881,7 @@ public Builder addPrimitiveType(PrimitiveType primitiveType) {
*/
@Override
public Atomix build() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), Threads.namedThreads("atomix-primitive-%d", LOGGER));
ManagedMessagingService messagingService = buildMessagingService();
ManagedBroadcastService broadcastService = buildBroadcastService();
ManagedBootstrapMetadataService bootstrapMetadataService = buildBootstrapMetadataService();
Expand All @@ -882,8 +892,9 @@ public Atomix build() {
ManagedPartitionGroup systemPartitionGroup = buildSystemPartitionGroup();
ManagedPartitionService partitions = buildPartitionService();
ManagedPrimitivesService primitives = new CorePrimitivesService(
clusterService, clusterMessagingService, clusterEventingService, partitions, systemPartitionGroup, new AtomixConfig());
executorService, clusterService, clusterMessagingService, clusterEventingService, partitions, systemPartitionGroup, new AtomixConfig());
return new Atomix(new Context(
executorService,
messagingService,
broadcastService,
bootstrapMetadataService,
Expand Down
Expand Up @@ -22,29 +22,39 @@
import io.atomix.primitive.PrimitiveRegistry;
import io.atomix.primitive.partition.PartitionService;

import java.util.concurrent.ScheduledExecutorService;

/**
* Default primitive management service.
*/
public class CorePrimitiveManagementService implements PrimitiveManagementService {
private final ScheduledExecutorService executorService;
private final ClusterService clusterService;
private final ClusterMessagingService communicationService;
private final ClusterEventingService eventService;
private final PartitionService partitionService;
private final PrimitiveRegistry primitiveRegistry;

public CorePrimitiveManagementService(
ScheduledExecutorService executorService,
ClusterService clusterService,
ClusterMessagingService communicationService,
ClusterEventingService eventService,
PartitionService partitionService,
PrimitiveRegistry primitiveRegistry) {
this.executorService = executorService;
this.clusterService = clusterService;
this.communicationService = communicationService;
this.eventService = eventService;
this.partitionService = partitionService;
this.primitiveRegistry = primitiveRegistry;
}

@Override
public ScheduledExecutorService getExecutorService() {
return executorService;
}

@Override
public ClusterService getClusterService() {
return clusterService;
Expand Down
Expand Up @@ -67,6 +67,7 @@
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -87,6 +88,7 @@ public class CorePrimitivesService implements ManagedPrimitivesService {
private final AtomicBoolean started = new AtomicBoolean();

public CorePrimitivesService(
ScheduledExecutorService executorService,
ClusterService clusterService,
ClusterMessagingService communicationService,
ClusterEventingService eventService,
Expand All @@ -95,6 +97,7 @@ public CorePrimitivesService(
AtomixConfig config) {
this.primitiveRegistry = new CorePrimitiveRegistry(systemPartitionGroup);
this.managementService = new CorePrimitiveManagementService(
executorService,
clusterService,
communicationService,
eventService,
Expand Down
Expand Up @@ -23,8 +23,8 @@
* Raft value events.
*/
public enum DistributedLockEvents implements EventType {
LOCK("lock"),
FAIL("fail");
LOCKED("lock"),
FAILED("fail");

private final String id;

Expand Down

0 comments on commit 11c2fb8

Please sign in to comment.