Skip to content
This repository has been archived by the owner on Dec 5, 2018. It is now read-only.

Hookup gondola setSlave and zookeeper shard manager server/client implementation #60

Merged
merged 9 commits into from Dec 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 0 additions & 16 deletions containers/jersey2-routing/pom.xml
Expand Up @@ -45,86 +45,70 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.1</version>
</dependency>
<!-- bridge apache common log to slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>1.7.13</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.1</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.53</version>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch.agentproxy.usocket-jna</artifactId>
<version>0.0.9</version>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch.agentproxy.sshagent</artifactId>
<version>0.0.9</version>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch.agentproxy.connector-factory</artifactId>
<version>0.0.6</version>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch.agentproxy.jsch</artifactId>
<version>0.0.7</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet</artifactId>
<version>2.22.1</version>
</dependency>
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>7.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>
</project>
Expand Up @@ -14,10 +14,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.List;
import java.util.Map;

import static com.yahoo.gondola.container.ShardManagerProtocol.ShardManagerException.CODE.*;
import static com.yahoo.gondola.container.ShardManagerProtocol.ShardManagerException.CODE.SLAVE_NOT_SYNC;

/**
* The type Admin client.
Expand All @@ -26,10 +27,11 @@ public class AdminClient {

// TODO: move to config
public static final int RETRY_COUNT = 3;
public static final int TIMEOUT_MS = 300;
public static final int TIMEOUT_MS = 3000;
private String serviceName;
private Config config;
private ShardManagerClient shardManagerClient;
private ConfigWriter configWriter;

private static Logger logger = LoggerFactory.getLogger(AdminClient.class);
private boolean tracing = false;
Expand All @@ -41,10 +43,12 @@ public class AdminClient {
* @param shardManagerClient the shard manager client
* @param config the config
*/
public AdminClient(String serviceName, ShardManagerClient shardManagerClient, Config config) {
public AdminClient(String serviceName, ShardManagerClient shardManagerClient, Config config,
ConfigWriter configWriter) {
this.serviceName = serviceName;
this.shardManagerClient = shardManagerClient;
this.config = config;
this.configWriter = configWriter;
this.config.registerForUpdates(config1 -> {
tracing = config1.getBoolean("tracing.adminCli");
});
Expand Down Expand Up @@ -87,11 +91,10 @@ public Config getConfig() throws AdminException {
/**
* Sets config.
*
* @param config the config
* @param configFile the config file.
* @throws AdminException the admin exception
*/
public void setConfig(Config config) throws AdminException {
this.config = config;
public void setConfig(File configFile) throws AdminException {
}


Expand All @@ -116,33 +119,35 @@ public void splitShard(String fromShardId, String toShardId) throws AdminExcepti
*/
public void assignBuckets(Range<Integer> range, String fromShardId, String toShardId)
throws InterruptedException, AdminException {
trace("Executing assign buckets={} from {} to {}", range, fromShardId, toShardId);
trace("[admin] Executing assign buckets={} from {} to {}", range, fromShardId, toShardId);
for (int i = 1; i <= RETRY_COUNT; i++) {
try {
trace("Initializing slaves on {} ...", toShardId);
trace("[admin] Initializing slaves on {} ...", toShardId);
shardManagerClient.startObserving(toShardId, fromShardId, TIMEOUT_MS);

trace(
"All nodes in {} are in slave mode, waiting for slave logs approaching to leader's log position.",
"[admin] All nodes in {} are in slave mode, "
+ "waiting for slave logs approaching to leader's log position.",
toShardId);

if (!shardManagerClient.waitSlavesApproaching(toShardId, -1)) {
throw new ShardManagerException(SLAVE_NOT_SYNC);
}

trace("All nodes in {} logs approached to leader's log position, assigning buckets={} ...", toShardId,
range);
trace("[admin] All nodes in {} logs approached to leader's log position, assigning buckets={} ...",
toShardId, range);
// migrateBuckets is a atomic operation executing on leader at fromShard,
// after operation is success, it will stop observing mode of toShard.
shardManagerClient.migrateBuckets(range, fromShardId, toShardId, 2000);

trace("Assign buckets complete, assigned buckets={} from {} to {}", range, fromShardId, toShardId);
shardManagerClient.migrateBuckets(range, fromShardId, toShardId, TIMEOUT_MS);
trace("[admin] success!");
trace("[admin] Writing latest config to config storage!");
saveConfig(fromShardId, toShardId);
break;
} catch (ShardManagerException e) {
try {
shardManagerClient.stopObserving(fromShardId, toShardId, TIMEOUT_MS);
shardManagerClient.stopObserving(toShardId, fromShardId, TIMEOUT_MS);
} catch (ShardManagerException e1) {
logger.info("Rollback, Stop observing failed, ignoring the error.");
logger.info("Rollback, Stop observing failed, ignoring the error. msg={}", e1.getMessage());
}
if (i != RETRY_COUNT) {
logger.warn("Error occurred during assign buckets.. retrying {} / {}, errorMsg={}",
Expand All @@ -155,22 +160,15 @@ public void assignBuckets(Range<Integer> range, String fromShardId, String toSha
}
}

/**
* Close assign buckets.
*
* @param fromShardId the from shard id
* @param toShardId the to shard id
* @param range the range
*/
public void closeAssignBuckets(Range<Integer> range, String fromShardId, String toShardId)
throws ShardManagerException, InterruptedException {
trace("Executing close the state of assign buckets");

trace("Waiting all nodes bucket table updated...");
shardManagerClient.waitBucketsCondition(range, fromShardId, toShardId, 3000);
trace("closing the state of assign buckets...");
shardManagerClient.setBuckets(range, fromShardId, toShardId, true);
trace("Done!");
private void saveConfig(String fromShardId, String toShardId) throws AdminException {
configWriter.setBucketMap(fromShardId, getBucketMapString(fromShardId));
configWriter.setBucketMap(fromShardId, getBucketMapString(toShardId));
setConfig(configWriter.save());
}

private String getBucketMapString(String fromShardId) {
// TODO: implement
return "";
}

private void trace(String format, Object... args) {
Expand Down Expand Up @@ -288,6 +286,7 @@ class ShardStat extends Stat {
}

class AdminException extends Exception {

ErrorCode errorCode;


Expand All @@ -303,6 +302,7 @@ public AdminException() {
enum ErrorCode {
CONFIG_NOT_FOUND(10000);
private int code;

ErrorCode(int code) {
this.code = code;
}
Expand Down
@@ -0,0 +1,127 @@
/*
* Copyright 2015, Yahoo Inc.
* Copyrights licensed under the New BSD License.
* See the accompanying LICENSE file for terms.
*/

package com.yahoo.gondola.container;

import com.yahoo.gondola.Command;
import com.yahoo.gondola.Gondola;
import com.yahoo.gondola.Shard;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

/**
* The type Change log processor.
*/
public class ChangeLogProcessor {

private Gondola gondola;
private Map<String, ChangeLogProcessorThread> threads = new HashMap<>();
private static Logger logger = LoggerFactory.getLogger(ChangeLogProcessor.class);
private ChangeLogConsumer changeLogConsumer;


/**
* Instantiates a new Change log processor.
* @param gondola the gondola
* @param changeLogConsumer change log consumer provided by app.
*/
public ChangeLogProcessor(Gondola gondola, ChangeLogConsumer changeLogConsumer) {
this.gondola = gondola;
this.changeLogConsumer = changeLogConsumer;
for (String shardId : gondola.getConfig().getShardIds(gondola.getHostId())) {
createThread(shardId);
}
}

/**
* The type Change log processor thread.
*/
class ChangeLogProcessorThread extends Thread {

int appliedIndex = 0;
private int retryCount = 0;
private Shard shard;
private String shardId;
private String hostId;
private int memberId;

public ChangeLogProcessorThread(String shardId) {
setName("ChangeLogProcessor");
this.shardId = shardId;
this.hostId = gondola.getHostId();
// TODO: get memberId
}

public void run() {
Command command;
shard = gondola.getShard(shardId);
while (true) {
try {
command = shard.getCommittedCommand(appliedIndex + 1);
if (changeLogConsumer != null) {
changeLogConsumer.apply(shardId, command);
}
appliedIndex++;
} catch (InterruptedException e) {
logger.warn("[{}-{}] ChangeLogProcessor interrupted, exit..", hostId, memberId);
return;
} catch (Throwable e) {
logger.error(e.getMessage(), e);
if (++retryCount == 3) {
logger
.error("[{}-{}] Max retry count reached, exit..", hostId, memberId);
return;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
logger.warn("[{}-{}] ChangeLogProcessor interrupted, exit..", hostId, memberId);
return;
}
}
}
}
}

private void createThread(String shardId) {
ChangeLogProcessorThread thread = new ChangeLogProcessorThread(shardId);
threads.put(shardId, thread);
}

/**
* Gets applied index.
*
* @param shardId the shard id
* @return the applied index
*/
public int getAppliedIndex(String shardId) {
return threads.get(shardId).appliedIndex;
}

/**
* Stop.
*/
public void stop() {
com.yahoo.gondola.core.Utils.stopThreads(new ArrayList<>(threads.values()));
}

public void start() {
threads.values().forEach(ChangeLogProcessorThread::start);
}

/**
* Change log consumer functional interface.
*/
@FunctionalInterface
public interface ChangeLogConsumer {
void apply(String shardId, Command command);
}
}