Skip to content

Commit

Permalink
Add service name/type identifiers.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 29, 2017
1 parent 31dd13b commit 5fa58b6
Show file tree
Hide file tree
Showing 39 changed files with 497 additions and 297 deletions.
Expand Up @@ -35,6 +35,7 @@
import io.atomix.protocols.raft.protocol.QueryResponse;
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.protocols.raft.session.SessionId;
import io.atomix.serializer.Serializer;

import java.util.Collection;
Expand Down Expand Up @@ -103,12 +104,12 @@ public void reset(Collection<MemberId> members, ResetRequest request) {
}

@Override
public void registerPublishListener(long sessionId, Consumer<PublishRequest> listener, Executor executor) {
clusterCommunicator.addSubscriber(context.publishSubject(sessionId), serializer::decode, listener, executor);
public void registerPublishListener(SessionId sessionId, Consumer<PublishRequest> listener, Executor executor) {
clusterCommunicator.addSubscriber(context.publishSubject(sessionId.id()), serializer::decode, listener, executor);
}

@Override
public void unregisterPublishListener(long sessionId) {
clusterCommunicator.removeSubscriber(context.publishSubject(sessionId));
public void unregisterPublishListener(SessionId sessionId) {
clusterCommunicator.removeSubscriber(context.publishSubject(sessionId.id()));
}
}
Expand Up @@ -59,21 +59,10 @@
* not be deterministic.
*
* @see RaftStateMachine
* @see StateMachineContext
* @see ServiceContext
*/
public interface RaftOperationExecutor extends ThreadContext {

/**
* Returns the state machine context.
* <p>
* The context is reflective of the current position and state of the Raft state machine. In particular,
* it exposes the current approximate {@link StateMachineContext#wallClock() time} and all open
* {@link io.atomix.protocols.raft.session.RaftSessions}.
*
* @return The state machine context.
*/
StateMachineContext getContext();

/**
* Applies the given commit to the executor.
*
Expand Down
Expand Up @@ -27,13 +27,13 @@
* Raft service.
*/
public abstract class RaftService implements RaftStateMachine {
private StateMachineContext context;
private ServiceContext context;
private RaftOperationExecutor executor;

@Override
public void init(StateMachineContext context) {
public void init(ServiceContext context) {
this.context = context;
this.executor = new DefaultRaftOperationExecutor(context);
this.executor = new DefaultRaftOperationExecutor();
configure(executor);
}

Expand All @@ -53,6 +53,15 @@ public byte[] apply(RaftCommit<byte[]> commit) {
*/
protected abstract void configure(RaftOperationExecutor executor);

/**
* Returns the service context.
*
* @return the service context
*/
protected ServiceContext getContext() {
return context;
}

/**
* Returns the state machine scheduler.
*
Expand All @@ -76,8 +85,8 @@ protected StateMachineId getStateMachineId() {
*
* @return The unique state machine name.
*/
protected String getStateMachineName() {
return context.name();
protected ServiceName getServiceName() {
return context.serviceName();
}

/**
Expand Down
Expand Up @@ -148,7 +148,7 @@
* associated with the state machine from the underlying log.
*
* @see RaftCommit
* @see StateMachineContext
* @see ServiceContext
* @see RaftOperationExecutor
*/
public interface RaftStateMachine extends Snapshottable, RaftSessionListener {
Expand All @@ -159,7 +159,7 @@ public interface RaftStateMachine extends Snapshottable, RaftSessionListener {
* @param context The state machine context.
* @throws NullPointerException if {@code context} is null
*/
void init(StateMachineContext context);
void init(ServiceContext context);

/**
* Applies a commit to the state machine.
Expand Down
Expand Up @@ -25,10 +25,10 @@
* State machine context.
* <p>
* The context is reflective of the current position and state of the Raft state machine. In particular,
* it exposes the current approximate {@link StateMachineContext#wallClock() time} and all open
* it exposes the current approximate {@link ServiceContext#wallClock() time} and all open
* {@link RaftSessions}.
*/
public interface StateMachineContext {
public interface ServiceContext {

/**
* Returns the state machine identifier.
Expand All @@ -42,14 +42,14 @@ public interface StateMachineContext {
*
* @return The state machine name.
*/
String name();
ServiceName serviceName();

/**
* Returns the state machine type.
*
* @return The state machine type.
*/
String typeName();
ServiceType serviceType();

/**
* Returns the current state machine index.
Expand Down
@@ -0,0 +1,35 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.protocols.raft;

import io.atomix.protocols.raft.impl.DefaultServiceName;
import io.atomix.utils.Identifier;

/**
* Raft service name.
*/
public interface ServiceName extends Identifier<String> {

/**
* Creates a new Raft service identifier.
*
* @param name the service name
* @return the service identifier
*/
static ServiceName from(String name) {
return new DefaultServiceName(name);
}
}
@@ -0,0 +1,35 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.protocols.raft;

import io.atomix.protocols.raft.impl.DefaultServiceType;
import io.atomix.utils.Identifier;

/**
* Raft service type.
*/
public interface ServiceType extends Identifier<String> {

/**
* Creates a new Raft service type identifier.
*
* @param name the service type
* @return the service type identifier
*/
static ServiceType from(String name) {
return new DefaultServiceType(name);
}
}
Expand Up @@ -15,7 +15,6 @@
*/
package io.atomix.protocols.raft.impl;

import org.slf4j.LoggerFactory;
import io.atomix.protocols.raft.RaftClient;
import io.atomix.protocols.raft.RaftMetadataClient;
import io.atomix.protocols.raft.cluster.MemberId;
Expand All @@ -24,6 +23,7 @@
import io.atomix.protocols.raft.proxy.impl.NodeSelectorManager;
import io.atomix.protocols.raft.proxy.impl.RaftProxyManager;
import io.atomix.utils.concurrent.ThreadPoolContext;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -120,7 +120,7 @@ public String toString() {
private class SessionBuilder extends RaftProxy.Builder {
@Override
public RaftProxy build() {
return sessionManager.openSession(name, type, readConsistency, communicationStrategy, executor, timeout).join();
return sessionManager.openSession(serviceName, serviceType, readConsistency, communicationStrategy, executor, timeout).join();
}
}

Expand Down
Expand Up @@ -88,7 +88,7 @@ public CompletableFuture<Set<RaftSessionMetadata>> getSessions() {

@Override
public CompletableFuture<Set<RaftSessionMetadata>> getSessions(String type) {
return getMetadata().thenApply(response -> response.sessions().stream().filter(s -> s.typeName().equals(type)).collect(Collectors.toSet()));
return getMetadata().thenApply(response -> response.sessions().stream().filter(s -> s.serviceType().equals(type)).collect(Collectors.toSet()));
}

}
Expand Up @@ -19,7 +19,6 @@
import io.atomix.protocols.raft.OperationType;
import io.atomix.protocols.raft.RaftCommit;
import io.atomix.protocols.raft.RaftOperationExecutor;
import io.atomix.protocols.raft.StateMachineContext;
import io.atomix.protocols.raft.error.ApplicationException;
import io.atomix.utils.concurrent.Scheduled;
import org.slf4j.Logger;
Expand All @@ -43,23 +42,13 @@
*/
public class DefaultRaftOperationExecutor implements RaftOperationExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRaftOperationExecutor.class);
private final StateMachineContext context;
private final Queue<Runnable> tasks = new LinkedList<>();
private final List<ScheduledTask> scheduledTasks = new ArrayList<>();
private final List<ScheduledTask> complete = new ArrayList<>();
private final Map<OperationId, Function<RaftCommit<byte[]>, byte[]>> operations = new HashMap<>();
private OperationType operationType;
private long timestamp;

public DefaultRaftOperationExecutor(StateMachineContext context) {
this.context = checkNotNull(context, "context cannot be null");
}

@Override
public StateMachineContext getContext() {
return context;
}

/**
* Sets the current operation type.
*
Expand Down
@@ -0,0 +1,31 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.protocols.raft.impl;

import io.atomix.protocols.raft.ServiceName;
import io.atomix.utils.AbstractIdentifier;

/**
* Default Raft service identifier.
*/
public class DefaultServiceName extends AbstractIdentifier<String> implements ServiceName {
private DefaultServiceName() {
}

public DefaultServiceName(String value) {
super(value);
}
}
@@ -0,0 +1,31 @@
/*
* Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.protocols.raft.impl;

import io.atomix.protocols.raft.ServiceType;
import io.atomix.utils.AbstractIdentifier;

/**
* Default Raft service type identifier.
*/
public class DefaultServiceType extends AbstractIdentifier<String> implements ServiceType {
private DefaultServiceType() {
}

public DefaultServiceType(String value) {
super(value);
}
}

0 comments on commit 5fa58b6

Please sign in to comment.