Skip to content

Commit

Permalink
Add simple primary-backup tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Nov 29, 2017
1 parent 971216a commit 66e2b09
Show file tree
Hide file tree
Showing 14 changed files with 1,130 additions and 17 deletions.
Expand Up @@ -149,13 +149,13 @@ public Builder withReplication(Replication replication) {
* @return the protocol builder * @return the protocol builder
*/ */
public Builder withBackups(int numBackups) { public Builder withBackups(int numBackups) {
checkArgument(numBackups > 0 || numBackups == -1, "numBackups must be positive"); checkArgument(numBackups >= 0, "numBackups must be positive");
this.numBackups = numBackups; this.numBackups = numBackups;
return this; return this;
} }


@Override @Override
public PrimitiveProtocol build() { public MultiPrimaryProtocol build() {
return new MultiPrimaryProtocol(group, consistency, replication, numBackups); return new MultiPrimaryProtocol(group, consistency, replication, numBackups);
} }
} }
Expand Down
Expand Up @@ -76,6 +76,15 @@ public PrimaryBackupServer(PrimaryBackupServerContext context) {
this.context = checkNotNull(context, "context cannot be null"); this.context = checkNotNull(context, "context cannot be null");
} }


/**
* Returns the current server role.
*
* @return the current server role
*/
public Role getRole() {
return context.getRole();
}

@Override @Override
public CompletableFuture<PrimaryBackupServer> open() { public CompletableFuture<PrimaryBackupServer> open() {
context.open(); context.open();
Expand Down
Expand Up @@ -20,8 +20,11 @@
import io.atomix.primitive.PrimitiveId; import io.atomix.primitive.PrimitiveId;
import io.atomix.primitive.PrimitiveTypeRegistry; import io.atomix.primitive.PrimitiveTypeRegistry;
import io.atomix.primitive.partition.PrimaryElection; import io.atomix.primitive.partition.PrimaryElection;
import io.atomix.protocols.backup.PrimaryBackupServer.Role;
import io.atomix.protocols.backup.protocol.BackupRequest; import io.atomix.protocols.backup.protocol.BackupRequest;
import io.atomix.protocols.backup.protocol.BackupResponse; import io.atomix.protocols.backup.protocol.BackupResponse;
import io.atomix.protocols.backup.protocol.CloseRequest;
import io.atomix.protocols.backup.protocol.CloseResponse;
import io.atomix.protocols.backup.protocol.ExecuteRequest; import io.atomix.protocols.backup.protocol.ExecuteRequest;
import io.atomix.protocols.backup.protocol.ExecuteResponse; import io.atomix.protocols.backup.protocol.ExecuteResponse;
import io.atomix.protocols.backup.protocol.MetadataRequest; import io.atomix.protocols.backup.protocol.MetadataRequest;
Expand All @@ -34,6 +37,7 @@
import io.atomix.utils.concurrent.ThreadContextFactory; import io.atomix.utils.concurrent.ThreadContextFactory;


import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors; import java.util.stream.Collectors;


Expand Down Expand Up @@ -64,6 +68,17 @@ public PrimaryBackupServerContext(
this.primaryElection = primaryElection; this.primaryElection = primaryElection;
} }


/**
* Returns the current server role.
*
* @return the current server role
*/
public Role getRole() {
return Objects.equals(primaryElection.getTerm().join().primary(), clusterService.getLocalNode().id())
? Role.PRIMARY
: Role.BACKUP;
}

/** /**
* Opens the server context. * Opens the server context.
*/ */
Expand Down Expand Up @@ -93,6 +108,13 @@ private CompletableFuture<RestoreResponse> restore(RestoreRequest request) {
return getService(request).thenCompose(service -> service.restore(request)); return getService(request).thenCompose(service -> service.restore(request));
} }


/**
* Handles a close request.
*/
private CompletableFuture<CloseResponse> close(CloseRequest request) {
return getService(request).thenCompose(service -> service.close(request));
}

/** /**
* Returns the service context for the given request. * Returns the service context for the given request.
*/ */
Expand Down Expand Up @@ -128,6 +150,7 @@ private void registerListeners() {
protocol.registerExecuteHandler(this::execute); protocol.registerExecuteHandler(this::execute);
protocol.registerBackupHandler(this::backup); protocol.registerBackupHandler(this::backup);
protocol.registerRestoreHandler(this::restore); protocol.registerRestoreHandler(this::restore);
protocol.registerCloseHandler(this::close);
protocol.registerMetadataHandler(this::metadata); protocol.registerMetadataHandler(this::metadata);
} }


Expand All @@ -138,6 +161,7 @@ private void unregisterListeners() {
protocol.unregisterExecuteHandler(); protocol.unregisterExecuteHandler();
protocol.unregisterBackupHandler(); protocol.unregisterBackupHandler();
protocol.unregisterRestoreHandler(); protocol.unregisterRestoreHandler();
protocol.unregisterCloseHandler();
protocol.unregisterMetadataHandler(); protocol.unregisterMetadataHandler();
} }


Expand Down
Expand Up @@ -102,7 +102,7 @@ public void removeListener(SessionEventListener listener) {
public void publish(PrimitiveEvent event) { public void publish(PrimitiveEvent event) {
if (context.getRole() == Role.PRIMARY) { if (context.getRole() == Role.PRIMARY) {
context.threadContext().execute(() -> { context.threadContext().execute(() -> {
log.trace("Sending {} to {}", event); log.trace("Sending {} to {}", event, nodeId);
context.protocol().event(nodeId, sessionId, event); context.protocol().event(nodeId, sessionId, event);
}); });
} }
Expand Down
Expand Up @@ -46,6 +46,10 @@ class SynchronousReplicator implements Replicator {


@Override @Override
public CompletableFuture<Void> replicate(BackupOperation operation) { public CompletableFuture<Void> replicate(BackupOperation operation) {
if (context.descriptor().backups() == 0) {
return CompletableFuture.completedFuture(null);
}

CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
futures.put(operation.index(), future); futures.put(operation.index(), future);
for (NodeId backup : context.backups()) { for (NodeId backup : context.backups()) {
Expand Down
Expand Up @@ -46,19 +46,16 @@
import io.atomix.protocols.backup.roles.NoneRole; import io.atomix.protocols.backup.roles.NoneRole;
import io.atomix.protocols.backup.roles.PrimaryBackupRole; import io.atomix.protocols.backup.roles.PrimaryBackupRole;
import io.atomix.protocols.backup.roles.PrimaryRole; import io.atomix.protocols.backup.roles.PrimaryRole;
import io.atomix.protocols.backup.serializer.impl.PrimaryBackupSerializers;
import io.atomix.utils.concurrent.ComposableFuture; import io.atomix.utils.concurrent.ComposableFuture;
import io.atomix.utils.concurrent.ThreadContext; import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.logging.ContextualLoggerFactory; import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext; import io.atomix.utils.logging.LoggerContext;
import io.atomix.utils.serializer.Serializer;
import io.atomix.utils.time.LogicalClock; import io.atomix.utils.time.LogicalClock;
import io.atomix.utils.time.LogicalTimestamp; import io.atomix.utils.time.LogicalTimestamp;
import io.atomix.utils.time.WallClock; import io.atomix.utils.time.WallClock;
import io.atomix.utils.time.WallClockTimestamp; import io.atomix.utils.time.WallClockTimestamp;
import org.slf4j.Logger; import org.slf4j.Logger;


import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;


Expand All @@ -68,9 +65,6 @@
* Raft server state machine executor. * Raft server state machine executor.
*/ */
public class PrimaryBackupServiceContext implements ServiceContext { public class PrimaryBackupServiceContext implements ServiceContext {
private static final Serializer SERIALIZER = PrimaryBackupSerializers.PROTOCOL;
private static final Duration HEARTBEAT_INTERVAL = Duration.ofSeconds(1);

private final Logger log; private final Logger log;
private final NodeId localNodeId; private final NodeId localNodeId;
private final String serverName; private final String serverName;
Expand Down Expand Up @@ -443,13 +437,17 @@ public CompletableFuture<CloseResponse> close(CloseRequest request) {
ComposableFuture<CloseResponse> future = new ComposableFuture<>(); ComposableFuture<CloseResponse> future = new ComposableFuture<>();
threadContext.execute(() -> { threadContext.execute(() -> {
PrimaryBackupSession session = sessions.getSession(request.session()); PrimaryBackupSession session = sessions.getSession(request.session());
role.close(session).whenComplete((result, error) -> { if (session != null) {
if (error == null) { role.close(session).whenComplete((result, error) -> {
future.complete(CloseResponse.ok()); if (error == null) {
} else { future.complete(CloseResponse.ok());
future.complete(CloseResponse.error()); } else {
} future.complete(CloseResponse.error());
}); }
});
} else {
future.complete(CloseResponse.error());
}
}); });
return future; return future;
} }
Expand Down Expand Up @@ -515,7 +513,18 @@ private void changeRole(PrimaryTerm term) {
primary = term.primary(); primary = term.primary();
backups = term.backups().subList(0, Math.min(descriptor.backups(), term.backups().size())); backups = term.backups().subList(0, Math.min(descriptor.backups(), term.backups().size()));


if (primary.equals(clusterService.getLocalNode().id())) { if (backups.size() < descriptor.backups()) {
if (this.role == null) {
log.warn("Not enough backups; transitioning to {}", Role.NONE);
this.role = new NoneRole(this);
log.trace("{} transitioning to {}", clusterService.getLocalNode().id(), Role.NONE);
} else if (this.role.role() != Role.NONE) {
log.warn("Not enough backups; transitioning to {}", Role.NONE);
this.role.close();
this.role = new NoneRole(this);
log.trace("{} transitioning to {}", clusterService.getLocalNode().id(), Role.NONE);
}
} else if (primary.equals(clusterService.getLocalNode().id())) {
if (this.role == null) { if (this.role == null) {
this.role = new PrimaryRole(this); this.role = new PrimaryRole(this);
log.trace("{} transitioning to {}", clusterService.getLocalNode().id(), Role.PRIMARY); log.trace("{} transitioning to {}", clusterService.getLocalNode().id(), Role.PRIMARY);
Expand Down
@@ -0,0 +1,68 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.cluster;

import io.atomix.messaging.Endpoint;

import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Test cluster service.
*/
public class TestClusterService implements ClusterService {
private final NodeId localNode;
private final Collection<NodeId> nodes;

public TestClusterService(NodeId localNode, Collection<NodeId> nodes) {
this.localNode = localNode;
this.nodes = nodes;
}

@Override
public Node getLocalNode() {
return Node.builder()
.withId(localNode)
.withEndpoint(Endpoint.from("localhost", localNode.hashCode()))
.build();
}

@Override
public Set<Node> getNodes() {
return nodes.stream()
.map(node -> Node.builder()
.withId(node)
.withEndpoint(Endpoint.from("localhost", node.hashCode()))
.build())
.collect(Collectors.toSet());
}

@Override
public Node getNode(NodeId nodeId) {
return null;
}

@Override
public void addListener(ClusterEventListener listener) {

}

@Override
public void removeListener(ClusterEventListener listener) {

}
}
@@ -0,0 +1,72 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.primitive.partition;

import com.google.common.collect.Sets;
import io.atomix.cluster.NodeId;
import io.atomix.primitive.partition.PrimaryElectionEvent.Type;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
* Test primary election.
*/
public class TestPrimaryElection implements PrimaryElection {
private long counter;
private PrimaryTerm term;
private final List<NodeId> candidates = new ArrayList<>();
private final Set<PrimaryElectionEventListener> listeners = Sets.newConcurrentHashSet();

@Override
public CompletableFuture<PrimaryTerm> enter(NodeId nodeId) {
candidates.add(nodeId);
if (term == null) {
term = new PrimaryTerm(++counter, nodeId, Collections.emptyList());
listeners.forEach(l -> l.onEvent(new PrimaryElectionEvent(Type.PRIMARY_AND_BACKUPS_CHANGED, term)));
} else {
term = new PrimaryTerm(term.term(), term.primary(), candidates.stream()
.filter(candidate -> !candidate.equals(term.primary()))
.collect(Collectors.toList()));
listeners.forEach(l -> l.onEvent(new PrimaryElectionEvent(Type.BACKUPS_CHANGED, term)));
}
return CompletableFuture.completedFuture(term);
}

@Override
public CompletableFuture<PrimaryTerm> getTerm() {
return CompletableFuture.completedFuture(term);
}

@Override
public void addListener(PrimaryElectionEventListener listener) {
listeners.add(listener);
}

@Override
public void removeListener(PrimaryElectionEventListener listener) {
listeners.remove(listener);
}

@Override
public void close() {

}
}

0 comments on commit 66e2b09

Please sign in to comment.