Skip to content

Commit

Permalink
Update high-level API to manage multiple resource clusters separately…
Browse files Browse the repository at this point in the history
… from the core implementation.
  • Loading branch information
kuujo committed Feb 14, 2015
1 parent ee5ee82 commit a28499f
Show file tree
Hide file tree
Showing 8 changed files with 434 additions and 78 deletions.
40 changes: 20 additions & 20 deletions api/src/main/java/net/kuujo/copycat/Copycat.java
Expand Up @@ -99,42 +99,42 @@ static Copycat create(CopycatConfig config) {
<T> EventLog<T> createEventLog(String name, EventLogConfig config);

/**
* Creates a new state log.
* Creates a new status log.
*
* @param name The name of the state log to create.
* @param <T> The state log entry type.
* @return The state log instance.
* @param name The name of the status log to create.
* @param <T> The status log entry type.
* @return The status log instance.
*/
<T> StateLog<T> createStateLog(String name);

/**
* Creates a new state log.
* Creates a new status log.
*
* @param name The name of the state log to create.
* @param config The state log configuration.
* @param <T> The state log entry type.
* @return The state log instance.
* @param name The name of the status log to create.
* @param config The status log configuration.
* @param <T> The status log entry type.
* @return The status log instance.
*/
<T> StateLog<T> createStateLog(String name, StateLogConfig config);

/**
* Creates a new replicated state machine.
* Creates a new replicated status machine.
*
* @param name The name of the state machine to create.
* @param stateType The state machine state type.
* @param initialState The state machine's initial state.
* @param <T> The state machine state type.
* @return The state machine instance.
* @param name The name of the status machine to create.
* @param stateType The status machine status type.
* @param initialState The status machine's initial status.
* @param <T> The status machine status type.
* @return The status machine instance.
*/
<T> StateMachine<T> createStateMachine(String name, Class<T> stateType, Class<? extends T> initialState);

/**
* Creates a new replicated state machine.
* Creates a new replicated status machine.
*
* @param name The name of the state machine to create.
* @param config The state machine configuration.
* @param <T> The state machine state type.
* @return The state machine instance.
* @param name The name of the status machine to create.
* @param config The status machine configuration.
* @param <T> The status machine status type.
* @return The status machine instance.
*/
<T> StateMachine<T> createStateMachine(String name, StateMachineConfig config);

Expand Down
13 changes: 0 additions & 13 deletions api/src/main/java/net/kuujo/copycat/CopycatConfig.java
Expand Up @@ -17,7 +17,6 @@

import com.typesafe.config.ConfigValueFactory;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
import net.kuujo.copycat.util.AbstractConfigurable;
import net.kuujo.copycat.util.Configurable;
import net.kuujo.copycat.util.ConfigurationException;
Expand Down Expand Up @@ -240,18 +239,6 @@ public CopycatConfig withDefaultExecutor(Executor executor) {
return this;
}

/**
* Resolves the Copycat configuration to a coordinator configuration.
*
* @return A coordinator configuration for this Copycat configuration.
*/
public CoordinatorConfig resolve() {
return new CoordinatorConfig()
.withName(getName())
.withExecutor(getDefaultExecutor())
.withClusterConfig(getClusterConfig());
}

@Override
public String toString() {
return String.format("%s[%s]", getClass().getSimpleName(), config.root().unwrapped());
Expand Down
@@ -0,0 +1,55 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.internal;

import net.kuujo.copycat.protocol.AbstractProtocol;
import net.kuujo.copycat.protocol.Protocol;
import net.kuujo.copycat.protocol.ProtocolClient;
import net.kuujo.copycat.protocol.ProtocolServer;
import net.kuujo.copycat.util.internal.Assert;

import java.net.URI;
import java.util.concurrent.Executor;

/**
* Coordinated protocol implementation.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class CoordinatedProtocol extends AbstractProtocol {
private final int id;
private final Protocol protocol;
private final ProtocolServerRegistry registry;
private final Executor executor;

public CoordinatedProtocol(int id, Protocol protocol, ProtocolServerRegistry registry, Executor executor) {
this.id = id;
this.protocol = Assert.isNotNull(protocol, "protocol");
this.registry = Assert.isNotNull(registry, "registry");
this.executor = Assert.isNotNull(executor, "executor");
}

@Override
public ProtocolClient createClient(URI uri) {
return new CoordinatedProtocolClient(id, protocol.createClient(uri));
}

@Override
public ProtocolServer createServer(URI uri) {
return new CoordinatedProtocolServer(id, registry.get(uri), executor);
}

}
@@ -0,0 +1,68 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.internal;

import net.kuujo.copycat.protocol.ProtocolClient;
import net.kuujo.copycat.protocol.ProtocolConnection;
import net.kuujo.copycat.protocol.ProtocolException;
import net.kuujo.copycat.util.internal.Bytes;

import java.util.concurrent.CompletableFuture;

/**
* Protocol client coordinator.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class CoordinatedProtocolClient implements ProtocolClient {
private final int id;
private final ProtocolClient client;

public CoordinatedProtocolClient(int id, ProtocolClient client) {
this.id = id;
this.client = client;
}

@Override
public CompletableFuture<ProtocolConnection> connect() {
CompletableFuture<ProtocolConnection> future = new CompletableFuture<>();
client.connect().whenComplete((connection, connectionError) -> {
if (connectionError == null) {
connection.write(Bytes.of(id)).whenComplete((response, responseError) -> {
if (responseError == null) {
byte result = response.get();
if (result == 1) {
future.complete(connection);
} else {
future.completeExceptionally(new ProtocolException("Server not found"));
}
} else {
future.completeExceptionally(responseError);
}
});
} else {
future.completeExceptionally(connectionError);
}
});
return future;
}

@Override
public CompletableFuture<Void> close() {
return client.close();
}

}
@@ -0,0 +1,57 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.internal;

import net.kuujo.copycat.EventListener;
import net.kuujo.copycat.protocol.ProtocolConnection;
import net.kuujo.copycat.protocol.ProtocolServer;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
* Coordinates protocol server.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class CoordinatedProtocolServer implements ProtocolServer {
private final int id;
private final ProtocolServerCoordinator coordinator;
private final Executor executor;

public CoordinatedProtocolServer(int id, ProtocolServerCoordinator coordinator, Executor executor) {
this.id = id;
this.coordinator = coordinator;
this.executor = executor;
}

@Override
public CompletableFuture<Void> listen() {
return coordinator.listen();
}

@Override
public ProtocolServer connectListener(EventListener<ProtocolConnection> handler) {
coordinator.connectListener(id, handler, executor);
return this;
}

@Override
public CompletableFuture<Void> close() {
return coordinator.close();
}

}

0 comments on commit a28499f

Please sign in to comment.