Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GOBBLIN-1107: Lazily initialize Helix TaskStateModelFactory in Gobbli… #2947

Closed
wants to merge 9 commits into from
Expand Up @@ -120,47 +120,35 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
public static final String CLUSTER_APP_WORK_DIR = GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + "appWorkDir";

private static final Logger logger = LoggerFactory.getLogger(GobblinTaskRunner.class);
static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf");

static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf");
sv2000 marked this conversation as resolved.
Show resolved Hide resolved
static final String GOBBLIN_TASK_FACTORY_NAME = "GobblinTaskFactory";

static final String GOBBLIN_JOB_FACTORY_NAME = "GobblinJobFactory";

private final String helixInstanceName;

private final String clusterName;
private final Optional<ContainerMetrics> containerMetrics;
private final List<Service> services = Lists.newArrayList();
private final Path appWorkPath;

@Getter
private HelixManager jobHelixManager;

private Optional<HelixManager> taskDriverHelixManager = Optional.absent();

private final ServiceManager serviceManager;

private final TaskStateModelFactory taskStateModelFactory;

private final Optional<ContainerMetrics> containerMetrics;

protected final String taskRunnerId;

private ServiceManager serviceManager;
private TaskStateModelFactory taskStateModelFactory;
private boolean isTaskDriver;
private boolean dedicatedTaskDriverCluster;
private Collection<StandardMetricsBridge.StandardMetrics> metricsCollection;
private volatile boolean stopInProgress = false;

private volatile boolean isStopped = false;

protected final String taskRunnerId;
protected final EventBus eventBus = new EventBus(GobblinTaskRunner.class.getSimpleName());

protected final Config clusterConfig;

@Getter
protected final FileSystem fs;
private final List<Service> services = Lists.newArrayList();
protected final String applicationName;
protected final String applicationId;
private final Path appWorkPath;
private boolean isTaskDriver;
private boolean dedicatedTaskDriverCluster;

private final Collection<StandardMetricsBridge.StandardMetrics> metricsCollection;

public GobblinTaskRunner(String applicationName,
String helixInstanceName,
Expand Down Expand Up @@ -191,6 +179,17 @@ public GobblinTaskRunner(String applicationName,

this.containerMetrics = buildContainerMetrics();

logger.info("GobblinTaskRunner({}): applicationName {}, helixInstanceName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
this.isTaskDriver? "taskDriver" : "worker",
applicationName,
helixInstanceName,
applicationId,
taskRunnerId,
config,
appWorkDirOptional);
}

private synchronized TaskRunnerSuiteBase initTaskStateModelFactory() throws ReflectiveOperationException {
sv2000 marked this conversation as resolved.
Show resolved Hide resolved
String builderStr = ConfigUtils.getString(this.clusterConfig,
GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER,
TaskRunnerSuiteBase.Builder.class.getName());
Expand All @@ -203,8 +202,8 @@ public GobblinTaskRunner(String applicationName,
}

TaskRunnerSuiteBase.Builder builder = GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor(
new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
.resolveClass(builderStr), this.clusterConfig);
new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
.resolveClass(builderStr), this.clusterConfig);

TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath)
.setContainerMetrics(this.containerMetrics)
Expand All @@ -218,25 +217,7 @@ public GobblinTaskRunner(String applicationName,
.build();

this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactoryMap());
this.metricsCollection = suite.getMetricsCollection();
this.services.addAll(suite.getServices());

this.services.addAll(getServices());

if (this.services.isEmpty()) {
this.serviceManager = null;
} else {
this.serviceManager = new ServiceManager(services);
}

logger.info("GobblinTaskRunner({}): applicationName {}, helixInstanceName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
this.isTaskDriver?"taskDriver" : "worker",
applicationName,
helixInstanceName,
applicationId,
taskRunnerId,
config,
appWorkDirOptional);
return suite;
}

private Path initAppWorkDir(Config config, Optional<Path> appWorkDirOptional) {
Expand Down Expand Up @@ -304,6 +285,24 @@ public void start() {

connectHelixManagerWithRetry();

TaskRunnerSuiteBase suite;
try {
suite = initTaskStateModelFactory();
} catch (Exception e) {
throw new RuntimeException(e);
}

this.metricsCollection = suite.getMetricsCollection();
this.services.addAll(suite.getServices());

this.services.addAll(getServices());

if (this.services.isEmpty()) {
this.serviceManager = null;
} else {
this.serviceManager = new ServiceManager(services);
}

addInstanceTags();

// Start metric reporting
Expand Down
Expand Up @@ -19,11 +19,17 @@

import java.io.IOException;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.test.TestingServer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.manager.zk.ZkClient;
import org.slf4j.Logger;
Expand All @@ -35,10 +41,13 @@

import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.testing.AssertWithBackoff;


Expand Down Expand Up @@ -66,6 +75,7 @@ public class GobblinTaskRunnerTest {
private GobblinTaskRunner corruptGobblinTaskRunner;
private String clusterName;
private String corruptHelixInstance;
private TaskAssignmentAfterConnectionRetry suite;

@BeforeClass
public void setUp() throws Exception {
Expand Down Expand Up @@ -110,6 +120,9 @@ public void setUp() throws Exception {

@Test
public void testSendReceiveShutdownMessage() throws Exception {
ExecutorService service = Executors.newSingleThreadExecutor();
service.submit(() -> GobblinTaskRunnerTest.this.gobblinTaskRunner.start());

Logger log = LoggerFactory.getLogger("testSendReceiveShutdownMessage");
this.gobblinClusterManager.sendShutdownRequest();

Expand All @@ -128,21 +141,31 @@ public void testBuildFileSystemConfig() {
Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value");
}

@Test
public void testConnectHelixManagerWithRetry() {
/**
* A helper method that creates a partial instance structure in ZK.
*/
private static void createPartialInstanceStructure(HelixManager helixManager, String zkConnectString) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be moved into the newly created utils class?

//Connect and disconnect the corrupt task runner to create a Helix Instance set up.
sv2000 marked this conversation as resolved.
Show resolved Hide resolved
try {
this.corruptGobblinTaskRunner.connectHelixManager();
this.corruptGobblinTaskRunner.disconnectHelixManager();
helixManager.connect();
helixManager.disconnect();
} catch (Exception e) {
Assert.fail("Failed to connect to ZK");
}

//Delete ERRORS/HISTORY/STATUSUPDATES znodes under INSTANCES to simulate partial instance set up.
ZkClient zkClient = new ZkClient(testingZKServer.getConnectString());
zkClient.delete(PropertyPathBuilder.instanceError(clusterName, corruptHelixInstance));
zkClient.delete(PropertyPathBuilder.instanceHistory(clusterName, corruptHelixInstance));
zkClient.delete(PropertyPathBuilder.instanceStatusUpdate(clusterName, corruptHelixInstance));
ZkClient zkClient = new ZkClient(zkConnectString);
zkClient.delete(PropertyPathBuilder.instanceError(helixManager.getClusterName(), helixManager.getInstanceName()));
zkClient.delete(PropertyPathBuilder.instanceHistory(helixManager.getClusterName(), helixManager.getInstanceName()));
zkClient.delete(PropertyPathBuilder.instanceStatusUpdate(helixManager.getClusterName(), helixManager.getInstanceName()));
}

@Test
public void testConnectHelixManagerWithRetry() {
HelixManager instanceManager = HelixManagerFactory.getZKHelixManager(
clusterName, corruptHelixInstance, InstanceType.PARTICIPANT, testingZKServer.getConnectString());

createPartialInstanceStructure(instanceManager, testingZKServer.getConnectString());

//Ensure that connecting to Helix without retry throws a HelixException
try {
Expand All @@ -156,13 +179,73 @@ public void testConnectHelixManagerWithRetry() {
//Ensure that connect with retry succeeds
corruptGobblinTaskRunner.connectHelixManagerWithRetry();
Assert.assertTrue(true);

corruptGobblinTaskRunner.disconnectHelixManager();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should corruptGobblinTaskRunner be a local variable?

}

/**
 * Starts the cluster of a {@link TaskAssignmentAfterConnectionRetry} suite and checks, through a
 * spectator Helix manager, that the job is started and that the task reaches the running state.
 *
 * The spectator connection is released in a {@code finally} block so that a failed assertion does
 * not leave it connected.
 *
 * @throws Exception if the cluster cannot be started, the spectator cannot connect, or an assertion fails
 */
@Test (groups = {"disabledOnTravis"})
public void testTaskAssignmentAfterHelixConnectionRetry()
    throws Exception {
  this.suite = new TaskAssignmentAfterConnectionRetry();

  String zkConnectString = suite.getManagerConfig().getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
  String clusterName = suite.getManagerConfig().getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
  //A test manager instance for observing the state of the cluster
  HelixManager helixManager = HelixManagerFactory.getZKHelixManager(clusterName, "TestManager", InstanceType.SPECTATOR, zkConnectString);

  suite.startCluster();

  helixManager.connect();
  try {
    //Ensure that Helix has created a workflow
    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
        assertTrue(ClusterIntegrationTest.isTaskStarted(helixManager, TaskAssignmentAfterConnectionRetry.JOB_ID), "Waiting for the job to start...");

    //Ensure that the SleepingTask is running
    AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
        assertTrue(ClusterIntegrationTest.isTaskRunning(TaskAssignmentAfterConnectionRetry.TASK_STATE_FILE),"Waiting for the task to enter running state");
  } finally {
    //Always release the spectator connection, even when one of the assertions above fails
    helixManager.disconnect();
  }
}


public static class TaskAssignmentAfterConnectionRetry extends IntegrationBasicSuite {
sv2000 marked this conversation as resolved.
Show resolved Hide resolved
public static final String JOB_ID = "job_testJob_" + System.currentTimeMillis();
public static final String TASK_STATE_FILE = "/tmp/" + GobblinTaskRunnerTest.class.getSimpleName() + "/taskState/_RUNNING";

/**
 * Layers the test-specific job settings on top of the supplied raw job config: the source class,
 * the job id, the Helix job timeout flag and its value in seconds, and the task state file.
 *
 * @param rawJobConfig the job config used as the fallback for every key not overridden here
 * @return a single-entry map from {@code JOB_NAME} to the resulting job config
 */
@Override
protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
  Map<String, Object> overrides = ImmutableMap.<String, Object>builder()
      .put(ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource")
      .put(ConfigurationKeys.JOB_ID_KEY, JOB_ID)
      .put(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE)
      .put(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L)
      .put(SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE)
      .build();

  Config jobConfig = ConfigFactory.parseMap(overrides).withFallback(rawJobConfig);
  return ImmutableMap.of(JOB_NAME, jobConfig);
}

/**
 * Creates the Helix cluster through the superclass and then sets up a partial instance structure
 * in ZK for {@code WORKER_INSTANCE_0}, using a participant Helix manager built from the manager
 * config's cluster name and ZK connection string.
 *
 * @throws Exception if the superclass fails to create the Helix cluster
 */
@Override
protected void createHelixCluster() throws Exception {
  super.createHelixCluster();

  String helixCluster =
      super.getManagerConfig().getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
  String zkAddress =
      super.getManagerConfig().getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);

  HelixManager workerManager = HelixManagerFactory.getZKHelixManager(
      helixCluster,
      IntegrationBasicSuite.WORKER_INSTANCE_0,
      InstanceType.PARTICIPANT,
      zkAddress);

  //Create a partial instance setup
  GobblinTaskRunnerTest.createPartialInstanceStructure(workerManager, zkAddress);
}
}


@AfterClass
public void tearDown() throws IOException {
public void tearDown()
throws IOException, InterruptedException {
try {
this.gobblinClusterManager.disconnectHelixManager();
this.gobblinTaskRunner.disconnectHelixManager();
if (this.suite != null) {
this.suite.shutdownCluster();
}
} finally {
this.testingZKServer.close();
}
Expand Down