diff --git a/containers/jersey2-routing/pom.xml b/containers/jersey2-routing/pom.xml
index aa8415a..db6173f 100644
--- a/containers/jersey2-routing/pom.xml
+++ b/containers/jersey2-routing/pom.xml
@@ -45,86 +45,70 @@
org.apache.httpcomponents
httpclient
- 4.5.1
org.slf4j
jcl-over-slf4j
- 1.7.13
org.apache.httpcomponents
httpclient
- 4.5.1
tests
test
commons-io
commons-io
- 2.4
com.jcraft
jsch
- 0.1.53
com.jcraft
jsch.agentproxy.usocket-jna
- 0.0.9
com.jcraft
jsch.agentproxy.sshagent
- 0.0.9
com.jcraft
jsch.agentproxy.connector-factory
- 0.0.6
com.jcraft
jsch.agentproxy.jsch
- 0.0.7
com.google.guava
guava
- 18.0
org.glassfish.jersey.containers
jersey-container-servlet
- 2.22.1
javax
javaee-api
- 7.0
org.apache.curator
curator-framework
- 2.8.0
org.apache.curator
curator-test
- 2.8.0
test
com.fasterxml.jackson.core
jackson-databind
- 2.6.0
org.apache.curator
curator-recipes
- 2.8.0
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/AdminClient.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/AdminClient.java
index 8b6eddc..0b5c8ad 100644
--- a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/AdminClient.java
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/AdminClient.java
@@ -14,10 +14,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.util.List;
import java.util.Map;
-import static com.yahoo.gondola.container.ShardManagerProtocol.ShardManagerException.CODE.*;
+import static com.yahoo.gondola.container.ShardManagerProtocol.ShardManagerException.CODE.SLAVE_NOT_SYNC;
/**
* The type Admin client.
@@ -26,10 +27,11 @@ public class AdminClient {
// TODO: move to config
public static final int RETRY_COUNT = 3;
- public static final int TIMEOUT_MS = 300;
+ public static final int TIMEOUT_MS = 3000;
private String serviceName;
private Config config;
private ShardManagerClient shardManagerClient;
+ private ConfigWriter configWriter;
private static Logger logger = LoggerFactory.getLogger(AdminClient.class);
private boolean tracing = false;
@@ -41,10 +43,12 @@ public class AdminClient {
* @param shardManagerClient the shard manager client
* @param config the config
*/
- public AdminClient(String serviceName, ShardManagerClient shardManagerClient, Config config) {
+ public AdminClient(String serviceName, ShardManagerClient shardManagerClient, Config config,
+ ConfigWriter configWriter) {
this.serviceName = serviceName;
this.shardManagerClient = shardManagerClient;
this.config = config;
+ this.configWriter = configWriter;
this.config.registerForUpdates(config1 -> {
tracing = config1.getBoolean("tracing.adminCli");
});
@@ -87,11 +91,10 @@ public Config getConfig() throws AdminException {
/**
* Sets config.
*
- * @param config the config
+ * @param configFile the config file.
* @throws AdminException the admin exception
*/
- public void setConfig(Config config) throws AdminException {
- this.config = config;
+ public void setConfig(File configFile) throws AdminException {
}
@@ -116,33 +119,35 @@ public void splitShard(String fromShardId, String toShardId) throws AdminExcepti
*/
public void assignBuckets(Range range, String fromShardId, String toShardId)
throws InterruptedException, AdminException {
- trace("Executing assign buckets={} from {} to {}", range, fromShardId, toShardId);
+ trace("[admin] Executing assign buckets={} from {} to {}", range, fromShardId, toShardId);
for (int i = 1; i <= RETRY_COUNT; i++) {
try {
- trace("Initializing slaves on {} ...", toShardId);
+ trace("[admin] Initializing slaves on {} ...", toShardId);
shardManagerClient.startObserving(toShardId, fromShardId, TIMEOUT_MS);
trace(
- "All nodes in {} are in slave mode, waiting for slave logs approaching to leader's log position.",
+ "[admin] All nodes in {} are in slave mode, "
+ + "waiting for slave logs approaching to leader's log position.",
toShardId);
if (!shardManagerClient.waitSlavesApproaching(toShardId, -1)) {
throw new ShardManagerException(SLAVE_NOT_SYNC);
}
- trace("All nodes in {} logs approached to leader's log position, assigning buckets={} ...", toShardId,
- range);
+ trace("[admin] All nodes in {} logs approached to leader's log position, assigning buckets={} ...",
+ toShardId, range);
// migrateBuckets is a atomic operation executing on leader at fromShard,
// after operation is success, it will stop observing mode of toShard.
- shardManagerClient.migrateBuckets(range, fromShardId, toShardId, 2000);
-
- trace("Assign buckets complete, assigned buckets={} from {} to {}", range, fromShardId, toShardId);
+ shardManagerClient.migrateBuckets(range, fromShardId, toShardId, TIMEOUT_MS);
+ trace("[admin] success!");
+ trace("[admin] Writing latest config to config storage!");
+ saveConfig(fromShardId, toShardId);
break;
} catch (ShardManagerException e) {
try {
- shardManagerClient.stopObserving(fromShardId, toShardId, TIMEOUT_MS);
+ shardManagerClient.stopObserving(toShardId, fromShardId, TIMEOUT_MS);
} catch (ShardManagerException e1) {
- logger.info("Rollback, Stop observing failed, ignoring the error.");
+ logger.info("Rollback, Stop observing failed, ignoring the error. msg={}", e1.getMessage());
}
if (i != RETRY_COUNT) {
logger.warn("Error occurred during assign buckets.. retrying {} / {}, errorMsg={}",
@@ -155,22 +160,15 @@ public void assignBuckets(Range range, String fromShardId, String toSha
}
}
- /**
- * Close assign buckets.
- *
- * @param fromShardId the from shard id
- * @param toShardId the to shard id
- * @param range the range
- */
- public void closeAssignBuckets(Range range, String fromShardId, String toShardId)
- throws ShardManagerException, InterruptedException {
- trace("Executing close the state of assign buckets");
-
- trace("Waiting all nodes bucket table updated...");
- shardManagerClient.waitBucketsCondition(range, fromShardId, toShardId, 3000);
- trace("closing the state of assign buckets...");
- shardManagerClient.setBuckets(range, fromShardId, toShardId, true);
- trace("Done!");
+ private void saveConfig(String fromShardId, String toShardId) throws AdminException {
+ configWriter.setBucketMap(fromShardId, getBucketMapString(fromShardId));
+ configWriter.setBucketMap(fromShardId, getBucketMapString(toShardId));
+ setConfig(configWriter.save());
+ }
+
+ private String getBucketMapString(String fromShardId) {
+ // TODO: implement
+ return "";
}
private void trace(String format, Object... args) {
@@ -288,6 +286,7 @@ class ShardStat extends Stat {
}
class AdminException extends Exception {
+
ErrorCode errorCode;
@@ -303,6 +302,7 @@ public AdminException() {
enum ErrorCode {
CONFIG_NOT_FOUND(10000);
private int code;
+
ErrorCode(int code) {
this.code = code;
}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ChangeLogProcessor.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ChangeLogProcessor.java
new file mode 100644
index 0000000..c6a9b6c
--- /dev/null
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ChangeLogProcessor.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2015, Yahoo Inc.
+ * Copyrights licensed under the New BSD License.
+ * See the accompanying LICENSE file for terms.
+ */
+
+package com.yahoo.gondola.container;
+
+import com.yahoo.gondola.Command;
+import com.yahoo.gondola.Gondola;
+import com.yahoo.gondola.Shard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The type Change log processor.
+ */
+public class ChangeLogProcessor {
+
+ private Gondola gondola;
+ private Map threads = new HashMap<>();
+ private static Logger logger = LoggerFactory.getLogger(ChangeLogProcessor.class);
+ private ChangeLogConsumer changeLogConsumer;
+
+
+ /**
+ * Instantiates a new Change log processor.
+ * @param gondola the gondola
+ * @param changeLogConsumer change log consumer provided by app.
+ */
+ public ChangeLogProcessor(Gondola gondola, ChangeLogConsumer changeLogConsumer) {
+ this.gondola = gondola;
+ this.changeLogConsumer = changeLogConsumer;
+ for (String shardId : gondola.getConfig().getShardIds(gondola.getHostId())) {
+ createThread(shardId);
+ }
+ }
+
+ /**
+ * The type Change log processor thread.
+ */
+ class ChangeLogProcessorThread extends Thread {
+
+ int appliedIndex = 0;
+ private int retryCount = 0;
+ private Shard shard;
+ private String shardId;
+ private String hostId;
+ private int memberId;
+
+ public ChangeLogProcessorThread(String shardId) {
+ setName("ChangeLogProcessor");
+ this.shardId = shardId;
+ this.hostId = gondola.getHostId();
+ // TODO: get memberId
+ }
+
+ public void run() {
+ Command command;
+ shard = gondola.getShard(shardId);
+ while (true) {
+ try {
+ command = shard.getCommittedCommand(appliedIndex + 1);
+ if (changeLogConsumer != null) {
+ changeLogConsumer.apply(shardId, command);
+ }
+ appliedIndex++;
+ } catch (InterruptedException e) {
+ logger.warn("[{}-{}] ChangeLogProcessor interrupted, exit..", hostId, memberId);
+ return;
+ } catch (Throwable e) {
+ logger.error(e.getMessage(), e);
+ if (++retryCount == 3) {
+ logger
+ .error("[{}-{}] Max retry count reached, exit..", hostId, memberId);
+ return;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ logger.warn("[{}-{}] ChangeLogProcessor interrupted, exit..", hostId, memberId);
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ private void createThread(String shardId) {
+ ChangeLogProcessorThread thread = new ChangeLogProcessorThread(shardId);
+ threads.put(shardId, thread);
+ }
+
+ /**
+ * Gets applied index.
+ *
+ * @param shardId the shard id
+ * @return the applied index
+ */
+ public int getAppliedIndex(String shardId) {
+ return threads.get(shardId).appliedIndex;
+ }
+
+ /**
+ * Stop.
+ */
+ public void stop() {
+ com.yahoo.gondola.core.Utils.stopThreads(new ArrayList<>(threads.values()));
+ }
+
+ public void start() {
+ threads.values().forEach(ChangeLogProcessorThread::start);
+ }
+
+ /**
+ * Change log consumer functional interface.
+ */
+ @FunctionalInterface
+ public interface ChangeLogConsumer {
+ void apply(String shardId, Command command);
+ }
+}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ConfigLoader.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ConfigLoader.java
new file mode 100644
index 0000000..4f255be
--- /dev/null
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ConfigLoader.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2015, Yahoo Inc.
+ * Copyrights licensed under the New BSD License.
+ * See the accompanying LICENSE file for terms.
+ */
+
+package com.yahoo.gondola.container;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URL;
+
+/**
+ * The type Config loader.
+ */
+public class ConfigLoader {
+
+ /**
+ * Load config file.
+ *
+ * @param configUri the config uri
+ * @return the file
+ */
+ public File loadConfig(URI configUri) {
+ switch (configUri.getScheme()) {
+ case "classpath":
+ URL resource = getClass().getClassLoader().getResource(configUri.getPath());
+ if (resource == null) {
+ throw new IllegalArgumentException("File not found in " + configUri.toString());
+ }
+ return new File(resource.getFile());
+ case "file":
+ return new File(configUri.getPath());
+ case "zookeeper":
+ return null;
+ default:
+ throw new IllegalArgumentException("Scheme not supported - " + configUri.getScheme());
+ }
+ }
+}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ConfigWriter.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ConfigWriter.java
new file mode 100644
index 0000000..e285bac
--- /dev/null
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ConfigWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2015, Yahoo Inc.
+ * Copyrights licensed under the New BSD License.
+ * See the accompanying LICENSE file for terms.
+ */
+
+package com.yahoo.gondola.container;
+
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
+import com.typesafe.config.ConfigValueFactory;
+import com.yahoo.gondola.Config;
+
+import org.apache.curator.utils.CloseableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+/**
+ * Config writer supports change config value and save to file.
+ */
+public class ConfigWriter {
+
+ Config config;
+
+ Logger logger = LoggerFactory.getLogger(ConfigWriter.class);
+ com.typesafe.config.Config configImpl;
+ File tmpFile, configFile;
+
+ public ConfigWriter(File configFile) {
+ loadConfig(configFile);
+ }
+
+ private void loadConfig(File configFile) {
+ this.configFile = configFile;
+ config = new Config(configFile);
+ configImpl = ConfigFactory.parseFile(configFile);
+ verify();
+ }
+
+ private void verify() {
+ // TODO: implement
+ }
+
+ /**
+ * Save file.
+ *
+ * @return the file
+ */
+ public File save() {
+ ConfigRenderOptions renderOptions = ConfigRenderOptions.defaults().setJson(false);
+ bumpVersion();
+ String configData = configImpl.root().render(renderOptions);
+ FileWriter writer = null;
+ try {
+ writer = new FileWriter(tmpFile);
+ writer.write(configData);
+ } catch (IOException e) {
+ logger.warn("failed to write file, message={}", e.getMessage());
+ } finally {
+ CloseableUtils.closeQuietly(writer);
+ }
+ return tmpFile;
+ }
+
+ private void bumpVersion() {
+ // TODO: bump config version
+ }
+
+ public void setBucketMap(String shardId, String bucketMapString) {
+ configImpl = configImpl
+ .withValue("gondola.shards." + shardId, ConfigValueFactory.fromAnyRef(bucketMapString));
+ }
+}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/LockManager.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/LockManager.java
index 0fb8492..cf73ae2 100644
--- a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/LockManager.java
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/LockManager.java
@@ -7,7 +7,7 @@
package com.yahoo.gondola.container;
import com.google.common.collect.Range;
-import com.yahoo.gondola.Config;
+import com.yahoo.gondola.Gondola;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,14 +32,16 @@ class LockManager {
private Map, CountDownLatch> buckets = new HashMap<>();
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
private boolean tracing = false;
+ private Gondola gondola;
/**
* Instantiates a new Lock manager.
*
- * @param config the config
+ * @param gondola The Gondola instance.
*/
- public LockManager(Config config) {
- config.registerForUpdates(config1 -> tracing = config1.getBoolean("tracing.router"));
+ public LockManager(Gondola gondola) {
+ gondola.getConfig().registerForUpdates(config1 -> tracing = config1.getBoolean("tracing.router"));
+ this.gondola = gondola;
}
/**
@@ -51,20 +53,20 @@ public LockManager(Config config) {
*/
public void filterRequest(int bucketId, String shardId) throws InterruptedException {
if (globalLock != null) {
- trace("Request blocked by global lock");
+ trace("[{}] Request blocked by global lock", gondola.getHostId());
globalLock.await();
}
CountDownLatch shardLock = shards.get(shardId);
if (shardLock != null) {
- trace("Request blocked by shard lock - shardId={}", shardId);
+ trace("[{}] Request blocked by shard lock - shardId={}", gondola.getHostId(), shardId);
shardLock.await();
}
List bucketLocks = getBucketLocks(bucketId);
if (bucketLocks.size() != 0) {
for (CountDownLatch bucketLock : bucketLocks) {
- trace("Request blocked by bucket lock - bucketId={}", bucketId);
+ trace("[{}] Request blocked by bucket lock - bucketId={}", gondola.getHostId(), bucketId);
bucketLock.await();
}
}
@@ -82,7 +84,7 @@ public long unblockRequestOnShard(String shardId) {
// TODO: this is not the expected blocking count.
long count = lock.getCount();
lock.countDown();
- trace("Request unblocked on shardId={}", shardId);
+ trace("[{}] Request unblocked on shardId={}", gondola.getHostId(), shardId);
return count;
}
return 0;
@@ -94,7 +96,7 @@ public long unblockRequestOnShard(String shardId) {
* @param shardId the shard id
*/
public void blockRequestOnShard(String shardId) {
- trace("Block requests on shard : {}", shardId);
+ trace("[{}] Block requests on shard : {}", gondola.getHostId(), shardId);
shards.putIfAbsent(shardId, new CountDownLatch(1));
}
@@ -102,7 +104,7 @@ public void blockRequestOnShard(String shardId) {
* Unblock all requests.
*/
public void unblockRequest() {
- trace("Unblock all requests");
+ trace("[{}] Unblock all requests", gondola.getHostId());
if (globalLock != null) {
globalLock.countDown();
globalLock = null;
@@ -113,7 +115,7 @@ public void unblockRequest() {
* Block all requests.
*/
public void blockRequest() {
- trace("Block all requests");
+ trace("[{}] Block all requests", gondola.getHostId());
globalLock = new CountDownLatch(1);
}
@@ -123,7 +125,7 @@ public void blockRequest() {
* @param splitRange the split range
*/
public void unblockRequestOnBuckets(Range splitRange) {
- trace("Unblock requests on buckets : {}", splitRange);
+ trace("[{}] Unblock requests on buckets : {}", gondola.getHostId(), splitRange);
CountDownLatch lock = buckets.remove(splitRange);
if (lock != null) {
lock.countDown();
@@ -137,7 +139,7 @@ public void unblockRequestOnBuckets(Range splitRange) {
* @param splitRange the split range
*/
public void blockRequestOnBuckets(Range splitRange) {
- trace("Block requests on buckets : {}", splitRange);
+ trace("[{}] Block requests on buckets : {}", gondola.getHostId(), splitRange);
buckets.putIfAbsent(splitRange, new CountDownLatch(1));
}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/RoutingFilter.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/RoutingFilter.java
index f03085e..24aa9f9 100644
--- a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/RoutingFilter.java
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/RoutingFilter.java
@@ -10,12 +10,12 @@
import com.google.common.collect.Range;
import com.yahoo.gondola.Config;
import com.yahoo.gondola.Gondola;
+import com.yahoo.gondola.GondolaException;
import com.yahoo.gondola.Member;
import com.yahoo.gondola.Shard;
import com.yahoo.gondola.container.client.ProxyClient;
import com.yahoo.gondola.container.spi.RoutingHelper;
-import org.glassfish.jersey.server.ResourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,6 +23,7 @@
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -89,7 +90,8 @@ public class RoutingFilter implements ContainerRequestFilter, ContainerResponseF
private List shutdownCallbacks = new ArrayList<>();
private static List instances = new ArrayList<>();
- private ResourceConfig application;
+ private ChangeLogProcessor changeLogProcessor;
+ private RoutingService service;
/**
* Disallow default constructor.
@@ -97,37 +99,32 @@ public class RoutingFilter implements ContainerRequestFilter, ContainerResponseF
private RoutingFilter() {
}
- /**
- * Get application instance.
- *
- * @return Application instance.
- */
- public ResourceConfig getApplication() {
- return application;
- }
-
/**
* Instantiates a new Routing filter.
*
* @param gondola the gondola
* @param routingHelper the routing helper
* @param proxyClientProvider the proxy client provider
- * @param application the application instance
* @throws ServletException the servlet exception
*/
RoutingFilter(Gondola gondola, RoutingHelper routingHelper, ProxyClientProvider proxyClientProvider,
- ResourceConfig application)
+ ChangeLogProcessor changeLogProcessor, RoutingService service)
throws ServletException {
this.gondola = gondola;
this.routingHelper = routingHelper;
- this.application = application;
- lockManager = new LockManager(gondola.getConfig());
+ lockManager = new LockManager(gondola);
bucketManager = new BucketManager(gondola.getConfig());
loadRoutingTable();
loadConfig();
watchGondolaEvent();
proxyClient = proxyClientProvider.getProxyClient(gondola.getConfig());
instances.add(this);
+ this.changeLogProcessor = changeLogProcessor;
+ this.service = service;
+ }
+
+ public RoutingService getService() {
+ return service;
}
private void loadConfig() {
@@ -143,17 +140,23 @@ private void watchGondolaEvent() {
if (roleChangeEvent.leader.isLocal()) {
CompletableFuture.runAsync(() -> {
String shardId = roleChangeEvent.shard.getShardId();
- trace("Become leader on shard {}, blocking all requests to the shard....", shardId);
+ trace("[{}-{}] Become leader on \"{}\", blocking all requests to the shard....",
+ gondola.getHostId(), roleChangeEvent.leader.getMemberId(), shardId);
lockManager.blockRequestOnShard(shardId);
- trace("Wait until raft logs applied to storage...");
+ trace("[{}-{}] Wait until raft logs applied to storage...",
+ gondola.getHostId(), roleChangeEvent.leader.getMemberId());
waitDrainRaftLogs(shardId);
- trace("Raft logs are up-to-date, notify application is ready to serve...");
- routingHelper.beforeServing(shardId);
- trace("Ready for serving, unblocking the requests...");
+ trace("[{}-{}] Raft logs are up-to-date, notify application is ready to serve...",
+ gondola.getHostId(), roleChangeEvent.leader.getMemberId());
+ service.ready(shardId);
+ trace("[{}-{}] Ready for serving, unblocking the requests...",
+ gondola.getHostId(), roleChangeEvent.leader.getMemberId());
long count = lockManager.unblockRequestOnShard(shardId);
- trace("System is back to serving, unblocked {} requests ...", count);
+ trace("[{}-{}] System is back to serving, unblocked {} requests ...",
+ gondola.getHostId(), roleChangeEvent.leader.getMemberId(), count);
}, singleThreadExecutor).exceptionally(throwable -> {
- logger.info("Errors while executing leader change event", throwable);
+ logger.info("[{}-{}] Errors while executing leader change event. message={}",
+ gondola.getHostId(), roleChangeEvent.leader.getMemberId(), throwable.getMessage());
return null;
});
}
@@ -175,6 +178,9 @@ public void filter(ContainerRequestContext request) throws IOException {
incrementBucketCounter(bucketId);
String shardId = getShardId(request);
+ request.setProperty("shardId", shardId);
+ request.setProperty("bucketId", bucketId);
+
trace("Processing request: {} of shard={}, forwarded={}",
request.getUriInfo().getAbsolutePath(), shardId,
request.getHeaders().containsKey(X_FORWARDED_BY) ? request.getHeaders()
@@ -287,6 +293,8 @@ protected boolean isLeaderInShard(String shardId) {
*/
protected void updateBucketRange(Range range, String fromShard, String toShard,
boolean migrationComplete) {
+ trace("[{}] Update bucket range={} from {} to {}",
+ gondola.getHostId(), range, fromShard, toShard);
bucketManager.updateBucketRange(range, fromShard, toShard, migrationComplete);
}
@@ -433,7 +441,8 @@ private void proxyRequestToLeader(ContainerRequestContext request, String shardI
return;
} catch (IOException e) {
fail = true;
- logger.error("Error while forwarding request to shard:{} {}", shardId, appUri, e);
+ logger.error("[{}] Error while forwarding request to shard:{} {}", gondola.getHostId(), shardId, appUri,
+ e);
}
}
abortResponse(request, BAD_GATEWAY, "All servers are not available in Shard: " + shardId);
@@ -442,7 +451,8 @@ private void proxyRequestToLeader(ContainerRequestContext request, String shardI
private void updateRoutingTableIfNeeded(String shardId, Response proxiedResponse) {
String appUri = proxiedResponse.getHeaderString(X_GONDOLA_LEADER_ADDRESS);
if (appUri != null) {
- logger.info("New leader found, correct routing table with : shardId={}, appUrl={}", shardId, appUri);
+ logger.info("[{}] New leader found, correct routing table with : shardId={}, appUrl={}",
+ gondola.getHostId(), shardId, appUri);
updateShardRoutingEntries(shardId, appUri);
}
}
@@ -459,7 +469,7 @@ private void updateShardRoutingEntries(String shardId, String appUri) {
newAppUris.add(appUrl);
}
}
- logger.info("Update shard '{}' leader as {}", shardId, appUri);
+ trace("[{}] Update shard '{}' leader as {}", gondola.getHostId(), shardId, appUri);
routingTable.put(shardId, newAppUris);
}
@@ -521,14 +531,15 @@ private void waitDrainRaftLogs(String shardId) {
try {
Thread.sleep(500);
long now = System.currentTimeMillis();
- int diff = gondola.getShard(shardId).getCommitIndex() - routingHelper.getAppliedIndex(shardId);
+ int diff = gondola.getShard(shardId).getCommitIndex() - changeLogProcessor.getAppliedIndex(shardId);
if (now - checkTime > 10000) {
checkTime = now;
- logger.warn("Recovery running for {} seconds, {} logs left", (now - startTime) / 1000, diff);
+ logger.warn("[{}] Recovery running for {} seconds, {} logs left", gondola.getHostId(),
+ (now - startTime) / 1000, diff);
}
synced = diff <= 0;
} catch (Exception e) {
- logger.warn("Unknown error", e);
+ logger.warn("[{}] Unknown error. message={}", gondola.getHostId(), e);
}
}
}
@@ -556,7 +567,12 @@ public void registerShutdownFunction(Runnable runnable) {
shutdownCallbacks.add(runnable);
}
+ public void start() {
+ changeLogProcessor.start();
+ }
+
private void stop() {
+ changeLogProcessor.stop();
shutdownCallbacks.forEach(Runnable::run);
gondola.stop();
}
@@ -570,7 +586,9 @@ public static class Builder {
RoutingHelper routingHelper;
ProxyClientProvider proxyClientProvider;
ShardManagerProvider shardManagerProvider;
- ResourceConfig application;
+ Class extends RoutingService> serviceClass;
+ RoutingService service;
+ URI configUri;
public static Builder createRoutingFilter() {
return new Builder();
@@ -580,16 +598,6 @@ public static Builder createRoutingFilter() {
proxyClientProvider = new ProxyClientProvider();
}
- public Builder setGondola(Gondola gondola) {
- this.gondola = gondola;
- return this;
- }
-
- public Builder setRoutingHelper(RoutingHelper routingHelper) {
- this.routingHelper = routingHelper;
- return this;
- }
-
public Builder setProxyClientProvider(ProxyClientProvider proxyClientProvider) {
this.proxyClientProvider = proxyClientProvider;
return this;
@@ -600,30 +608,53 @@ public Builder setShardManagerProvider(ShardManagerProvider shardManagerProvider
return this;
}
- public Builder setApplication(ResourceConfig application) {
- this.application = application;
+ public Builder setService(Class extends RoutingService> serviceClass) {
+ this.serviceClass = serviceClass;
return this;
}
- public RoutingFilter build() throws ServletException {
- Preconditions.checkState(gondola != null, "Gondola instance must be set");
+ public Builder setConfigUri(URI configUri) {
+ this.configUri = configUri;
+ return this;
+ }
+
+ public RoutingFilter build()
+ throws ServletException, GondolaException, NoSuchMethodException, IllegalAccessException,
+ InvocationTargetException, InstantiationException {
+ Preconditions.checkState(serviceClass != null, "Service class must be set");
+ Preconditions.checkState(configUri != null, "Config URI must be set");
+ gondola = createGondolaInstance();
+ service = serviceClass.getConstructor(Gondola.class).newInstance(gondola);
+ routingHelper = service.provideRoutingHelper();
Preconditions.checkState(routingHelper != null, "RoutingHelper instance must be set");
- Preconditions.checkState(application != null, "ResourceConfig instance must be set");
+
if (shardManagerProvider == null) {
shardManagerProvider = new ShardManagerProvider(gondola.getConfig());
}
- RoutingFilter routingFilter = new RoutingFilter(gondola, routingHelper, proxyClientProvider, application);
- initializeShardManagerServer(routingFilter);
+ ChangeLogProcessor changeLogProcessor = new ChangeLogProcessor(gondola, service.provideChangeLogConsumer());
+ RoutingFilter routingFilter =
+ new RoutingFilter(gondola, routingHelper, proxyClientProvider, changeLogProcessor,
+ service);
+ initShardManagerServer(routingFilter);
+ gondola.start();
+ routingFilter.start();
return routingFilter;
}
- private void initializeShardManagerServer(RoutingFilter routingFilter) {
+ private Gondola createGondolaInstance() {
+ String hostId = System.getenv("hostId") != null ? System.getenv("hostId") : "host1";
+ return new Gondola(new Config(new ConfigLoader().loadConfig(configUri)), hostId);
+ }
+
+ private void initShardManagerServer(RoutingFilter routingFilter) {
ShardManagerServer shardManagerServer = shardManagerProvider.getShardManagerServer();
- ShardManager shardManager =
- new ShardManager(gondola, routingFilter, gondola.getConfig(),
- shardManagerProvider.getShardManagerClient());
- shardManagerServer.setShardManager(shardManager);
- routingFilter.registerShutdownFunction(shardManagerServer::stop);
+ if (shardManagerServer != null) {
+ ShardManager shardManager =
+ new ShardManager(gondola, routingFilter, gondola.getConfig(),
+ shardManagerProvider.getShardManagerClient());
+ shardManagerServer.setShardManager(shardManager);
+ routingFilter.registerShutdownFunction(shardManagerServer::stop);
+ }
}
}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/RoutingService.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/RoutingService.java
new file mode 100644
index 0000000..94d724a
--- /dev/null
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/RoutingService.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2015, Yahoo Inc.
+ * Copyrights licensed under the New BSD License.
+ * See the accompanying LICENSE file for terms.
+ */
+
+package com.yahoo.gondola.container;
+
+import com.yahoo.gondola.Command;
+import com.yahoo.gondola.Gondola;
+import com.yahoo.gondola.NotLeaderException;
+import com.yahoo.gondola.container.spi.RoutingHelper;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+import javax.ws.rs.container.ContainerRequestContext;
+
+/**
+ * The type Routing service.
+ */
+public abstract class RoutingService {
+
+ protected Gondola gondola;
+ private List> eventCallbacks = new ArrayList<>();
+
+ /**
+ * Instantiates a new Routing service.
+ *
+ * @param gondola the gondola
+ */
+ public RoutingService(Gondola gondola) {
+ this.gondola = gondola;
+ }
+
+ public void registerEventHandler(Consumer consumer) {
+ eventCallbacks.add(consumer);
+ }
+
+ /**
+ * Provide changeLog consumer
+ *
+ */
+ public abstract ChangeLogProcessor.ChangeLogConsumer provideChangeLogConsumer();
+
+ /**
+ * Provide routing helper routing helper.
+ *
+ * @return the routing helper
+ */
+ public abstract RoutingHelper provideRoutingHelper();
+
+ /**
+ * Called by container when the shard is ready for serving.
+ *
+ * @param shardId the shard id
+ */
+ public abstract void ready(String shardId);
+
+ /**
+ * Write log.
+ *
+ * @param shardId the shard id
+ * @param bytes the bytes
+ * @throws NotLeaderException the not leader exception
+ * @throws InterruptedException the interrupted exception
+ */
+ public void writeLog(String shardId, byte[] bytes)
+ throws NotLeaderException, InterruptedException {
+ Command command = gondola.getShard(shardId).checkoutCommand();
+ command.commit(bytes, 0, bytes.length);
+ }
+
+ /**
+ * Gets shard id.
+ *
+ * @param request the request
+ * @return the shard id
+ */
+ public String getShardId(ContainerRequestContext request) {
+ return (String) request.getProperty("shardId");
+ }
+
+ /**
+ * Gets bucket id.
+ *
+ * @param request the request
+ * @return the bucket id
+ */
+ public int getBucketId(ContainerRequestContext request) {
+ return Integer.parseInt((String) request.getProperty("bucketId"));
+ }
+
+ /**
+ * Is leader boolean.
+ *
+ * @param shardId the shard id
+ * @return the boolean
+ */
+ public boolean isLeader(String shardId) {
+ return gondola.getShard(shardId).getLocalMember().isLeader();
+ }
+
+ /**
+ * The container event.
+ */
+ public static class Event {
+
+ public Type type;
+
+ /**
+ * Event type.
+ */
+ public enum Type {
+ ROLE_CHANGE
+ }
+ }
+}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ShardManager.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ShardManager.java
index fae6f91..0d6a499 100644
--- a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ShardManager.java
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ShardManager.java
@@ -9,6 +9,7 @@
import com.google.common.collect.Range;
import com.yahoo.gondola.Config;
import com.yahoo.gondola.Gondola;
+import com.yahoo.gondola.GondolaException;
import com.yahoo.gondola.Member;
import com.yahoo.gondola.Shard;
import com.yahoo.gondola.container.client.ShardManagerClient;
@@ -17,12 +18,13 @@
import org.slf4j.LoggerFactory;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static com.yahoo.gondola.container.ShardManagerProtocol.ShardManagerException.CODE.FAILED_START_SLAVE;
import static com.yahoo.gondola.container.ShardManagerProtocol.ShardManagerException.CODE.FAILED_STOP_SLAVE;
-import static com.yahoo.gondola.container.ShardManagerProtocol.ShardManagerException.CODE.NOT_LEADER;
+import static com.yahoo.gondola.container.ShardManagerProtocol.ShardManagerException.CODE.MASTER_IS_GONE;
/**
* The Shard manager.
@@ -61,13 +63,20 @@ public ShardManager(Gondola gondola, RoutingFilter filter, Config config, ShardM
public void startObserving(String shardId, String observedShardId, long timeoutMs)
throws ShardManagerException, InterruptedException {
boolean success = false;
- trace("[{}] shardId={} follows shardId={} as slave", gondola.getHostId(), shardId, observedShardId);
- for (Config.ConfigMember m : config.getMembersInShard(observedShardId)) {
- if (success = setSlave(shardId, m.getMemberId(), timeoutMs)) {
+ trace("[{}-{}] Try to follow shardId={} as slave...",
+ gondola.getHostId(), gondola.getShard(shardId).getLocalMember().getMemberId(), shardId, observedShardId);
+ List membersInShard = config.getMembersInShard(observedShardId);
+ for (Config.ConfigMember m : membersInShard) {
+ if (success = setSlave(shardId, m.getMemberId(), timeoutMs / membersInShard.size())) {
+ trace("[{}-{}] Successfully to follow masterId={}",
+ gondola.getHostId(), gondola.getShard(shardId).getLocalMember().getMemberId(), m.getMemberId());
break;
}
}
if (!success) {
+ logger.error("[{}-{}] Failed follow master={}",
+ gondola.getHostId(), gondola.getShard(shardId).getLocalMember().getMemberId(),
+ observedShardId);
throw new ShardManagerException(FAILED_START_SLAVE);
}
observedShards.add(observedShardId);
@@ -80,12 +89,10 @@ private boolean setSlave(String shardId, int memberId, long timeoutMs)
gondola.getShard(shardId).getLocalMember().setSlave(memberId);
return Utils.pollingWithTimeout(() -> {
Member.SlaveStatus status = gondola.getShard(shardId).getLocalMember().getSlaveStatus();
- if (status != null && status.running) {
+ if (slaveOperational(status)) {
+ trace("[{}] Successfully connect to leader node={}", gondola.getHostId(), memberId);
return true;
}
- logger.warn("Failed start observing {} on shard={}, msg={}",
- memberId, shardId,
- status != null && status.exception != null ? status.exception.getMessage() : "n/a");
return false;
}, timeoutMs / POLLING_TIMES, timeoutMs);
} catch (Exception e) {
@@ -93,58 +100,53 @@ private boolean setSlave(String shardId, int memberId, long timeoutMs)
}
}
+ private boolean slaveOperational(Member.SlaveStatus status) {
+ return status != null && status.running;
+ }
+
/**
* Stops observer mode to remote shard, and back to normal mode.
*/
@Override
- public void stopObserving(String shardId, String observedShardId, long timeoutMs) throws ShardManagerException,
- InterruptedException {
- trace("[{}] shardId={} un-followed shardId={}", gondola.getHostId(), shardId, observedShardId);
- boolean success = false;
- for (Config.ConfigMember m : config.getMembersInShard(observedShardId)) {
- if (success = unsetSlave(shardId, m.getMemberId(), timeoutMs)) {
- break;
- }
- }
- if (!success) {
- throw new ShardManagerException(FAILED_STOP_SLAVE);
+ public void stopObserving(String shardId, String masterShardId, long timeoutMs) throws ShardManagerException,
+ InterruptedException {
+ trace("[{}] shardId={} un-followed shardId={}", gondola.getHostId(), shardId, masterShardId);
+ Member.SlaveStatus status = gondola.getShard(shardId).getLocalMember().getSlaveStatus();
+ if (status == null) {
+ return;
}
- observedShards.remove(observedShardId);
- }
- private boolean unsetSlave(String shardId, int memberId, long timeoutMs)
- throws ShardManagerException, InterruptedException {
-
- try {
- Member.SlaveStatus status = gondola.getShard(shardId).getLocalMember().getSlaveStatus();
-
- // Not in slave mode, nothing to do.
- if (status == null) {
- return true;
- }
+ String curMasterShardId = config.getMember(status.masterId).getShardId();
- // Reject if following different leader
- if (status.memberId != memberId) {
- throw new ShardManagerException(FAILED_STOP_SLAVE,
- String.format(
- "Cannot stop slave due to different master. current=%d, target=%d",
- status.memberId, memberId));
- }
+ if (!curMasterShardId.equals(masterShardId)) {
+ throw
+ new ShardManagerException(FAILED_STOP_SLAVE,
+ String.format(
+ "Cannot stop slave due to follow different shard. current=%s, target=%s",
+ curMasterShardId, masterShardId));
+ }
+ try {
gondola.getShard(shardId).getLocalMember().setSlave(-1);
+ } catch (GondolaException e) {
+ throw new ShardManagerException(e);
+ }
- return
- Utils.pollingWithTimeout(() -> {
- if (gondola.getShard(shardId).getLocalMember().getSlaveStatus() == null) {
- return true;
- }
- logger.warn("Failed stop observing {} on shard={}", memberId, shardId);
- return false;
- }, timeoutMs / POLLING_TIMES, timeoutMs);
- } catch (Exception e) {
+ try {
+ if (!Utils.pollingWithTimeout(() -> {
+ if (gondola.getShard(shardId).getLocalMember().getSlaveStatus() == null) {
+ return true;
+ }
+ logger.warn("Failed stop observing {} on shard={}", masterShardId, shardId);
+ return false;
+ }, timeoutMs / POLLING_TIMES, timeoutMs)) {
+ throw new ShardManagerException(FAILED_STOP_SLAVE, "timed out");
+ }
+ } catch (ExecutionException e) {
throw new ShardManagerException(e);
}
+ observedShards.remove(masterShardId);
}
/**
@@ -155,7 +157,7 @@ public void migrateBuckets(Range splitRange, String fromShardId,
String toShardId, long timeoutMs) throws ShardManagerException {
// Make sure only leader can execute this request.
if (!filter.isLeaderInShard(fromShardId)) {
- throw new ShardManagerException(NOT_LEADER);
+ return;
} else {
assignBucketOnLeader(splitRange, fromShardId, toShardId, timeoutMs);
}
@@ -168,9 +170,7 @@ private void assignBucketOnLeader(Range splitRange, String fromShardId,
filter.waitNoRequestsOnBuckets(splitRange, timeoutMs);
shardManagerClient.waitSlavesSynced(toShardId, timeoutMs);
shardManagerClient.stopObserving(toShardId, fromShardId, timeoutMs);
- filter.updateBucketRange(splitRange, fromShardId, toShardId, true);
- trace("Update global bucket table for buckets= from {} to {}", splitRange, fromShardId, toShardId);
- shardManagerClient.setBuckets(splitRange, fromShardId, toShardId, false);
+ filter.updateBucketRange(splitRange, fromShardId, toShardId, false);
} catch (InterruptedException | ExecutionException e) {
logger.warn("Error occurred, rollback!", e);
try {
@@ -197,9 +197,14 @@ private boolean waitLogApproach(String shardId, long timeoutMs, int logPosDiff)
if (shard.getCommitIndex() != 0 && shard.getCommitIndex() - getSavedIndex(shard) <= logPosDiff) {
return true;
}
+ Member.SlaveStatus slaveStatus = shard.getLocalMember().getSlaveStatus();
+ if (!slaveOperational(slaveStatus)) {
+ throw new ShardManagerException(MASTER_IS_GONE);
+ }
trace("[{}] {} Log status={}, currentDiff={}, targetDiff={}",
- gondola.getHostId(), shardId, shard.getCommitIndex() != 0 ? "RUNNING" : "DOWN",
- shard.getCommitIndex() - getSavedIndex(shard), logPosDiff);
+ gondola.getHostId(), shardId, slaveOperational(slaveStatus) ? "RUNNING" : "DOWN",
+ slaveOperational(slaveStatus) ? "N/A" : shard.getCommitIndex() - getSavedIndex(shard),
+ logPosDiff);
return false;
}, timeoutMs / POLLING_TIMES, timeoutMs);
} catch (ExecutionException e) {
@@ -228,17 +233,6 @@ public void setBuckets(Range splitRange, String fromShardId, String toS
filter.updateBucketRange(splitRange, fromShardId, toShardId, migrationComplete);
}
- @Override
- public boolean waitBucketsCondition(Range range, String fromShardId, String toShardId, long timeoutMs)
- throws InterruptedException {
- try {
- return Utils.pollingWithTimeout(() -> filter.isBucketRange(range, fromShardId, toShardId), timeoutMs / 3,
- timeoutMs);
- } catch (ExecutionException e) {
- return false;
- }
- }
-
private void trace(String format, Object... args) {
if (tracing) {
logger.info(format, args);
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ShardManagerProtocol.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ShardManagerProtocol.java
index 72ecdd0..be3d9f4 100644
--- a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ShardManagerProtocol.java
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/ShardManagerProtocol.java
@@ -42,7 +42,7 @@ void stopObserving(String shardId, String observedShardId, long timeoutMs)
* @throws ShardManagerException the shard manager exception
*/
void migrateBuckets(Range splitRange, String fromShardId, String toShardId,
- long timeoutMs) throws ShardManagerException;
+ long timeoutMs) throws ShardManagerException, InterruptedException;
/**
* Wait slave until slave synced.
@@ -74,20 +74,8 @@ boolean waitSlavesApproaching(String shardId, long timeoutMs)
* @param toShardId the to shard id
* @param migrationComplete flag to indicate the migration is complete
*/
- void setBuckets(Range splitRange, String fromShardId, String toShardId, boolean migrationComplete);
-
- /**
- * Wait for certain bucket condition.
- *
- * @param range
- * @param fromShardId
- * @param toShardId
- * @param timeoutMs
- * @return success if current bucket range is equal to expected range.
- * @throws InterruptedException
- */
- boolean waitBucketsCondition(Range range, String fromShardId, String toShardId, long timeoutMs)
- throws InterruptedException;
+ void setBuckets(Range splitRange, String fromShardId, String toShardId, boolean migrationComplete)
+ throws ShardManagerException, InterruptedException;
/**
* The type Shard manager exception.
@@ -99,10 +87,12 @@ public ShardManagerException(Exception e) {
}
public enum CODE {
- NOT_LEADER,
FAILED_START_SLAVE,
FAILED_STOP_SLAVE,
- SLAVE_NOT_SYNC
+ FAILED_MIGRATE_BUCKETS,
+ FAILED_SET_BUCKETS,
+ MASTER_IS_GONE,
+ SLAVE_NOT_SYNC,
}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/Utils.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/Utils.java
index 9f588c2..ad72034 100644
--- a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/Utils.java
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/Utils.java
@@ -7,6 +7,9 @@
package com.yahoo.gondola.container;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
/**
* Utility class.
@@ -14,46 +17,77 @@
public class Utils {
/**
- * Polling with timeout utility function, accept a boolean supplier that throws Exception.
- * It'll retry until the retryCount reached or there's timeout.
+ * Polling with timeout utility function, accept a boolean supplier that throws Exception. It'll retry until the
+ * retryCount reached or there's timeout.
*
- * @param supplier
- * @param waitTimeMs
* @param timeoutMs -1 means no limitation, 0 means no wait.
* @return true if success, false if timeout reached.
- * @throws InterruptedException
- * @throws ExecutionException
*/
public static boolean pollingWithTimeout(CheckedBooleanSupplier supplier, long waitTimeMs, long timeoutMs)
- throws InterruptedException, ExecutionException {
+ throws InterruptedException, ExecutionException {
+ return pollingWithTimeout(supplier, waitTimeMs, timeoutMs, null, null);
+ }
+
+
+ public static boolean pollingWithTimeout(CheckedBooleanSupplier supplier, long waitTimeMs, long timeoutMs,
+ Lock lock, Condition condition)
+ throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
// TODO: default timeout MS should be provided by upper layer.
- if (timeoutMs == -1) {
+ if (timeoutMs < 0) {
waitTimeMs = 1000;
}
try {
- while (timeoutMs == -1 || System.currentTimeMillis() - start < timeoutMs) {
- boolean success = supplier.getAsBoolean();
- if (success) {
- return true;
- }
- long remain = timeoutMs - (System.currentTimeMillis() - start);
- if (timeoutMs == 0) {
- break;
+ while (timeoutMs <= 0 || System.currentTimeMillis() - start < timeoutMs) {
+ lock(lock);
+ try {
+ if (supplier.getAsBoolean()) {
+ return true;
+ }
+ long remain = timeoutMs - (System.currentTimeMillis() - start);
+ if (timeoutMs == 0) {
+ break;
+ }
+ long timeout = timeoutMs == -1 || waitTimeMs < remain || remain <= 0 ? waitTimeMs : remain;
+ wait(lock, condition, timeout);
+ } finally {
+ unlock(lock);
}
- Thread.sleep(timeoutMs == -1 || waitTimeMs < remain || remain <= 0 ? waitTimeMs : remain);
}
return false;
+ } catch (InterruptedException e) {
+ throw e;
} catch (Exception e) {
throw new ExecutionException(e);
}
}
+ private static void wait(Lock lock, Condition condition, long timeout) throws InterruptedException {
+ if (lock != null) {
+ condition.await(timeout, TimeUnit.MILLISECONDS);
+ } else {
+ Thread.sleep(timeout);
+ }
+ }
+
+ private static void unlock(Lock lock) {
+ if (lock != null) {
+ lock.unlock();
+ }
+ }
+
+ private static void lock(Lock lock) {
+ if (lock != null) {
+ lock.lock();
+ }
+ }
+
/**
* A functional interface that return boolean value and throw Exception if any error.
*/
@FunctionalInterface
public interface CheckedBooleanSupplier {
+
boolean getAsBoolean() throws Exception;
}
}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ShardManagerClient.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ShardManagerClient.java
index d32d585..719c5e9 100644
--- a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ShardManagerClient.java
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ShardManagerClient.java
@@ -12,4 +12,5 @@
* ShardManager provides the capability to manage the shard.
*/
public interface ShardManagerClient extends ShardManagerProtocol {
+ void stop();
}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperAction.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperAction.java
index 5d7662e..fd0241d 100644
--- a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperAction.java
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperAction.java
@@ -82,4 +82,13 @@ public static class Args {
public int rangeStop;
public boolean complete;
}
+
+ @Override
+ public String toString() {
+ return "ZookeeperAction{"
+ + "action=" + action
+ + ", memberId=" + memberId
+ + ", args=" + args
+ + '}';
+ }
}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperShardManagerClient.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperShardManagerClient.java
index 9cfdf06..a6ae534 100644
--- a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperShardManagerClient.java
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperShardManagerClient.java
@@ -9,29 +9,33 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Range;
import com.yahoo.gondola.Config;
-import com.yahoo.gondola.container.Utils;
-import com.yahoo.gondola.container.client.ZookeeperStat.Status;
+import com.yahoo.gondola.container.client.ZookeeperAction.Action;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.utils.CloseableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import java.util.concurrent.ExecutionException;
-
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+
+import static com.yahoo.gondola.container.ShardManagerProtocol.ShardManagerException.CODE.FAILED_SET_BUCKETS;
+import static com.yahoo.gondola.container.ShardManagerProtocol.ShardManagerException.CODE.FAILED_START_SLAVE;
+import static com.yahoo.gondola.container.Utils.pollingWithTimeout;
import static com.yahoo.gondola.container.client.ZookeeperAction.Action.MIGRATE_1;
import static com.yahoo.gondola.container.client.ZookeeperAction.Action.MIGRATE_2;
import static com.yahoo.gondola.container.client.ZookeeperAction.Action.MIGRATE_3;
import static com.yahoo.gondola.container.client.ZookeeperAction.Action.START_SLAVE;
import static com.yahoo.gondola.container.client.ZookeeperAction.Action.STOP_SLAVE;
import static com.yahoo.gondola.container.client.ZookeeperStat.Status.APPROACHED;
-import static com.yahoo.gondola.container.client.ZookeeperStat.Status.RUNNING;
import static com.yahoo.gondola.container.client.ZookeeperStat.Status.SYNCED;
import static com.yahoo.gondola.container.client.ZookeeperUtils.ensurePath;
@@ -41,20 +45,25 @@
public class ZookeeperShardManagerClient implements ShardManagerClient {
private String serviceName;
- CuratorFramework client;
- Config config;
+ private CuratorFramework client;
+ private Config config;
+
+ private ObjectMapper objectMapper = new ObjectMapper();
+ private Logger logger = LoggerFactory.getLogger(ZookeeperShardManagerClient.class);
+ private PathChildrenCache stats;
+ private boolean tracing = false;
+ private String clientName;
- ObjectMapper objectMapper = new ObjectMapper();
- Logger logger = LoggerFactory.getLogger(ZookeeperShardManagerClient.class);
- PathChildrenCache stats;
- boolean tracing = false;
+ Lock lock = new ReentrantLock();
+ Condition newEvent = lock.newCondition();
- public ZookeeperShardManagerClient(String serviceName, String connectString, Config config) {
+ public ZookeeperShardManagerClient(String serviceName, String clientName, String connectString, Config config) {
client = CuratorFrameworkFactory.newClient(connectString, new RetryOneTime(1000));
client.start();
this.serviceName = serviceName;
this.config = config;
- ensurePath(serviceName, client.getZookeeperClient());
+ this.clientName = clientName;
+ ensurePath(serviceName, client);
watchZookeeperStats();
config.registerForUpdates(config1 -> tracing = config.getBoolean("tracing.router"));
}
@@ -62,31 +71,79 @@ public ZookeeperShardManagerClient(String serviceName, String connectString, Con
private void watchZookeeperStats() {
try {
stats = new PathChildrenCache(client, ZookeeperUtils.statBasePath(serviceName), true);
+ stats.getListenable().addListener((curatorFramework, pathChildrenCacheEvent) -> notifyNewEvent());
stats.start();
} catch (Exception e) {
throw new IllegalStateException("Cannot start stats watch service.", e);
}
}
+ private void notifyNewEvent() {
+ lock.lock();
+ try {
+ newEvent.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
@Override
public void startObserving(String shardId, String observedShardId, long timeoutMs)
throws ShardManagerException, InterruptedException {
- for (Config.ConfigMember m : config.getMembersInShard(shardId)) {
- setAction(m.getMemberId(), START_SLAVE, shardId, observedShardId, timeoutMs);
+ // Set slave state
+ sendActionToShard(shardId, START_SLAVE, shardId, observedShardId, timeoutMs);
+ if (!waitCondition(shardId, ZookeeperStat::isSlaveOperational, timeoutMs)) {
+ throw new ShardManagerException(FAILED_START_SLAVE, "timed out");
}
}
- private void setAction(int memberId, ZookeeperAction.Action action, Object... args) {
+ private boolean waitCondition(String shardId, Function statChecker, long timeoutMs)
+ throws InterruptedException, ShardManagerException {
try {
- trace("Write action {} to memberId={}", action, memberId);
+ if (!pollingWithTimeout(() -> getStatCheckPredicate(shardId, statChecker),
+ timeoutMs / 3, timeoutMs, lock, newEvent)) {
+ return false;
+ }
+ } catch (ExecutionException e) {
+ throw new ShardManagerException(e);
+ }
+ return true;
+ }
+
+
+ private boolean getStatCheckPredicate(String shardId, Function statChecker)
+ throws java.io.IOException {
+ for (ChildData d : stats.getCurrentData()) {
+ ZookeeperStat stat = objectMapper.readValue(d.getData(), ZookeeperStat.class);
+ if (shardId != null && !isMemberInShard(stat.memberId, shardId)) {
+ continue;
+ }
+ if (!statChecker.apply(stat)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isMemberInShard(int memberId, String shardId) {
+ return config.getMember(memberId).getShardId().equals(shardId);
+ }
+
+ private void sendActionToShard(String shardId, Action action, Object... args) {
+ config.getMembersInShard(shardId).forEach(m -> sendAction(m.getMemberId(), action, args));
+ }
+
+ private void sendAction(int memberId, Action action, Object... args) {
+ try {
+ trace("[{}] Write action {} to memberId={}, params={}", clientName, action, memberId, args);
ZookeeperAction zookeeperAction = new ZookeeperAction();
zookeeperAction.memberId = memberId;
zookeeperAction.action = action;
zookeeperAction.args = Arrays.asList(args);
client.setData().forPath(ZookeeperUtils.actionPath(serviceName, memberId),
- objectMapper.writeValueAsBytes(zookeeperAction));
+ objectMapper.writeValueAsBytes(zookeeperAction));
} catch (Exception e) {
- logger.warn("Cannot write action {} to memberId={}", action, memberId);
+ logger.warn("[{}] Cannot write action {} to memberId={}, params={}", clientName, action, memberId, args);
}
}
@@ -94,84 +151,58 @@ private void setAction(int memberId, ZookeeperAction.Action action, Object... ar
@Override
public void stopObserving(String shardId, String observedShardId, long timeoutMs)
throws ShardManagerException, InterruptedException {
- for (Config.ConfigMember m : config.getMembersInShard(shardId)) {
- setAction(m.getMemberId(), STOP_SLAVE, shardId, observedShardId, timeoutMs);
- }
+ sendActionToShard(shardId, STOP_SLAVE, shardId, observedShardId, timeoutMs);
+ waitCondition(shardId, ZookeeperStat::isNormalOperational, timeoutMs);
}
+
@Override
public void migrateBuckets(Range splitRange, String fromShardId, String toShardId, long timeoutMs)
- throws ShardManagerException {
- for (Config.ConfigMember m : config.getMembersInShard(fromShardId)) {
- setAction(m.getMemberId(), MIGRATE_1, splitRange.lowerEndpoint(),
- splitRange.upperEndpoint(), fromShardId, toShardId, timeoutMs);
- }
+ throws ShardManagerException, InterruptedException {
+ sendActionToShard(fromShardId, MIGRATE_1, splitRange.lowerEndpoint(),
+ splitRange.upperEndpoint(), fromShardId, toShardId, timeoutMs);
+ waitCondition(fromShardId, ZookeeperStat::isMigrating1Operational, timeoutMs);
+ setBuckets(splitRange, fromShardId, toShardId, false);
+ setBuckets(splitRange, fromShardId, toShardId, true);
}
@Override
public boolean waitSlavesSynced(String shardId, long timeoutMs)
throws ShardManagerException, InterruptedException {
- try {
- return waitSlaveCondition(shardId, timeoutMs, Collections.singletonList(SYNCED));
- } catch (ExecutionException e) {
- logger.warn("Error happened in wait slaves -- msg={}", e.getMessage());
- return false;
- }
+ return waitCondition(shardId, stat -> stat.isSlaveOperational() && stat.status == SYNCED, timeoutMs);
}
@Override
public boolean waitSlavesApproaching(String shardId, long timeoutMs)
throws ShardManagerException, InterruptedException {
- try {
- return waitSlaveCondition(shardId, timeoutMs, Arrays.asList(APPROACHED, SYNCED));
- } catch (ExecutionException e) {
- logger.warn("Error happened in wait slaves -- msg={}", e.getMessage());
- return false;
- }
+ return waitCondition(shardId, getSlaveApproachingChecker(), timeoutMs);
}
- @Override
- public void setBuckets(Range splitRange, String fromShardId, String toShardId, boolean migrationComplete) {
- for (Config.ConfigMember m : config.getMembers()) {
- setAction(m.getMemberId(), !migrationComplete ? MIGRATE_2 : MIGRATE_3, splitRange.lowerEndpoint(),
- splitRange.upperEndpoint(), fromShardId, toShardId, migrationComplete);
- }
+ private Function getSlaveApproachingChecker() {
+ return stat -> {
+ if (!stat.isSlaveOperational()) {
+ throw new RuntimeException("Master is gone..");
+ }
+ return stat.isSlaveOperational() && Arrays.asList(APPROACHED, SYNCED).contains(stat.status);
+ };
}
@Override
- public boolean waitBucketsCondition(Range range, String fromShardId, String toShardId, long timeoutMs)
- throws InterruptedException {
- try {
- return Utils.pollingWithTimeout(() -> {
- for (ChildData childData : stats.getCurrentData()) {
- ZookeeperStat stat = objectMapper.readValue(childData.getData(), ZookeeperStat.class);
- if (stat.mode != ZookeeperStat.Mode.MIGRATING_2 || !stat.status.equals(RUNNING)) {
- return false;
- }
- }
- return true;
- }, timeoutMs / 3, timeoutMs);
- } catch (ExecutionException e) {
- logger.warn("Error while waitBucketCondition, message={}", e.getMessage());
- return false;
+ public void setBuckets(Range splitRange, String fromShardId, String toShardId, boolean migrationComplete)
+ throws ShardManagerException, InterruptedException {
+ sendActionToAll(!migrationComplete ? MIGRATE_2 : MIGRATE_3, splitRange.lowerEndpoint(),
+ splitRange.upperEndpoint(), fromShardId, toShardId, migrationComplete);
+ if (!waitCondition(null, !migrationComplete ? ZookeeperStat::isMigrating2Operational
+ : ZookeeperStat::isNormalOperational, 300)) {
+ throw new ShardManagerException(FAILED_SET_BUCKETS, "timed out");
}
}
- private boolean waitSlaveCondition(String shardId, long timeoutMs, List statuses)
- throws InterruptedException, ExecutionException {
- return Utils.pollingWithTimeout(() -> {
- for (ChildData childData : stats.getCurrentData()) {
- ZookeeperStat stat = objectMapper.readValue(childData.getData(), ZookeeperStat.class);
- if (!config.getMember(stat.memberId).getShardId().equals(shardId)) {
- continue;
- }
- if (stat.mode != ZookeeperStat.Mode.SLAVE || !statuses.contains(stat.status)) {
- return false;
- }
- }
- return true;
- }, timeoutMs / 3, timeoutMs);
+ private void sendActionToAll(Action action, Object... args) {
+ for (Config.ConfigMember m : config.getMembers()) {
+ sendAction(m.getMemberId(), action, args);
+ }
}
private void trace(String format, Object... args) {
@@ -179,4 +210,10 @@ private void trace(String format, Object... args) {
logger.info(format, args);
}
}
+
+ @Override
+ public void stop() {
+ CloseableUtils.closeQuietly(stats);
+ CloseableUtils.closeQuietly(client);
+ }
}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperStat.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperStat.java
index 201dce1..3e5fba9 100644
--- a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperStat.java
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperStat.java
@@ -6,6 +6,13 @@
package com.yahoo.gondola.container.client;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import static com.yahoo.gondola.container.client.ZookeeperStat.Mode.NORMAL;
+import static com.yahoo.gondola.container.client.ZookeeperStat.Mode.SLAVE;
+import static com.yahoo.gondola.container.client.ZookeeperStat.Status.FAILED;
+import static com.yahoo.gondola.container.client.ZookeeperStat.Status.RUNNING;
+
/**
* The type Zookeeper stat.
*/
@@ -27,7 +34,7 @@ public enum Status {
public int memberId;
public String shardId;
- public Mode mode = Mode.NORMAL;
+ public Mode mode = NORMAL;
public Status status = Status.STOP;
public String reason = null;
@@ -36,4 +43,24 @@ public String toString() {
return "ZookeeperStat{" + "memberId=" + memberId + ", shardId='" + shardId + '\'' + ", mode=" + mode
+ ", status=" + status + ", reason='" + reason + '\'' + '}';
}
+
+ @JsonIgnore
+ public boolean isSlaveOperational() {
+ return mode == SLAVE && status != FAILED;
+ }
+
+ @JsonIgnore
+ public boolean isNormalOperational() {
+ return mode == NORMAL && status == RUNNING;
+ }
+
+ @JsonIgnore
+ public boolean isMigrating1Operational() {
+ return mode == Mode.MIGRATING_1 && status == RUNNING;
+ }
+
+ @JsonIgnore
+ public boolean isMigrating2Operational() {
+ return mode == Mode.MIGRATING_2 && status == RUNNING;
+ }
}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperUtils.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperUtils.java
index 8f09d10..43d65fc 100644
--- a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperUtils.java
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/client/ZookeeperUtils.java
@@ -6,8 +6,7 @@
package com.yahoo.gondola.container.client;
-import org.apache.curator.CuratorZookeeperClient;
-import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.framework.CuratorFramework;
/**
* The type Zookeeper utils.
@@ -24,6 +23,10 @@ public static String actionBasePath(String serviceName) {
return basePath(serviceName) + "/actions";
}
+ public static String configPath(String serviceName) {
+ return basePath(serviceName) + "/config";
+ }
+
public static String statPath(String serviceName, int memberId) {
return statBasePath(serviceName) + "/" + memberId;
}
@@ -32,11 +35,11 @@ public static String statBasePath(String serviceName) {
return basePath(serviceName) + "/stats";
}
- public static void ensurePath(String serviceName, CuratorZookeeperClient client) {
+ public static void ensurePath(String serviceName, CuratorFramework client) {
try {
- new EnsurePath(basePath(serviceName)).ensure(client);
- new EnsurePath(actionBasePath(serviceName)).ensure(client);
- new EnsurePath(statBasePath(serviceName)).ensure(client);
+ client.checkExists().creatingParentContainersIfNeeded().forPath(basePath(serviceName));
+ client.checkExists().creatingParentContainersIfNeeded().forPath(actionBasePath(serviceName));
+ client.checkExists().creatingParentContainersIfNeeded().forPath(statBasePath(serviceName));
} catch (Exception e) {
throw new IllegalStateException(e);
}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/impl/ZookeeperConfigProvider.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/impl/ZookeeperConfigProvider.java
new file mode 100644
index 0000000..4a3532d
--- /dev/null
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/impl/ZookeeperConfigProvider.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2015, Yahoo Inc.
+ * Copyrights licensed under the New BSD License.
+ * See the accompanying LICENSE file for terms.
+ */
+
+package com.yahoo.gondola.container.impl;
+
+import com.yahoo.gondola.Config;
+import com.yahoo.gondola.container.client.ZookeeperUtils;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.utils.CloseableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+
+
+/**
+ * The type Zookeeper config provider.
+ */
+public class ZookeeperConfigProvider implements Config.ConfigProvider {
+
+ private CuratorFramework client;
+ private String serviceName;
+ private NodeCache cache;
+ private Path tmpFile;
+ private Path confFile;
+
+ Logger logger = LoggerFactory.getLogger(ZookeeperConfigProvider.class);
+
+ /**
+ * Instantiates a new Zookeeper config provider.
+ *
+ * @param client the client
+ * @param serviceName the service name
+ * @throws Exception the exception
+ */
+ public ZookeeperConfigProvider(CuratorFramework client, String serviceName) throws Exception {
+ this.client = client;
+ this.serviceName = serviceName;
+ tmpFile = confFile(true);
+ confFile = confFile(false);
+ String configPath = ZookeeperUtils.configPath(serviceName);
+ saveFile(client.getData().forPath(configPath));
+ cache = new NodeCache(client, configPath);
+ cache.getListenable().addListener(() -> {
+ try {
+ if (cache.getCurrentData() != null) {
+ byte[] data = cache.getCurrentData().getData();
+ if (data.length > 0) {
+ saveFile(data);
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("Error while processing config file change event. message={}", e.getMessage());
+ }
+ });
+ cache.start();
+ }
+
+ private void saveFile(byte[] bytes) throws IOException {
+ FileWriter writer = new FileWriter(tmpFile.toFile());
+ writer.write(new String(bytes));
+ writer.flush();
+ writer.close();
+ verifyConfig(tmpFile.toFile());
+ Files.move(tmpFile, confFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+ }
+
+ private Path confFile(boolean tmp) {
+ String tmpDir = System.getProperty("java.io.tmpdir");
+ return Paths.get(
+ tmpDir + File.separator + "gondola-" + serviceName + "." + (tmp ? Thread.currentThread().getId() : "conf"));
+ }
+
+ @Override
+ public void saveConfigFile(File configFile) {
+ verifyConfig(configFile);
+ try {
+ // TODO: transaction here.
+ client.setData().forPath(ZookeeperUtils.configPath(serviceName), IOUtils.toByteArray(configFile.toURI()));
+ } catch (Exception e) {
+ throw new IllegalStateException("Cannot write config file to remote server.");
+ }
+ }
+
+ private void verifyConfig(File configFile) {
+ new Config(configFile);
+ }
+
+ /**
+ * Gets config file.
+ *
+ * @return the config file
+ */
+ @Override
+ public File getConfigFile() {
+ return confFile.toFile();
+ }
+
+ /**
+ * Stop config fetching daemon.
+ */
+ @Override
+ public void stop() {
+ CloseableUtils.closeQuietly(cache);
+ }
+}
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/impl/ZookeeperShardManagerServer.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/impl/ZookeeperShardManagerServer.java
index ae7f224..fc4e612 100644
--- a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/impl/ZookeeperShardManagerServer.java
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/impl/ZookeeperShardManagerServer.java
@@ -8,24 +8,27 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Range;
+import com.yahoo.gondola.Config;
import com.yahoo.gondola.Gondola;
-import com.yahoo.gondola.Member;
-import com.yahoo.gondola.Shard;
+import com.yahoo.gondola.container.AdminClient;
import com.yahoo.gondola.container.ShardManager;
import com.yahoo.gondola.container.ShardManagerProtocol;
import com.yahoo.gondola.container.ShardManagerServer;
import com.yahoo.gondola.container.client.ZookeeperAction;
import com.yahoo.gondola.container.client.ZookeeperStat;
import com.yahoo.gondola.container.client.ZookeeperUtils;
+import com.yahoo.gondola.core.Utils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.utils.CloseableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -49,10 +52,12 @@
*/
public class ZookeeperShardManagerServer implements ShardManagerServer {
+ private static final long RETRY_WAIT_TIME = AdminClient.TIMEOUT_MS / AdminClient.RETRY_COUNT;
private CuratorFramework client;
private ShardManager delegate;
private String serviceName;
private Gondola gondola;
+ private Config config;
ObjectMapper objectMapper = new ObjectMapper();
Logger logger = LoggerFactory.getLogger(ZookeeperShardManagerServer.class);
List nodes = new ArrayList<>();
@@ -64,21 +69,21 @@ public class ZookeeperShardManagerServer implements ShardManagerServer {
public ZookeeperShardManagerServer(String serviceName, String connectString, Gondola gondola) {
this.serviceName = serviceName;
this.gondola = gondola;
+ config = gondola.getConfig();
+ config.registerForUpdates(config -> tracing = config.getBoolean("tracing.router"));
client = CuratorFrameworkFactory.newClient(connectString, new RetryOneTime(1000));
client.start();
- initNode();
Watcher watcher = new Watcher();
watcher.start();
threads.add(watcher);
- gondola.getConfig().registerForUpdates(config -> tracing = config.getBoolean("tracing.router"));
+ initNode(gondola.getHostId());
}
- private void initNode() {
- ensurePath(serviceName, client.getZookeeperClient());
- for (Shard shard : gondola.getShardsOnHost()) {
- Member member = shard.getLocalMember();
+ private void initNode(String hostId) {
+ ensurePath(serviceName, client);
+ for (Config.ConfigMember member : config.getMembersInHost(hostId)) {
try {
- trace("Init for memberId={}", member.getMemberId());
+ trace("[{}-{}] Initializing zookeeper node...", gondola.getHostId(), member.getMemberId());
String actionPath = actionPath(serviceName, member.getMemberId());
String statPath = statPath(serviceName, member.getMemberId());
ZookeeperStat stat;
@@ -88,8 +93,9 @@ private void initNode() {
} catch (Exception e) {
stat = new ZookeeperStat();
stat.memberId = member.getMemberId();
- stat.shardId = shard.getShardId();
- client.create().forPath(statPath, objectMapper.writeValueAsBytes(stat));
+ stat.shardId = member.getShardId();
+ client.create().creatingParentContainersIfNeeded()
+ .forPath(statPath, objectMapper.writeValueAsBytes(stat));
}
try {
@@ -97,7 +103,8 @@ private void initNode() {
} catch (Exception e) {
action = new ZookeeperAction();
action.memberId = member.getMemberId();
- client.create().forPath(actionPath, objectMapper.writeValueAsBytes(action));
+ client.create().creatingParentContainersIfNeeded()
+ .forPath(actionPath, objectMapper.writeValueAsBytes(action));
}
currentStats.put(stat.memberId, stat);
actions.put(action.memberId, action);
@@ -107,8 +114,8 @@ private void initNode() {
node.start();
nodes.add(node);
} catch (Exception e) {
- logger.warn("Unable to create member node, memberId={}, msg={}",
- member.getMemberId(), e.getMessage());
+ logger.warn("[{}-{}] Unable to create member node, msg={}",
+ gondola.getHostId(), member.getMemberId(), e.getMessage());
}
}
}
@@ -129,7 +136,7 @@ public void run() {
} catch (InterruptedException e) {
return;
} catch (Exception e) {
- logger.error("Unexpected error - {}", e.getMessage());
+ logger.error("[{}] Unexpected error - {}", gondola.getHostId(), e.getMessage());
}
}
}
@@ -138,25 +145,25 @@ private void watchAction(ZookeeperAction action, ZookeeperStat stat)
throws InterruptedException {
ZookeeperAction.Args args = action.parseArgs();
ZookeeperStat.Status origStatus = stat.status;
- switch (action.action) {
- case NOOP:
- case STOP_SLAVE:
- case MIGRATE_1:
- case MIGRATE_2:
- case MIGRATE_3:
- return;
- case START_SLAVE:
- try {
- if (delegate.waitSlavesSynced(args.fromShard, 0)) {
- stat.status = SYNCED;
- } else if (delegate.waitSlavesApproaching(args.fromShard, 0)) {
- stat.status = APPROACHED;
- } else {
- stat.status = RUNNING;
+ switch (stat.mode) {
+ case NORMAL:
+ case MIGRATING_1:
+ case MIGRATING_2:
+ break;
+ case SLAVE:
+ if (action.action.equals(ZookeeperAction.Action.START_SLAVE)) {
+ try {
+ if (delegate.waitSlavesSynced(args.fromShard, 0)) {
+ stat.status = SYNCED;
+ } else if (delegate.waitSlavesApproaching(args.fromShard, 0)) {
+ stat.status = APPROACHED;
+ } else {
+ stat.status = RUNNING;
+ }
+ } catch (ShardManagerProtocol.ShardManagerException e) {
+ stat.status = FAILED;
+ stat.reason = e.getMessage();
}
- } catch (ShardManagerProtocol.ShardManagerException e) {
- stat.status = FAILED;
- stat.reason = e.getMessage();
}
break;
}
@@ -167,14 +174,15 @@ private void watchAction(ZookeeperAction action, ZookeeperStat stat)
}
private void writeStat(Integer memberId, ZookeeperStat zookeeperStat) {
- trace("Write stat on memberId={}, stat={}", memberId, zookeeperStat);
+ trace("[{}-{}] Update stat stat={}",
+ gondola.getHostId(), memberId, zookeeperStat);
try {
client.setData().forPath(ZookeeperUtils.statPath(serviceName, memberId),
- objectMapper.writeValueAsBytes(zookeeperStat));
+ objectMapper.writeValueAsBytes(zookeeperStat));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
- logger.warn("Write stat on memberId={} failed, reason={}", e.getMessage());
+ logger.warn("[{}-{}] Write stat failed, reason={}", gondola.getHostId(), memberId, e.getMessage());
}
}
@@ -185,12 +193,14 @@ private void resume(ZookeeperStat stat, ZookeeperAction action) throws Interrupt
private NodeCacheListener getListener(NodeCache node, ZookeeperStat stat) {
return () -> {
try {
- ZookeeperAction action =
- objectMapper.readValue(node.getCurrentData().getData(), ZookeeperAction.class);
- actions.put(action.memberId, action);
- processAction(stat, action);
+ if (node.getCurrentData() != null) {
+ ZookeeperAction action =
+ objectMapper.readValue(node.getCurrentData().getData(), ZookeeperAction.class);
+ actions.put(action.memberId, action);
+ processAction(stat, action);
+ }
} catch (Exception e) {
- logger.warn("Process action error - {}", e.getMessage());
+ logger.warn("[{}] Process action error - {}", gondola.getHostId(), e.getMessage());
}
};
}
@@ -199,45 +209,51 @@ private void processAction(ZookeeperStat stat, ZookeeperAction action) throws In
if (action.action == ZookeeperAction.Action.NOOP) {
return;
}
- trace("[{}] Processing action={} args={}", stat.memberId, action.action, action.args);
+ trace("[{}-{}] Processing action={} args={}", gondola.getHostId(), stat.memberId, action.action, action.args);
ZookeeperAction.Args args = action.parseArgs();
- try {
- switch (action.action) {
- case NOOP:
- return;
- case START_SLAVE:
- delegate.startObserving(args.fromShard, args.toShard, args.timeoutMs);
- stat.mode = SLAVE;
- break;
- case STOP_SLAVE:
- delegate.stopObserving(args.fromShard, args.toShard, args.timeoutMs);
- stat.mode = NORMAL;
- break;
- case MIGRATE_1:
- delegate.migrateBuckets(Range.closed(args.rangeStart, args.rangeStop),
- args.fromShard, args.toShard, args.timeoutMs);
- stat.mode = MIGRATING_1;
- break;
- case MIGRATE_2:
- delegate.setBuckets(Range.closed(args.rangeStart, args.rangeStop),
- args.fromShard, args.toShard, args.complete);
- stat.mode = MIGRATING_2;
- break;
- case MIGRATE_3:
- delegate.setBuckets(Range.closed(args.rangeStart, args.rangeStop),
- args.fromShard, args.toShard, args.complete);
- stat.mode = NORMAL;
- break;
+ while (true) {
+ try {
+ switch (action.action) {
+ case NOOP:
+ return;
+ case START_SLAVE:
+ delegate.startObserving(args.fromShard, args.toShard, args.timeoutMs);
+ stat.mode = SLAVE;
+ break;
+ case STOP_SLAVE:
+ delegate.stopObserving(args.fromShard, args.toShard, args.timeoutMs);
+ stat.mode = NORMAL;
+ break;
+ case MIGRATE_1:
+ delegate.migrateBuckets(Range.closed(args.rangeStart, args.rangeStop),
+ args.fromShard, args.toShard, args.timeoutMs);
+ stat.mode = MIGRATING_1;
+ break;
+ case MIGRATE_2:
+ delegate.setBuckets(Range.closed(args.rangeStart, args.rangeStop),
+ args.fromShard, args.toShard, args.complete);
+ stat.mode = MIGRATING_2;
+ break;
+ case MIGRATE_3:
+ delegate.setBuckets(Range.closed(args.rangeStart, args.rangeStop),
+ args.fromShard, args.toShard, args.complete);
+ stat.mode = NORMAL;
+ break;
+ }
+ stat.status = RUNNING;
+ stat.reason = null;
+ } catch (ShardManagerProtocol.ShardManagerException e) {
+ logger.warn("[{}-{}] Cannot execute action={} args={} reason={}",
+ gondola.getHostId(), action.memberId, action, action.args, e.getMessage());
+ stat.status = FAILED;
+ stat.reason = e.getMessage();
+ }
+ writeStat(stat.memberId, stat);
+ if (stat.status != FAILED) {
+ break;
}
- stat.status = RUNNING;
- stat.reason = null;
- } catch (ShardManagerProtocol.ShardManagerException e) {
- logger.warn("[{}] Cannot execute action={} args={} reason={}",
- action.memberId, action, action.args, e.getMessage());
- stat.status = FAILED;
- stat.reason = e.getMessage();
+ Thread.sleep(RETRY_WAIT_TIME);
}
- writeStat(stat.memberId, stat);
}
@Override
@@ -247,15 +263,17 @@ public void setShardManager(ShardManager shardManager) {
@Override
public void stop() {
- for (Thread t : threads) {
- t.interrupt();
+ nodes.forEach(CloseableUtils::closeQuietly);
+ for (NodeCache node : nodes) {
try {
- t.join();
- } catch (InterruptedException e) {
- logger.error("Interrupted waiting {} terminate.", t.getName(), e.getMessage());
+ node.close();
+ } catch (IOException e) {
+ // ignored.
+ logger.warn("[{}] Close ZK node cache failed. message={}", gondola.getHostId(), e.getMessage());
}
}
- client.close();
+ Utils.stopThreads(threads);
+ CloseableUtils.closeQuietly(client);
}
private void trace(String format, Object... args) {
diff --git a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/spi/RoutingHelper.java b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/spi/RoutingHelper.java
index c6584d3..e1318a4 100644
--- a/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/spi/RoutingHelper.java
+++ b/containers/jersey2-routing/src/main/java/com/yahoo/gondola/container/spi/RoutingHelper.java
@@ -21,21 +21,13 @@
public interface RoutingHelper {
/**
- * The callback method to get gondola cluster ID based on request.
+ * The callback method to get gondola shard ID based on request.
*
* @param request the request
* @return Gondola bucket Id, -1 means try to find colo affinity in routing layer
*/
int getBucketId(ContainerRequestContext request);
- /**
- * Returns the applied index.
- *
- * @param clusterId the cluster id
- * @return the applied index
- */
- int getAppliedIndex(String clusterId);
-
/**
* Returns the site ID that should handle the specified request.
*
@@ -43,11 +35,4 @@ public interface RoutingHelper {
* @return a non-null site ID
*/
String getSiteId(ContainerRequestContext request);
-
- /**
- * Callback to tell application that everyhing is set.
- *
- * @param clusterId the cluster id
- */
- void beforeServing(String clusterId);
}
diff --git a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/AdminClientIT.java b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/AdminClientIT.java
index 9e00424..878d931 100644
--- a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/AdminClientIT.java
+++ b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/AdminClientIT.java
@@ -9,14 +9,18 @@
import com.google.common.collect.Range;
import com.yahoo.gondola.Config;
import com.yahoo.gondola.Gondola;
+import com.yahoo.gondola.RoleChangeEvent;
+import com.yahoo.gondola.container.client.ShardManagerClient;
+import com.yahoo.gondola.container.client.ZookeeperShardManagerClient;
import com.yahoo.gondola.container.impl.DirectShardManagerClient;
+import com.yahoo.gondola.container.impl.ZookeeperShardManagerServer;
import com.yahoo.gondola.container.spi.RoutingHelper;
+import com.yahoo.gondola.container.utils.ZookeeperServer;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.internal.util.reflection.Whitebox;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.File;
@@ -27,101 +31,168 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
+import javax.ws.rs.container.ContainerRequestContext;
+
+import static com.yahoo.gondola.container.AdminClientIT.Type.DIRECT;
+import static com.yahoo.gondola.container.AdminClientIT.Type.ZOOKEEPER;
import static org.testng.Assert.assertEquals;
public class AdminClientIT {
+ public static final String SERVICE_NAME = "foo";
+ public static final String CLIENT_NAME = "admin";
URL configFileURL = AdminClientIT.class.getClassLoader().getResource("gondola.conf");
Config config = new Config(new File(configFileURL.getFile()));
-
-
- @Mock
- RoutingHelper routingHelper;
-
- @Mock
- ShardManagerProvider shardManagerProvider;
-
- @Mock
- ShardManagerServer shardManagerServer;
+ ZookeeperServer zookeeperServer = new ZookeeperServer();
// shardId -> hostId
- Map routingTable = new ConcurrentHashMap<>();
+ Map routingTable;
// hostId -> LocalTestRoutingServer
- Map addressTable = new HashMap<>();
- Map latches = new HashMap<>();
- DirectShardManagerClient shardManagerClient;
- List gondolas = new ArrayList<>();
+ Map addressTable;
+ Map latches;
+ ShardManagerClient shardManagerClient;
+ List gondolas;
AdminClient adminClient;
+ List shardManagerServers;
- @BeforeMethod
- public void setUp() throws Exception {
- shardManagerClient = new DirectShardManagerClient(config);
- adminClient = new AdminClient("fooService", shardManagerClient, config);
- MockitoAnnotations.initMocks(this);
- when(routingHelper.getBucketId(any())).thenReturn(1);
- when(shardManagerProvider.getShardManagerServer()).thenReturn(shardManagerServer);
- // shardId -> memberId
- // hostId -> baseUri
+ @Mock
+ ConfigWriter configWriter;
- for (String shardId : config.getShardIds()) {
- latches.put(shardId, new CountDownLatch(1));
- }
+ enum Type {DIRECT, ZOOKEEPER}
+ public void setUp(Type type) throws Exception {
+ MockitoAnnotations.initMocks(this);
+ routingTable = new ConcurrentHashMap<>();
+ addressTable = new HashMap<>();
+ latches = new HashMap<>();
+ shardManagerClient = getShardManagerClient(type, CLIENT_NAME);
+ config.getShardIds().forEach(shardId -> latches.put(shardId, new CountDownLatch(1)));
+ gondolas = new ArrayList<>();
+ shardManagerServers = new ArrayList<>();
for (String hostId : config.getHostIds()) {
Gondola gondola = new Gondola(config, hostId);
- gondola.registerForRoleChanges(roleChangeEvent -> {
- if (roleChangeEvent.leader != null) {
- latches.get(roleChangeEvent.shard.getShardId()).countDown();
- routingTable.put(roleChangeEvent.shard.getShardId(), routingTable
- .put(roleChangeEvent.shard.getShardId(),
- config.getMember(roleChangeEvent.leader.getMemberId()).getHostId()));
- }
- });
+ gondola.registerForRoleChanges(getRoleChangeEventListener());
gondola.start();
gondolas.add(gondola);
- LocalTestRoutingServer
- testServer =
- new LocalTestRoutingServer(gondola, routingHelper, new ProxyClientProvider(),
- shardManagerProvider);
+ LocalTestRoutingServer testServer = getLocalTestRoutingServer(gondola);
+ ShardManager
+ shardManager =
+ new ShardManager(gondola, testServer.routingFilter, config, getShardManagerClient(type, hostId));
+ getShardManagerServer(type, gondola, shardManager);
addressTable.put(hostId, testServer);
+ }
+ for (Map.Entry e : latches.entrySet()) {
+ e.getValue().await();
+ }
+ adminClient = new AdminClient(SERVICE_NAME, shardManagerClient, config, configWriter);
+ }
+
+ private Consumer getRoleChangeEventListener() {
+ return roleChangeEvent -> {
+ if (roleChangeEvent.leader != null) {
+ latches.get(roleChangeEvent.shard.getShardId()).countDown();
+ routingTable.put(roleChangeEvent.shard.getShardId(),
+ config.getMember(roleChangeEvent.leader.getMemberId()).getHostId());
+ }
+ };
+ }
+
+ private LocalTestRoutingServer getLocalTestRoutingServer(final Gondola gondola) throws Exception {
+ return new LocalTestRoutingServer(gondola, new RoutingHelper() {
+ @Override
+ public int getBucketId(ContainerRequestContext request) {
+ return 1;
+ }
- // inject shardManager instance
- for (Config.ConfigMember m : config.getMembersInHost(hostId)) {
- shardManagerClient
- .setShardManager(m.getMemberId(),
- new ShardManager(gondola, testServer.routingFilter, config, shardManagerClient));
+ @Override
+ public String getSiteId(ContainerRequestContext request) {
+ return "gq1";
}
+
+ }, new ProxyClientProvider());
+ }
+
+ private ShardManagerServer getShardManagerServer(Type type, Gondola gondola,
+ ShardManager shardManager) {
+ switch (type) {
+ case DIRECT:
+ for (Config.ConfigMember m : config.getMembersInHost(gondola.getHostId())) {
+ ((DirectShardManagerClient) shardManagerClient)
+ .setShardManager(m.getMemberId(), shardManager);
+ }
+ case ZOOKEEPER:
+ ZookeeperShardManagerServer shardManagerServer =
+ new ZookeeperShardManagerServer(SERVICE_NAME, zookeeperServer.getConnectString(), gondola);
+ shardManagerServer.setShardManager(shardManager);
+ shardManagerServers.add(shardManagerServer);
+ return shardManagerServer;
}
+ return null;
+ }
- for (Map.Entry e : latches.entrySet()) {
- e.getValue().await();
+ private ShardManagerClient getShardManagerClient(Type type, String clientName) {
+ ShardManagerClient shardManagerClient = null;
+ switch (type) {
+ case DIRECT:
+ if (this.shardManagerClient == null) {
+ this.shardManagerClient = new DirectShardManagerClient(config);
+ }
+ shardManagerClient = this.shardManagerClient;
+ break;
+ case ZOOKEEPER:
+ shardManagerClient =
+ new ZookeeperShardManagerClient(SERVICE_NAME, clientName, zookeeperServer.getConnectString(),
+ config);
+ break;
}
+ return shardManagerClient;
+ }
+
+ @DataProvider(name = "typeProvider")
+ public Object[][] typeProvider() {
+ return new Object[][]{
+ {ZOOKEEPER},
+ {DIRECT}
+ };
}
- @AfterMethod
public void tearDown() throws Exception {
gondolas.parallelStream().forEach(Gondola::stop);
+ zookeeperServer.reset();
+ shardManagerServers.forEach(ShardManagerServer::stop);
+ shardManagerClient = null;
+ }
+
+ @Test(invocationCount = 1)
+ public void testAssignBuckets_direct() throws Exception {
+ testAssignBucketByType(DIRECT);
+ }
+
+ @Test(invocationCount = 1)
+ public void testAssignBuckets_zookeeper() throws Exception {
+ testAssignBucketByType(ZOOKEEPER);
}
- @Test
- public void testAssignBuckets() throws Exception {
+
+ private void testAssignBucketByType(Type type) throws Exception {
+ setUp(type);
for (BucketManager bucketManager : getBucketManagersFromAllHosts()) {
assertEquals(bucketManager.lookupBucketTable(0).shardId, "shard1");
assertEquals(bucketManager.lookupBucketTable(0).migratingShardId, null);
}
- adminClient.assignBuckets(Range.closed(0, 10), "shard1", "shard2");
- assertEquals(getBucketManagerInLeader("shard1").lookupBucketTable(0).shardId, "shard1");
- assertEquals(getBucketManagerInLeader("shard1").lookupBucketTable(0).migratingShardId, "shard2");
- adminClient.closeAssignBuckets(Range.closed(0, 10), "shard1", "shard2");
- for (BucketManager bucketManager : getBucketManagersFromAllHosts()) {
- assertEquals(bucketManager.lookupBucketTable(0).shardId, "shard2");
- assertEquals(bucketManager.lookupBucketTable(0).migratingShardId, null);
+ try {
+ adminClient.assignBuckets(Range.closed(0, 10), "shard1", "shard2");
+ for (BucketManager bucketManager : getBucketManagersFromAllHosts()) {
+ assertEquals(bucketManager.lookupBucketTable(0).shardId, "shard2");
+ assertEquals(bucketManager.lookupBucketTable(0).migratingShardId, null);
+ }
+ } finally {
+ tearDown();
}
}
@@ -130,31 +201,6 @@ private List getBucketManagersFromAllHosts() {
.collect(Collectors.toList());
}
- private BucketManager getBucketManagerInLeader(String shardId) {
- List bucketManagers = addressTable.entrySet().stream()
- .filter(e -> {
- LocalTestRoutingServer server = e.getValue();
- if (getGondola(server.routingFilter).getShard(shardId) != null
- && getGondola(server.routingFilter).getShard(shardId).getLocalMember().isLeader()) {
- return true;
- }
- return false;
- })
- .map(e -> getBucketManager(e.getValue().routingFilter))
- .collect(Collectors.toList());
- if (bucketManagers.size() == 0) {
- throw new IllegalStateException("No leader in shard " + shardId);
- } else if (bucketManagers.size() == 0) {
- throw new IllegalStateException("2 leaders in shard " + shardId);
- }
-
- return bucketManagers.get(0);
- }
-
- private Gondola getGondola(RoutingFilter routingFilter) {
- return (Gondola) Whitebox.getInternalState(routingFilter, "gondola");
- }
-
private BucketManager getBucketManager(RoutingFilter filter) {
return (BucketManager) Whitebox.getInternalState(filter, "bucketManager");
}
diff --git a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/AdminClientTest.java b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/AdminClientTest.java
index b53bfc6..67c8d91 100644
--- a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/AdminClientTest.java
+++ b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/AdminClientTest.java
@@ -30,10 +30,13 @@ public class AdminClientTest {
@Mock
ShardManagerClient shardManagerClient;
+ @Mock
+ ConfigWriter configWriter;
+
@BeforeMethod
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
- adminClient = new AdminClient("init", shardManagerClient, config);
+ adminClient = new AdminClient("init", shardManagerClient, config, configWriter);
}
@Test
@@ -46,9 +49,7 @@ public void testSetAndGetServiceName() throws Exception {
@Test
public void testGetAndSetConfig() throws Exception {
- assertEquals(adminClient.getConfig(), config);
- adminClient.setConfig(null);
- assertEquals(adminClient.getConfig(), null);
+ // TODO: implement
}
@Test
diff --git a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/LocalTestRoutingServer.java b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/LocalTestRoutingServer.java
index e0069e3..93eeecd 100644
--- a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/LocalTestRoutingServer.java
+++ b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/LocalTestRoutingServer.java
@@ -33,9 +33,9 @@ public class LocalTestRoutingServer {
HttpHost host;
RoutingFilter routingFilter;
- public LocalTestRoutingServer(Gondola gondola, RoutingHelper routingHelper, ProxyClientProvider proxyClientProvider,
- ShardManagerProvider shardManagerProvider) throws Exception {
- routingFilter = new RoutingFilter(gondola, routingHelper, proxyClientProvider, null);
+ public LocalTestRoutingServer(Gondola gondola, RoutingHelper routingHelper, ProxyClientProvider proxyClientProvider) throws Exception {
+ routingFilter = new RoutingFilter(gondola, routingHelper, proxyClientProvider, null,
+ null);
localTestServer = new LocalTestServer((request, response, context) -> {
try {
URI requestUri = URI.create(request.getRequestLine().getUri());
diff --git a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/LocalTestRoutingServerTest.java b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/LocalTestRoutingServerTest.java
index 939b83e..c283cc4 100644
--- a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/LocalTestRoutingServerTest.java
+++ b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/LocalTestRoutingServerTest.java
@@ -95,7 +95,7 @@ public void setUp() throws Exception {
@Test
public void testRoutingServer() throws Exception {
- server = new LocalTestRoutingServer(gondola, routingHelper, proxyClientProvider, shardManagerProvider);
+ server = new LocalTestRoutingServer(gondola, routingHelper, proxyClientProvider);
CloseableHttpClient client = HttpClients.createDefault();
CloseableHttpResponse response = client.execute(new HttpGet(server.getHostUri()));
diff --git a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/LockManagerTest.java b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/LockManagerTest.java
index 1086df2..4e50fd9 100644
--- a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/LockManagerTest.java
+++ b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/LockManagerTest.java
@@ -8,6 +8,7 @@
import com.google.common.collect.Range;
import com.yahoo.gondola.Config;
+import com.yahoo.gondola.Gondola;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@@ -30,11 +31,15 @@ public class LockManagerTest {
@Mock
Config config;
+ @Mock
+ Gondola gondola;
+
@BeforeMethod
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
+ when(gondola.getConfig()).thenReturn(config);
when(config.getBoolean(eq("tracing.router"))).thenReturn(false);
- lockManager = new LockManager(config);
+ lockManager = new LockManager(gondola);
}
diff --git a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/RoutingFilterTest.java b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/RoutingFilterTest.java
index 4b26317..e59f0c2 100644
--- a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/RoutingFilterTest.java
+++ b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/RoutingFilterTest.java
@@ -98,6 +98,13 @@ public class RoutingFilterTest {
@Mock
Map bucketRequestCounters;
+
+ @Mock
+ ChangeLogProcessor changeLogProcessor;
+
+ @Mock
+ RoutingService routingService;
+
MultivaluedHashMap headersMap = new MultivaluedHashMap<>();
@BeforeMethod
@@ -114,7 +121,7 @@ public void setUp() throws Exception {
when(request.getHeaders()).thenReturn(headersMap);
when(request.getRequestUri()).thenReturn(URI.create(MY_APP_URI));
- router = new RoutingFilter(gondola, routingHelper, proxyClientProvider, null);
+ router = new RoutingFilter(gondola, routingHelper, proxyClientProvider, changeLogProcessor, routingService);
}
private static String getResourceFile(String file) {
@@ -184,7 +191,7 @@ public void testBecomeLeader_block_shard() throws Exception {
RoleChangeEvent event = new RoleChangeEvent(shard, member, member, null, null);
consumer.getValue().accept(event);
Thread.sleep(1000);
- verify(routingHelper, times(1)).beforeServing(any());
+// verify(routingHelper, times(1)).beforeServing(any());
}
@Test
diff --git a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/ShardManagerTest.java b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/ShardManagerTest.java
index 9655f5b..4278d0c 100644
--- a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/ShardManagerTest.java
+++ b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/ShardManagerTest.java
@@ -27,6 +27,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
@@ -80,8 +81,7 @@ public void setUp() throws Exception {
@Test
public void testStartObserving_success() throws Exception {
- Member.SlaveStatus status = new Member.SlaveStatus();
- status.running = true;
+ Member.SlaveStatus status = getSuccessSlaveStatus();
when(member.getSlaveStatus()).thenReturn(status);
assertFalse(getObservedShards().contains(TARGET_SHARD));
shardManager.startObserving(FROM_SHARD, TARGET_SHARD, 300);
@@ -90,6 +90,14 @@ public void testStartObserving_success() throws Exception {
assertTrue(getObservedShards().contains(TARGET_SHARD));
}
+ private Member.SlaveStatus getSuccessSlaveStatus() {
+ Member.SlaveStatus status = new Member.SlaveStatus();
+ status.running = true;
+ status.commitIndex = 100;
+ status.masterId = 86;
+ return status;
+ }
+
@Test(expectedExceptions = ShardManagerProtocol.ShardManagerException.class)
public void testStartObserving_failed() throws Exception {
Member.SlaveStatus status = new Member.SlaveStatus();
@@ -102,15 +110,14 @@ public void testStartObserving_failed() throws Exception {
@Test
public void testStopObserving() throws Exception {
- Member.SlaveStatus status = new Member.SlaveStatus();
- status.running = true;
+ Member.SlaveStatus status = getSuccessSlaveStatus();
when(member.getSlaveStatus()).thenReturn(status);
// Start observing
shardManager.startObserving(FROM_SHARD, TARGET_SHARD, 300);
assertTrue(getObservedShards().contains(TARGET_SHARD));
// Stop observing successfully
- when(member.getSlaveStatus()).thenReturn(null);
+ when(member.getSlaveStatus()).thenReturn(status).thenReturn(null);
shardManager.stopObserving(FROM_SHARD, TARGET_SHARD, 300);
assertFalse(getObservedShards().contains(TARGET_SHARD));
@@ -125,7 +132,8 @@ public void testAssignBucket_success() throws Exception {
Range r = Range.closed(1, 2);
shardManager.migrateBuckets(r, FROM_SHARD, TARGET_SHARD, TIMEOUT_MS);
verify(filter, times(1)).updateBucketRange(any(), any(), any(), anyBoolean());
- verify(shardManagerClient, times(1)).setBuckets(any(), any(), any(), anyBoolean());
+ verify(shardManagerClient, times(1)).waitSlavesSynced(any(), anyLong());
+ verify(shardManagerClient, times(1)).stopObserving(any(), any(), anyLong());
}
@Test
@@ -147,21 +155,27 @@ private Set getObservedShards() {
public void testWaitSlavesSynced() throws Exception {
when(shard.getCommitIndex()).thenReturn(2);
when(shard.getLastSavedIndex()).thenReturn(1).thenReturn(2);
- assertTrue(shardManager.waitSlavesSynced("shard1", 300));
+ when(shard.getMember(anyInt())).thenReturn(member);
+ when(member.getSlaveStatus()).thenReturn(getSuccessSlaveStatus());
+ assertTrue(shardManager.waitSlavesSynced(FROM_SHARD, 300));
}
@Test
public void testWaitSlavesSynced_failed_never_catched_up() throws Exception {
when(shard.getCommitIndex()).thenReturn(1+1);
when(shard.getLastSavedIndex()).thenReturn(1);
- assertFalse(shardManager.waitSlavesSynced("shard1", 300));
+ when(shard.getMember(anyInt())).thenReturn(member);
+ when(member.getSlaveStatus()).thenReturn(getSuccessSlaveStatus());
+ assertFalse(shardManager.waitSlavesSynced(FROM_SHARD, 300));
}
@Test
public void testWaitSlavesApproaching() throws Exception {
when(shard.getCommitIndex()).thenReturn(2);
when(shard.getLastSavedIndex()).thenReturn(1).thenReturn(2);
- assertTrue(shardManager.waitSlavesApproaching("shard1", 300));
+ when(shard.getMember(anyInt())).thenReturn(member);
+ when(member.getSlaveStatus()).thenReturn(getSuccessSlaveStatus());
+ assertTrue(shardManager.waitSlavesApproaching(FROM_SHARD, 300));
}
@@ -169,7 +183,9 @@ public void testWaitSlavesApproaching() throws Exception {
public void testWaitSlavesSynced_failed_never_approaching() throws Exception {
when(shard.getCommitIndex()).thenReturn(1 + ShardManager.LOG_APPROACHING_DIFF + 1);
when(shard.getLastSavedIndex()).thenReturn(1);
- assertFalse(shardManager.waitSlavesSynced("shard1", 300));
+ when(shard.getMember(anyInt())).thenReturn(member);
+ when(member.getSlaveStatus()).thenReturn(getSuccessSlaveStatus());
+ assertFalse(shardManager.waitSlavesSynced(FROM_SHARD, 300));
}
}
diff --git a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/client/ZookeeperShardManagerClientTest.java b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/client/ZookeeperShardManagerClientTest.java
index 0a6ae95..f928cdb 100644
--- a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/client/ZookeeperShardManagerClientTest.java
+++ b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/client/ZookeeperShardManagerClientTest.java
@@ -6,121 +6,132 @@
package com.yahoo.gondola.container.client;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Range;
import com.yahoo.gondola.Config;
import com.yahoo.gondola.Gondola;
import com.yahoo.gondola.container.ShardManager;
import com.yahoo.gondola.container.impl.ZookeeperShardManagerServer;
+import com.yahoo.gondola.container.utils.ZookeeperServer;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.TestingServer;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.utils.CloseableUtils;
import org.mockito.MockitoAnnotations;
+import org.mockito.internal.util.reflection.Whitebox;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.File;
import java.net.URL;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
public class ZookeeperShardManagerClientTest {
- TestingServer testingServer = null;
ZookeeperShardManagerClient client;
+ ZookeeperServer zookeeperServer = new ZookeeperServer();
Map servers;
Map shardManagers;
- CuratorFramework curatorFramework = null;
+ ObjectMapper objectMapper = new ObjectMapper();
URL configUrl = ZookeeperShardManagerClientTest.class.getClassLoader().getResource("gondola.conf");
Config config = new Config(new File(configUrl.getFile()));
- List gondolas;
+ PathChildrenCache stats;
@BeforeMethod
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
- if (testingServer == null) {
- testingServer = new TestingServer();
- testingServer.start();
- curatorFramework = CuratorFrameworkFactory.newClient(testingServer.getConnectString(), new RetryOneTime(1000));
- curatorFramework.start();
- }
servers = new HashMap<>();
shardManagers = new HashMap<>();
- gondolas = new ArrayList<>();
for (String hostId : config.getHostIds()) {
- Gondola gondola = new Gondola(config, hostId);
- gondola.start();
- ZookeeperShardManagerServer
- server =
- new ZookeeperShardManagerServer("foo", testingServer.getConnectString(), gondola);
+ Gondola gondola = mock(Gondola.class);
+ when(gondola.getHostId()).thenReturn(hostId);
+ when(gondola.getConfig()).thenReturn(config);
+ ZookeeperShardManagerServer server =
+ new ZookeeperShardManagerServer("foo", zookeeperServer.getConnectString(), gondola);
ShardManager shardManager = mock(ShardManager.class);
server.setShardManager(shardManager);
shardManagers.put(hostId, shardManager);
servers.put(hostId, server);
- gondolas.add(gondola);
}
- client = new ZookeeperShardManagerClient("foo", testingServer.getConnectString(), config);
+ client = new ZookeeperShardManagerClient("foo", "fooClientName", zookeeperServer.getConnectString(), config);
+ stats = (PathChildrenCache) Whitebox.getInternalState(client, "stats");
+ CountDownLatch latch = new CountDownLatch(1);
+ this.stats.getListenable().addListener((curatorFramework, pathChildrenCacheEvent) -> {
+ if (this.stats.getCurrentData().size() == config.getMembers().size()) {
+ latch.countDown();
+ }
+ });
+ latch.await();
}
@AfterMethod
public void tearDown() throws Exception {
servers.forEach((s, server) -> server.stop());
- gondolas.parallelStream().forEach(Gondola::stop);
- for (String path : curatorFramework.getChildren().forPath("/")) {
- if (!path.equals("zookeeper")) {
- curatorFramework.delete().deletingChildrenIfNeeded().forPath("/" + path);
- }
- }
+ CloseableUtils.closeQuietly(stats);
+ client.stop();
+ zookeeperServer.reset();
}
@Test
public void testStartObserving() throws Exception {
client.startObserving("shard2", "shard1", 1000);
- Thread.sleep(100);
-
- for (Config.ConfigMember m : config.getMembersInShard("shard2")) {
- ShardManager shardManager = shardManagers.get(m.getHostId());
+ List memberIds =
+ config.getMembersInShard("shard2").stream().map(Config.ConfigMember::getMemberId)
+ .collect(Collectors.toList());
+ for (Integer memberId : memberIds) {
+ ShardManager shardManager = shardManagers.get(config.getMember(memberId).getHostId());
verify(shardManager).startObserving(any(), any(), anyLong());
}
- for (Config.ConfigMember m : config.getMembers()) {
- if (!m.getShardId().equals("shard2")) {
- ShardManager shardManager = shardManagers.get(m.getHostId());
- verify(shardManager, times(0)).startObserving(any(), any(), anyLong());
+ for (ChildData d : stats.getCurrentData()) {
+ ZookeeperStat stat = objectMapper.readValue(d.getData(), ZookeeperStat.class);
+ if (!memberIds.contains(stat.memberId)) {
+ continue;
}
+ assertTrue(stat.isSlaveOperational());
}
}
@Test
public void testStopObserving() throws Exception {
+ client.startObserving("shard2", "shard1", 1000);
client.stopObserving("shard2", "shard1", 1000);
- Thread.sleep(100);
+ List memberIds =
+ config.getMembersInShard("shard2").stream().map(Config.ConfigMember::getMemberId)
+ .collect(Collectors.toList());
for (Config.ConfigMember m : config.getMembersInShard("shard2")) {
ShardManager shardManager = shardManagers.get(m.getHostId());
verify(shardManager).stopObserving(any(), any(), anyLong());
}
+
+ for (ChildData d : stats.getCurrentData()) {
+ ZookeeperStat stat = objectMapper.readValue(d.getData(), ZookeeperStat.class);
+ if (!memberIds.contains(stat.memberId)) {
+ continue;
+ }
+ assertTrue(stat.isNormalOperational());
+ }
}
@Test
public void testMigrateBuckets() throws Exception {
client.migrateBuckets(Range.closed(0, 10), "shard1", "shard2", 1000);
- Thread.sleep(100);
for (Config.ConfigMember m : config.getMembersInShard("shard1")) {
ShardManager shardManager = shardManagers.get(m.getHostId());
verify(shardManager).migrateBuckets(any(), any(), any(), anyLong());
@@ -161,15 +172,8 @@ public void testWaitSlavesApproaching_successs_on_synched() throws Exception {
@Test
public void testSetBuckets() throws Exception {
client.setBuckets(Range.closed(0, 10), "shard1", "shard2", false);
- Thread.sleep(100);
for (String hostId : config.getHostIds()) {
- verify(shardManagers.get(hostId)).setBuckets(any(), any(), any(), anyBoolean());
+ verify(shardManagers.get(hostId)).setBuckets(any(), any(),any(), anyBoolean());
}
}
-
- @Test
- public void testWaitBucketsCondition() throws Exception {
- client.setBuckets(Range.closed(0, 10), "shard1", "shard2", false);
- assertTrue(client.waitBucketsCondition(Range.closed(0, 10), "shard1", "shard2", 1000));
- }
}
\ No newline at end of file
diff --git a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/impl/DirectShardManagerClient.java b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/impl/DirectShardManagerClient.java
index 3058116..edf3b49 100644
--- a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/impl/DirectShardManagerClient.java
+++ b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/impl/DirectShardManagerClient.java
@@ -20,8 +20,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;
-import static com.yahoo.gondola.container.ShardManagerProtocol.ShardManagerException.CODE.NOT_LEADER;
-
/**
* The Direct shard manager client. Will be only used in test.
*/
@@ -91,17 +89,6 @@ public void setBuckets(Range splitRange, String fromShardId, String toS
}
}
- @Override
- public boolean waitBucketsCondition(Range range, String fromShardId, String toShardId, long timeoutMs)
- throws InterruptedException {
- for (Config.ConfigMember m : config.getMembers()) {
- if (!getShardManager(m.getMemberId()).waitBucketsCondition(range, fromShardId, toShardId, 3000)) {
- return false;
- }
- }
- return true;
- }
-
private Function getWaitApproachingFunction(String shardId, long timeoutMs) {
return memberId -> {
try {
@@ -139,13 +126,10 @@ public void migrateBuckets(Range splitRange, String fromShardId,
String toShardId, long timeoutMs)
throws ShardManagerException {
for (Config.ConfigMember m : config.getMembersInShard(fromShardId)) {
- try {
- getShardManager(m.getMemberId()).migrateBuckets(splitRange, fromShardId, toShardId, timeoutMs);
- } catch (ShardManagerException e) {
- if (e.errorCode != NOT_LEADER) {
- throw e;
- }
- }
+ getShardManager(m.getMemberId()).migrateBuckets(splitRange, fromShardId, toShardId, timeoutMs);
+ }
+ for (Config.ConfigMember m : config.getMembers()) {
+ getShardManager(m.getMemberId()).setBuckets(splitRange, fromShardId, toShardId, true);
}
}
@@ -167,4 +151,8 @@ private void trace(String format, Object... args) {
}
}
+ @Override
+ public void stop() {
+
+ }
}
diff --git a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/impl/ZookeeperConfigProviderTest.java b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/impl/ZookeeperConfigProviderTest.java
new file mode 100644
index 0000000..fd3eddd
--- /dev/null
+++ b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/impl/ZookeeperConfigProviderTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2015, Yahoo Inc.
+ * Copyrights licensed under the New BSD License.
+ * See the accompanying LICENSE file for terms.
+ */
+
+package com.yahoo.gondola.container.impl;
+
+import com.yahoo.gondola.container.client.ZookeeperUtils;
+import com.yahoo.gondola.container.utils.ZookeeperServer;
+
+import org.apache.commons.io.IOUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Arrays;
+
+import static org.testng.Assert.assertTrue;
+
+public class ZookeeperConfigProviderTest {
+
+ ZookeeperServer server;
+ ZookeeperConfigProvider configProvider;
+ URL file = ZookeeperConfigProviderTest.class.getClassLoader().getResource("gondola.conf");
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ if (server == null) {
+ server = new ZookeeperServer();
+ server.getClient().create().creatingParentContainersIfNeeded().forPath(ZookeeperUtils.configPath("fooService"), IOUtils.toByteArray(file));
+ }
+ configProvider = new ZookeeperConfigProvider(server.getClient(), "fooService");
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ configProvider.stop();
+ }
+
+ @Test
+ public void testGetConfigFile() throws Exception {
+ File configFile = configProvider.getConfigFile();
+ assertTrue(Arrays.equals(IOUtils.toByteArray(configFile.toURI()), IOUtils.toByteArray(file)));
+ }
+
+ // TODO: fix race condition
+// @Test
+// public void testStop() throws Exception {
+// configProvider.stop();
+// }
+//
+// @Test
+// public void testSaveConfigFile() throws Exception {
+// configProvider.saveConfigFile(new File(file.getFile()));
+// }
+}
\ No newline at end of file
diff --git a/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/utils/ZookeeperServer.java b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/utils/ZookeeperServer.java
new file mode 100644
index 0000000..60c683c
--- /dev/null
+++ b/containers/jersey2-routing/src/test/java/com/yahoo/gondola/container/utils/ZookeeperServer.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2015, Yahoo Inc.
+ * Copyrights licensed under the New BSD License.
+ * See the accompanying LICENSE file for terms.
+ */
+
+package com.yahoo.gondola.container.utils;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+
+public class ZookeeperServer {
+ private TestingServer testingServer;
+ private CuratorFramework client = null;
+ public ZookeeperServer() {
+ try {
+ testingServer = new TestingServer();
+ testingServer.start();
+ client = CuratorFrameworkFactory.newClient(testingServer.getConnectString(), new RetryOneTime(1000));
+ client.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ public void reset() throws Exception {
+ try {
+ for (String path : client.getChildren().forPath("/")) {
+ if (!path.equals("zookeeper")) {
+ client.delete().deletingChildrenIfNeeded().forPath("/" + path);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getConnectString() {
+ return testingServer.getConnectString();
+ }
+
+ public CuratorFramework getClient() {
+ return client;
+ }
+}
diff --git a/containers/jersey2-routing/src/test/resources/gondola.conf b/containers/jersey2-routing/src/test/resources/gondola.conf
index 5107413..893626e 100644
--- a/containers/jersey2-routing/src/test/resources/gondola.conf
+++ b/containers/jersey2-routing/src/test/resources/gondola.conf
@@ -9,9 +9,6 @@ gondola {
{ hostId = "host4", siteId = "gq1", hostname = "localhost", port = "2831", appScheme = "http", appPort = "8083"},
{ hostId = "host5", siteId = "bf1", hostname = "localhost", port = "2832", appScheme = "http", appPort = "8084"},
{ hostId = "host6", siteId = "bf2", hostname = "localhost", port = "2833", appScheme = "http", appPort = "8085"},
- { hostId = "host7", siteId = "gq1", hostname = "localhost", port = "2834", appScheme = "http", appPort = "8086"},
- { hostId = "host8", siteId = "bf1", hostname = "localhost", port = "2835", appScheme = "http", appPort = "8087"},
- { hostId = "host9", siteId = "bf2", hostname = "localhost", port = "2836", appScheme = "http", appPort = "8088"},
]
shards = [
{
@@ -29,14 +26,6 @@ gondola {
{hostId: "host5", memberId: "85"},
{hostId: "host4", memberId: "84"},
],
- },
- {
- shardId = "shard3", bucketMap = "",
- hosts = [
- {hostId: "host7", memberId: "87"},
- {hostId: "host8", memberId: "88"},
- {hostId: "host9", memberId: "89"},
- ],
}
],
sites = [
diff --git a/containers/pom.xml b/containers/pom.xml
index 36ea226..91e7ed8 100644
--- a/containers/pom.xml
+++ b/containers/pom.xml
@@ -47,7 +47,6 @@
com.yahoo.gondola
core
- ${project.version}
diff --git a/containers/registration/pom.xml b/containers/registration/pom.xml
index 96b1205..9f74e05 100644
--- a/containers/registration/pom.xml
+++ b/containers/registration/pom.xml
@@ -12,9 +12,6 @@
groupId
registry
-
- 2.8.0
-
com.yahoo.gondola.containers
@@ -26,33 +23,27 @@
org.apache.curator
curator-framework
- ${curator.version}
org.apache.curator
curator-recipes
- ${curator.version}
org.apache.curator
curator-test
- ${curator.version}
test
com.fasterxml.jackson.core
jackson-databind
- 2.6.0
org.apache.curator
curator-client
- ${curator.version}
javax.validation
validation-api
- 1.1.0.Final
\ No newline at end of file
diff --git a/containers/registration/src/test/java/com/yahoo/gondola/container/ZookeeperRegistryClientTest.java b/containers/registration/src/test/java/com/yahoo/gondola/container/ZookeeperRegistryClientTest.java
index bcaa253..e30565d 100644
--- a/containers/registration/src/test/java/com/yahoo/gondola/container/ZookeeperRegistryClientTest.java
+++ b/containers/registration/src/test/java/com/yahoo/gondola/container/ZookeeperRegistryClientTest.java
@@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -155,10 +156,17 @@ public void testAddListener() throws Exception {
ArgumentCaptor
args =
ArgumentCaptor.forClass(RegistryClient.Entry.class);
- String hostId = registryClient.register(SITE_1_HOST_1_CLUSTER, new InetSocketAddress(1234),
+
+ InetSocketAddress addr = new InetSocketAddress(1234);
+ CountDownLatch latch = new CountDownLatch(1);
+ registryClient.addListener(entry1 -> {
+ if (entry1.gondolaAddress.equals(addr)) {
+ latch.countDown();
+ }
+ });
+ String hostId = registryClient.register(SITE_1_HOST_1_CLUSTER, addr,
URI.create("https://api1.yahoo.com:4443"));
- // wait for zookeeper events
- Thread.sleep(100);
+ latch.await();
verify(listener, times(1)).accept(args.capture());
RegistryClient.Entry entry = args.getValue();
assertEquals(entry.hostId, hostId);
@@ -167,13 +175,13 @@ public void testAddListener() throws Exception {
@Test
public void testGetRegistries_local() throws Exception {
- testGetEntries(registryClient, registryClient, 0);
+ testGetEntries(registryClient, registryClient, false);
}
@Test
public void testGetRegistries_remote() throws Exception {
ZookeeperRegistryClient reader = new ZookeeperRegistryClient(client, objectMapper, config);
- testGetEntries(registryClient, reader, 100);
+ testGetEntries(registryClient, reader, true);
}
@@ -201,12 +209,26 @@ public void testWaitForClusterComplete() throws Exception {
assertEquals(result.get(), Boolean.TRUE);
}
- private void testGetEntries(RegistryClient writer, RegistryClient reader, int sleep) throws Exception {
+ private void testGetEntries(RegistryClient writer, RegistryClient reader, boolean remote) throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ InetSocketAddress addr = new InetSocketAddress(1234);
+ if (remote) {
+ reader.addListener(entry -> {
+ if (entry.gondolaAddress.equals(addr)) {
+ latch.countDown();
+ }
+ });
+ }
+
String
hostId =
- writer.register(SITE_1_HOST_3_CLUSTERS, new InetSocketAddress(1234),
+ writer.register(SITE_1_HOST_3_CLUSTERS, addr,
URI.create("https://api1.yahoo.com:4443"));
+ if (remote) {
+ latch.await();
+ }
+
List writerEntries = writer.getEntries().entrySet().stream()
.map(Map.Entry::getValue)
.filter(e -> e.hostId.equals(hostId))
@@ -214,9 +236,6 @@ private void testGetEntries(RegistryClient writer, RegistryClient reader, int sl
assertEquals(writerEntries.size(), 1);
- if (sleep != 0) {
- Thread.sleep(sleep);
- }
Map readerEntries = reader.getEntries();
assertEquals(readerEntries.size(), 1);
diff --git a/core/pom.xml b/core/pom.xml
index 71b7d0d..67e09b8 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -13,26 +13,10 @@
0.2.8-SNAPSHOT
4.0.0
- com.yahoo.gondola
core
gondola-core
Java implementation of Raft
jar
-
- UTF-8
- UTF-8
- false
- Gondola
- Gondola
- wcpan@yahoo-inc.com
- false
- 1.8
- 1.8
- 1.8
- true
- 2.16
- false
-
https://github.com/yahoo/gondola
@@ -61,34 +45,28 @@
com.typesafe
config
- 1.2.1
mysql
mysql-connector-java
- 5.1.35
com.h2database
h2
- 1.4.187
com.zaxxer
HikariCP
- 2.4.2
org.apache.commons
commons-exec
- 1.3
test
commons-cli
commons-cli
- 1.3
test
@@ -100,7 +78,7 @@
org.codehaus.mojo
appassembler-maven-plugin
- 1.9
+ ${appassembler-maven-plugin.version}
prepare-package
diff --git a/core/src/main/java/com/yahoo/gondola/Config.java b/core/src/main/java/com/yahoo/gondola/Config.java
index 6948d2d..c602460 100644
--- a/core/src/main/java/com/yahoo/gondola/Config.java
+++ b/core/src/main/java/com/yahoo/gondola/Config.java
@@ -399,4 +399,27 @@ public void run() {
}
}
}
+
+ /**
+ * Provider interface to get config file.
+ */
+ public interface ConfigProvider {
+ void saveConfigFile(File configFile);
+ File getConfigFile();
+ void stop();
+ }
+
+ /**
+ * Factory method to create config instance by a config provider.
+ *
+ * @param provider
+ * @return
+ */
+ public static Config getConfigInstance(ConfigProvider provider) {
+ return new Config(provider.getConfigFile());
+ }
+
+ public File getFile() {
+ return file;
+ }
}
diff --git a/examples/kv-server/README.md b/examples/kv-server/README.md
index 1396e34..420aa95 100644
--- a/examples/kv-server/README.md
+++ b/examples/kv-server/README.md
@@ -4,7 +4,7 @@ This demo application implements a fault-tolerant hash table using a
Raft cluster of three nodes. The demo starts up three servers, each
implementing a RESTful API for setting and getting values. The demo
uses H2DB as the backing storage for the Raft log so that the updates
-are persistant.
+are persistent.
## API
@@ -84,7 +84,7 @@ port: 8082 :
```
## Failover Test
-The folloing commands will demo the scenario:
+The following commands will demo the scenario:
- host1 server down
- host2 or host3 take over the leader
- send write command to the new leader
diff --git a/examples/kv-server/bin/run b/examples/kv-server/bin/run
index c5d90f5..adff7ee 100755
--- a/examples/kv-server/bin/run
+++ b/examples/kv-server/bin/run
@@ -20,7 +20,7 @@ fi
export hostId
export JAVA_TOOL_OPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$debugPort -XX:+UnlockCommercialFeatures -XX:+FlightRecorder"
-mvn jetty:run -Djetty.port=$port &
+mvn -DskipTests -Dcheckstyle.skip -Djacoco.skip jetty:run -Djetty.port=$port &
pid=$!
trap "echo 'Stopping java process...'; kill -9 $pid; wait" SIGINT SIGTERM
wait
diff --git a/examples/kv-server/pom.xml b/examples/kv-server/pom.xml
index 2f09528..e485e07 100644
--- a/examples/kv-server/pom.xml
+++ b/examples/kv-server/pom.xml
@@ -29,17 +29,14 @@
com.yahoo.gondola.containers
jersey2-routing
- ${project.version}
org.slf4j
slf4j-log4j12
- 1.7.12
log4j
log4j
- 1.2.17
diff --git a/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoApplication.java b/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoApplication.java
index af8eeac..2e29d68 100644
--- a/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoApplication.java
+++ b/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoApplication.java
@@ -6,20 +6,12 @@
package com.yahoo.gondola.demo;
-import com.yahoo.gondola.Config;
-import com.yahoo.gondola.Gondola;
-import com.yahoo.gondola.RoleChangeEvent;
import com.yahoo.gondola.container.RoutingFilter;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.net.URL;
-import java.util.function.Consumer;
+import java.net.URI;
import javax.servlet.ServletContext;
import javax.ws.rs.core.Context;
@@ -31,64 +23,28 @@
* callback and register RoutingFilter as Jersey filter 4. Register the resources
*/
public class DemoApplication extends ResourceConfig {
+ DemoService demoService;
- static Logger logger = LoggerFactory.getLogger(DemoApplication.class);
- Gondola gondola;
+ public DemoApplication(@Context ServletContext servletContext) throws Exception {
- static String hostId = System.getenv("hostId") != null ? System.getenv("hostId") : "host1";
+ // Initialize Routing application
+ RoutingFilter routingFilter = RoutingFilter.Builder.createRoutingFilter()
+ .setConfigUri(URI.create("classpath:///gondola.conf"))
+ .setService(DemoService.class)
+ .build();
- public DemoApplication(@Context ServletContext servletContext) throws Exception {
- gondola = initializeGondola();
+ // Register routing application
+ register(routingFilter);
// Dependency injection to DemoResource
- DemoService demoService = new DemoService(gondola);
register(new AbstractBinder() {
@Override
protected void configure() {
- bind(demoService).to(DemoService.class);
+ bind((DemoService) routingFilter.getService()).to(DemoService.class);
}
});
- // Register routing filter
- register(RoutingFilter.Builder.createRoutingFilter()
- .setApplication(this)
- .setRoutingHelper(new DemoRoutingHelper(gondola, demoService))
- .setGondola(gondola)
- .build());
-
// register resource
register(DemoResources.class);
}
-
- private Gondola initializeGondola() throws Exception {
- // Find the config file
- URL gondolaConfURI = DemoApplication.class.getClassLoader().getResource("gondola.conf");
- if (gondolaConfURI == null) {
- throw new FileNotFoundException(String.format("Gondola configuration '%s' not found", "gondola.conf"));
- }
-
- // Create the gondola instance
- File gondolaConf = new File(gondolaConfURI.getFile());
- Config config = new Config(gondolaConf);
- Gondola gondola = new Gondola(config, hostId);
-
- // Register for role updates and start gondola
- logger.info("Current role: FOLLOWER");
- Consumer listener = crevt -> {
- switch (crevt.newRole) {
- case CANDIDATE:
- logger.info("[{}] Current role: CANDIDATE", gondola.getHostId());
- break;
- case LEADER:
- logger.info("[{}] Current role: LEADER", gondola.getHostId());
- break;
- case FOLLOWER:
- logger.info("[{}] Current role: FOLLOWER", gondola.getHostId());
- break;
- }
- };
- gondola.registerForRoleChanges(listener);
- gondola.start();
- return gondola;
- }
}
diff --git a/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoResources.java b/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoResources.java
index ebcab5f..9396918 100644
--- a/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoResources.java
+++ b/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoResources.java
@@ -6,26 +6,36 @@
package com.yahoo.gondola.demo;
-import javax.inject.Inject;
-import javax.ws.rs.*;
-import javax.ws.rs.core.Response;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.ServerErrorException;
+import javax.ws.rs.ServiceUnavailableException;
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+
/**
* Endpoint and resource definition for the kv-server service.
*/
@Path("/entries/{key}")
public class DemoResources {
+
Logger logger = LoggerFactory.getLogger(DemoResources.class);
- @Inject DemoService service;
+ @Inject
+ DemoService service;
@GET
- public String getEntry(@PathParam("key") String key) {
+ public String getEntry(@PathParam("key") String key, @Context ContainerRequestContext request) {
try {
- return service.getValue(key);
+ return service.getValue(key, request);
} catch (DemoService.NotLeaderException e) {
throw new ServiceUnavailableException();
} catch (DemoService.NotFoundException e) {
@@ -34,9 +44,9 @@ public String getEntry(@PathParam("key") String key) {
}
@PUT
- public void putEntry(String value, @PathParam("key") String key) {
+ public void putEntry(String value, @PathParam("key") String key, @Context ContainerRequestContext request) {
try {
- service.putValue(key, value);
+ service.putValue(key, value, request);
} catch (DemoService.NotLeaderException e) {
throw new ServiceUnavailableException();
} catch (Throwable t) {
diff --git a/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoRoutingHelper.java b/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoRoutingHelper.java
index e53c938..28919dc 100644
--- a/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoRoutingHelper.java
+++ b/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoRoutingHelper.java
@@ -6,9 +6,7 @@
package com.yahoo.gondola.demo;
-import com.yahoo.gondola.Shard;
import com.yahoo.gondola.Config;
-import com.yahoo.gondola.Gondola;
import com.yahoo.gondola.container.spi.RoutingHelper;
import java.util.HashSet;
@@ -16,7 +14,6 @@
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import javax.ws.rs.container.ContainerRequestContext;
@@ -25,16 +22,16 @@
*/
public class DemoRoutingHelper implements RoutingHelper {
- Gondola gondola;
+ String hostId;
+ Config config;
int numberOfShards;
List shardIdList;
- DemoService demoService;
- public DemoRoutingHelper(Gondola gondola, DemoService demoService) {
- this.gondola = gondola;
- this.demoService = demoService;
- loadNumberOfShards(gondola);
- loadShardIdList(gondola);
+ public DemoRoutingHelper(String hostId, Config config) {
+ this.hostId = hostId;
+ this.config = config;
+ loadNumberOfShards();
+ loadShardIdList();
}
@Override
@@ -43,24 +40,12 @@ public int getBucketId(ContainerRequestContext request) {
return hashValue % numberOfShards;
}
- @Override
- public int getAppliedIndex(String shardId) {
- return demoService.getAppliedIndex();
- }
-
@Override
public String getSiteId(ContainerRequestContext request) {
- return gondola.getConfig().getAttributesForHost(gondola.getHostId()).get("siteId");
+ return config.getAttributesForHost(hostId).get("siteId");
}
- @Override
- public void beforeServing(String shardId) {
- demoService.beforeServing();
- }
-
- private void loadNumberOfShards(Gondola gondola) {
- Config config = gondola.getConfig();
-
+ private void loadNumberOfShards() {
Set shardIds = new HashSet<>();
for (String hostId : config.getHostIds()) {
shardIds.addAll(config.getShardIds(hostId));
@@ -68,10 +53,8 @@ private void loadNumberOfShards(Gondola gondola) {
numberOfShards = shardIds.size();
}
- private void loadShardIdList(Gondola gondola) {
- shardIdList = gondola.getShardsOnHost().stream()
- .map(Shard::getShardId)
- .collect(Collectors.toList());
+ private void loadShardIdList() {
+ shardIdList = config.getShardIds(hostId);
}
/**
diff --git a/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoService.java b/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoService.java
index 6f3c12e..8e1640e 100644
--- a/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoService.java
+++ b/examples/kv-server/src/main/java/com/yahoo/gondola/demo/DemoService.java
@@ -6,77 +6,96 @@
package com.yahoo.gondola.demo;
-import com.yahoo.gondola.Shard;
-import com.yahoo.gondola.Command;
import com.yahoo.gondola.Gondola;
-import com.yahoo.gondola.Role;
+import com.yahoo.gondola.RoleChangeEvent;
+import com.yahoo.gondola.container.ChangeLogProcessor;
+import com.yahoo.gondola.container.RoutingService;
+import com.yahoo.gondola.container.spi.RoutingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
+import javax.ws.rs.container.ContainerRequestContext;
+
/**
* The core business logic of demo service.
*/
-public class DemoService {
- Logger logger = LoggerFactory.getLogger(DemoService.class);
-
- Gondola gondola;
-
- // The map holding all the entries
- Map entries = new ConcurrentHashMap<>();
+public class DemoService extends RoutingService {
- // Gondola shard, used for replication
- Shard shard;
+ private static Logger logger = LoggerFactory.getLogger(DemoService.class);
+ private Map entries = new ConcurrentHashMap<>();
+ private String hostId;
+ private DemoRoutingHelper demoRoutingHelper;
- ChangeLogProcessor clProcessor;
+ /**
+ * Instantiates a new Routing service.
+ *
+ * @param gondola the gondola
+ */
+ public DemoService(Gondola gondola) {
+ super(gondola);
+ hostId = gondola.getHostId();
+ demoRoutingHelper = new DemoRoutingHelper(gondola.getHostId(), gondola.getConfig());
+ gondola.registerForRoleChanges(listener);
+ }
- public DemoService(Gondola gondola) throws Exception {
- this.gondola = gondola;
- shard = gondola.getShardsOnHost().get(0);
+ Consumer listener = crevt -> {
+ switch (crevt.newRole) {
+ case CANDIDATE:
+ logger.info("[{}] Current role: CANDIDATE", gondola.getHostId());
+ break;
+ case LEADER:
+ logger.info("[{}] Current role: LEADER", gondola.getHostId());
+ break;
+ case FOLLOWER:
+ logger.info("[{}] Current role: FOLLOWER", gondola.getHostId());
+ break;
+ }
+ };
- clProcessor = new ChangeLogProcessor();
- clProcessor.start();
- }
/**
* Returns the value stored at the specified key.
*
- * @param key
+ * @param key the key
+ * @param servletRequest the servlet request
* @return The non-null value of the key
- * @throws NotFoundException
+ * @throws NotLeaderException the not leader exception
+ * @throws NotFoundException the not found exception
*/
- public String getValue(String key) throws NotFoundException, NotLeaderException {
- if (shard.getLocalRole() != Role.LEADER) {
+ public String getValue(String key, ContainerRequestContext servletRequest)
+ throws NotLeaderException, NotFoundException {
+ if (!isLeader(getShardId(servletRequest))) {
throw new NotLeaderException();
}
if (!entries.containsKey(key)) {
throw new NotFoundException();
}
String value = entries.get(key);
- logger.info(String.format("[%s] Get key %s: %s", gondola.getHostId(), key, value));
+ logger.info(String.format("[%s] Get key %s: %s", this.hostId, key, value));
return value;
}
/**
* Commits the entry to Raft log. The entries map is not updated; it is updated by the Replicator thread.
- * @param key The non-null key
- * @param value The non-null value
+ *
+ * @param key The non-null key
+ * @param value The non-null value
+ * @param servletRequest the servlet request
+ * @throws NotLeaderException the not leader exception
*/
- public void putValue(String key, String value) throws NotLeaderException {
+ public void putValue(String key, String value, ContainerRequestContext servletRequest) throws NotLeaderException {
if (key.indexOf(" ") >= 0) {
throw new IllegalArgumentException("The key must not contain spaces");
}
try {
- Command command = shard.checkoutCommand();
byte[] bytes = (key + " " + value).getBytes(); // TODO implement better separator
- command.commit(bytes, 0, bytes.length);
- logger.info(String.format("[%s] Put key %s=%s", gondola.getHostId(), key, value));
+ writeLog(getShardId(servletRequest), bytes);
+ logger.info(String.format("[%s] Put key %s=%s", hostId, key, value));
} catch (com.yahoo.gondola.NotLeaderException e) {
logger.info(String.format("Failed to put %s/%s because not a leader", key, value));
} catch (InterruptedException e) {
@@ -84,66 +103,37 @@ public void putValue(String key, String value) throws NotLeaderException {
}
}
- /**
- * Returns the applied index.
- *
- * @return applied index.
- */
- public int getAppliedIndex() {
- return clProcessor.getAppliedIndex();
- }
-
- /**
- * Background thread that continuously reads committed commands from the Gondola shard, and updates the entries
- * map. TODO: prevent reads until the map is fully updated.
- */
- public class ChangeLogProcessor extends Thread {
- int appliedIndex = 0;
- List> listeners = new ArrayList<>();
-
- @Override
- public void run() {
- String string;
- while (true) {
- try {
- string = shard.getCommittedCommand(appliedIndex + 1).getString();
- appliedIndex++;
- logger.info("[{}] Executing command {}: {}", gondola.getHostId(),
- appliedIndex, string);
- String[] pair = string.split(" ", 2);
- if (pair.length == 2) {
- entries.put(pair[0], pair[1]);
- }
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
- break;
- }
+ @Override
+ public ChangeLogProcessor.ChangeLogConsumer provideChangeLogConsumer() {
+ return (shardId, command) -> {
+ String[] pair = command.getString().split(" ", 2);
+ if (pair.length == 2) {
+ entries.put(pair[0], pair[1]);
}
- }
+ };
+ }
- public int getAppliedIndex() {
- return appliedIndex;
- }
+ @Override
+ public RoutingHelper provideRoutingHelper() {
+ return demoRoutingHelper;
}
- /**
- * Called after all changes from the Raft log have been applied and before requests
- * are accepted.
- */
- public void beforeServing() {
- // In this demo application, there's no need to update internal state
- logger.info("[{}] Ready", gondola.getHostId());
+ @Override
+ public void ready(String shardId) {
+ logger.info("[{}] {} ready for serving", gondola.getHostId(), shardId);
}
/**
- * Thrown when the key for a GET request does not exist.
+ * The type Not leader exception.
*/
- public static class NotFoundException extends Throwable {
+ public class NotLeaderException extends Exception {
+
}
/**
- * Thrown when the current node is not the leader.
+ * The type Not found exception.
*/
- public static class NotLeaderException extends Throwable {
+ public class NotFoundException extends Exception {
+
}
}
diff --git a/pom.xml b/pom.xml
index b330713..4062e1d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,13 +33,49 @@
https://github.com/yahoo/gondola
- 2.15
- 6.1.1
+ UTF-8
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+ 6.12.1
settings/checkstyle.xml
settings/checkstyle-suppressions.xml
settings/java.header
- 1.7.10
+ 1.7.13
UTF-8
+ 3.0.0
+ 1.2.17
+ 1.10.19
+ 1.2.1
+ 5.1.37
+ 1.4.190
+ 2.4.2
+ 1.3
+ 1.3.1
+ 4.5.1
+ 2.4
+ 0.1.53
+ 0.0.9
+ 18.0
+ 2.22.1
+ 7.0
+ 2.6.3
+ 1.1.0.Final
+ 0.0.9
+ 6.9.9
+ 2.17
+ 2.4
+ 3.3
+ 1.9.4
+ 2.19
+ 2.8.2
+ 1.9
+ 4.1.0
+ 0.7.5.201505241946
+ 3.0.3
+ 3.5
+ 2.19
@@ -71,6 +107,176 @@
Yahoo! Inc
https://www.yahoo.com/
+
+
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+
+
+
+ org.slf4j
+ slf4j-log4j12
+ ${slf4j.version}
+
+
+ log4j
+ log4j
+ ${log4j.version}
+
+
+ org.testng
+ testng
+ ${testng.version}
+
+
+ org.mockito
+ mockito-all
+ ${mockito.version}
+
+
+ yahoo.yinst.gondola
+ gondola
+ ${project.version}
+
+
+ com.typesafe
+ config
+ ${config.version}
+
+
+ mysql
+ mysql-connector-java
+ ${mysql-connector-java.version}
+
+
+ com.h2database
+ h2
+ ${h2.version}
+
+
+ com.zaxxer
+ HikariCP
+ ${HikariCP.version}
+
+
+
+ org.apache.commons
+ commons-exec
+ ${commons-exec.version}
+
+
+ commons-cli
+ commons-cli
+ ${commons-cli.version}
+
+
+ org.apache.httpcomponents
+ httpclient
+ ${httpclient.version}
+
+
+
+ org.slf4j
+ jcl-over-slf4j
+ ${slf4j.version}
+
+
+ org.apache.httpcomponents
+ httpclient
+ ${httpclient.version}
+ tests
+
+
+ commons-io
+ commons-io
+ ${commons-io.version}
+
+
+ com.jcraft
+ jsch
+ ${jsch.version}
+
+
+ com.jcraft
+ jsch.agentproxy.usocket-jna
+ ${jsch.agentproxy.usocket-jna.version}
+
+
+ com.jcraft
+ jsch.agentproxy.sshagent
+ ${jsch.agentproxy.sshagent.version}
+
+
+ com.jcraft
+ jsch.agentproxy.connector-factory
+ ${jsch.agentproxy.sshagent.version}
+
+
+ com.jcraft
+ jsch.agentproxy.jsch
+ ${jsch.agentproxy.sshagent.version}
+
+
+ com.google.guava
+ guava
+ ${guava.version}
+
+
+ org.glassfish.jersey.containers
+ jersey-container-servlet
+ ${jersey-container-servlet.version}
+
+
+ javax
+ javaee-api
+ ${javaee-api.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson-databind.version}
+
+
+ com.yahoo.gondola.containers
+ jersey2-routing
+ ${project.version}
+
+
+ com.yahoo.gondola
+ core
+ ${project.version}
+
+
+ org.apache.curator
+ curator-framework
+ ${curator.version}
+
+
+ org.apache.curator
+ curator-recipes
+ ${curator.version}
+
+
+ org.apache.curator
+ curator-test
+ ${curator.version}
+ test
+
+
+ org.apache.curator
+ curator-client
+ ${curator.version}
+
+
+ javax.validation
+ validation-api
+ ${validation-api.version}
+
+
+
@@ -92,7 +298,7 @@
maven-source-plugin
- 2.3
+ ${maven-source-plugin.version}
attach-sources
@@ -105,10 +311,10 @@
org.apache.maven.plugins
maven-compiler-plugin
- 3.2
+ ${maven-compiler-plugin.version}
-
- 1.8
+
+ ${target_jdk_version}
@@ -122,6 +328,7 @@
false
**\/*
+ false
**\/*
*.png
@@ -155,13 +362,13 @@
org.apache.maven.scm
maven-scm-provider-gitexe
- 1.9.4
+ ${maven-scm-provider-gitexe.version}
maven-surefire-plugin
- 2.19
+ ${maven-surefire-plugin.version}
@@ -173,8 +380,7 @@
maven-failsafe-plugin
- 2.18.1
-
maven-deploy-plugin
- 2.8.2
+ ${maven-deploy-plugin.version}
org.eluder.coveralls
coveralls-maven-plugin
- 4.0.0
+ ${coveralls-maven-plugin.version}
${env.coveralls_repo_token}
@@ -201,7 +406,7 @@
org.jacoco
jacoco-maven-plugin
- 0.7.5.201505241946
+ ${jacoco-maven-plugin.version}
prepare-agent
@@ -219,12 +424,12 @@
org.codehaus.mojo
findbugs-maven-plugin
- 3.0.3
+ ${findbugs-maven-plugin.version}
org.apache.maven.plugins
maven-pmd-plugin
- 3.5
+ ${maven-pmd-plugin.version}
@@ -233,31 +438,26 @@
org.slf4j
slf4j-api
- ${slf4j.version}
org.slf4j
slf4j-log4j12
- ${slf4j.version}
test
log4j
log4j
- 1.2.17
test
org.testng
testng
- 6.8.21
test
org.mockito
mockito-all
- 1.10.8
test