Skip to content

Commit

Permalink
[GOBBLIN-336] Refactor HelixTask to create taskAttempt with a builder
Browse files Browse the repository at this point in the history
This allows the task attempt logic to be mocked out in unit tests in the future.

Testing:

The integration test
org.apache.gobblin.cluster.ClusterIntegrationTest
passed.

Closes #2191 from HappyRay/add-task-attempt-builder
  • Loading branch information
HappyRay authored and abti committed Dec 8, 2017
1 parent 8c338be commit ba909f1
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 42 deletions.
Expand Up @@ -17,9 +17,6 @@

package org.apache.gobblin.cluster;

import com.google.common.io.Closer;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.runtime.util.StateStores;
import java.io.IOException;
import java.util.List;

Expand All @@ -33,29 +30,27 @@
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.typesafe.config.ConfigFactory;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateTracker;
import org.apache.gobblin.runtime.util.JobMetrics;
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.SerializationUtils;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;


/**
Expand All @@ -80,50 +75,35 @@ public class GobblinHelixTask implements Task {

private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixTask.class);

@SuppressWarnings({"unused", "FieldCanBeLocal"})
private final Optional<JobMetrics> jobMetrics;
private final TaskExecutor taskExecutor;
private final TaskStateTracker taskStateTracker;

private final TaskConfig taskConfig;
// An empty JobState instance that will be filled with values read from the serialized JobState
private final JobState jobState = new JobState();
private final String jobName;
private final String jobId;
private final String jobKey;
private final String participantId;

private final FileSystem fs;
private final StateStores stateStores;
private final TaskAttemptBuilder taskAttemptBuilder;

private GobblinMultiTaskAttempt taskAttempt;

public GobblinHelixTask(TaskCallbackContext taskCallbackContext, Optional<ContainerMetrics> containerMetrics,
TaskExecutor taskExecutor, TaskStateTracker taskStateTracker, FileSystem fs, Path appWorkDir,
StateStores stateStores)
public GobblinHelixTask(TaskCallbackContext taskCallbackContext, FileSystem fs, Path appWorkDir,
TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores)
throws IOException {
this.taskExecutor = taskExecutor;
this.taskStateTracker = taskStateTracker;

this.taskConfig = taskCallbackContext.getTaskConfig();
this.stateStores = stateStores;
this.taskAttemptBuilder = taskAttemptBuilder;
this.jobName = this.taskConfig.getConfigMap().get(ConfigurationKeys.JOB_NAME_KEY);
this.jobId = this.taskConfig.getConfigMap().get(ConfigurationKeys.JOB_ID_KEY);
this.jobKey = Long.toString(Id.parse(this.jobId).getSequence());
this.participantId = taskCallbackContext.getManager().getInstanceName();

this.fs = fs;
this.stateStores = stateStores;

Path jobStateFilePath = new Path(appWorkDir, this.jobId + "." + AbstractJobLauncher.JOB_STATE_FILE_NAME);
SerializationUtils.deserializeState(this.fs, jobStateFilePath, this.jobState);

if (containerMetrics.isPresent()) {
// This must be done after the jobState is deserialized from the jobStateFilePath
// A reference to jobMetrics is required to ensure it is not evicted from the GobblinMetricsRegistry Cache
this.jobMetrics = Optional.of(JobMetrics.get(this.jobState, containerMetrics.get().getMetricContext()));
} else {
this.jobMetrics = Optional.absent();
}
}

@Override
Expand Down Expand Up @@ -162,9 +142,7 @@ public TaskResult run() {
SharedResourcesBroker<GobblinScopeTypes> jobBroker =
globalBroker.newSubscopedBuilder(new JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId())).build();

this.taskAttempt = new GobblinMultiTaskAttempt(workUnits.iterator(), this.jobId, this.jobState, this.taskStateTracker,
this.taskExecutor, Optional.of(this.participantId), Optional.of(this.stateStores.taskStateStore), jobBroker);

this.taskAttempt = this.taskAttemptBuilder.build(workUnits.iterator(), this.jobId, this.jobState, jobBroker);
this.taskAttempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
return new TaskResult(TaskResult.Status.COMPLETED, String.format("completed tasks: %d", workUnits.size()));
} catch (InterruptedException ie) {
Expand Down
Expand Up @@ -17,28 +17,26 @@

package org.apache.gobblin.cluster;

import com.typesafe.config.Config;
import org.apache.gobblin.runtime.util.StateStores;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.helix.HelixManager;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Counter;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.typesafe.config.Config;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskStateTracker;
import org.apache.gobblin.runtime.util.StateStores;


/**
Expand All @@ -54,6 +52,7 @@ public class GobblinHelixTaskFactory implements TaskFactory {
private static final String GOBBLIN_CLUSTER_NEW_HELIX_TASK_COUNTER = "gobblin.cluster.new.helix.task";

private final Optional<ContainerMetrics> containerMetrics;
private final HelixManager helixManager;

/**
* A {@link Counter} to count the number of new {@link GobblinHelixTask}s that are created.
Expand All @@ -64,10 +63,12 @@ public class GobblinHelixTaskFactory implements TaskFactory {
private final FileSystem fs;
private final Path appWorkDir;
private final StateStores stateStores;
private final TaskAttemptBuilder taskAttemptBuilder;

public GobblinHelixTaskFactory(Optional<ContainerMetrics> containerMetrics, TaskExecutor taskExecutor,
TaskStateTracker taskStateTracker, FileSystem fs, Path appWorkDir, Config config) {
TaskStateTracker taskStateTracker, FileSystem fs, Path appWorkDir, Config config, HelixManager helixManager) {
this.containerMetrics = containerMetrics;
this.helixManager = helixManager;
if (this.containerMetrics.isPresent()) {
this.newTasksCounter = Optional.of(this.containerMetrics.get().getCounter(GOBBLIN_CLUSTER_NEW_HELIX_TASK_COUNTER));
} else {
Expand All @@ -79,6 +80,15 @@ public GobblinHelixTaskFactory(Optional<ContainerMetrics> containerMetrics, Task
this.appWorkDir = appWorkDir;
this.stateStores = new StateStores(config, appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME,
appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
this.taskAttemptBuilder = createTaskAttemptBuilder();
}

/**
 * Creates the {@link TaskAttemptBuilder} used by this factory.
 *
 * <p>The builder is constructed from this factory's task state tracker and task executor, and is then
 * given the Helix manager's instance name as its container id and the task state store held by
 * {@code stateStores}.
 *
 * @return the configured builder
 */
private TaskAttemptBuilder createTaskAttemptBuilder() {
  // Both setters return the builder itself, so the configuration can be expressed as one chain.
  return new TaskAttemptBuilder(this.taskStateTracker, this.taskExecutor)
      .setContainerId(this.helixManager.getInstanceName())
      .setTaskStateStore(this.stateStores.taskStateStore);
}

@Override
Expand All @@ -87,8 +97,7 @@ public Task createNewTask(TaskCallbackContext context) {
if (this.newTasksCounter.isPresent()) {
this.newTasksCounter.get().inc();
}
return new GobblinHelixTask(context, this.containerMetrics, this.taskExecutor, this.taskStateTracker,
this.fs, this.appWorkDir, stateStores);
return new GobblinHelixTask(context, this.fs, this.appWorkDir, this.taskAttemptBuilder, this.stateStores);
} catch (IOException ioe) {
LOGGER.error("Failed to create a new GobblinHelixTask", ioe);
throw Throwables.propagate(ioe);
Expand Down
Expand Up @@ -178,7 +178,7 @@ public GobblinTaskRunner(String applicationName, String helixInstanceName, Strin
Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap();
taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME,
new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, taskStateTracker, this.fs, appWorkDir,
stateStoreJobConfig));
stateStoreJobConfig, this.helixManager));
this.taskStateModelFactory = new TaskStateModelFactory(this.helixManager, taskFactoryMap);
this.helixManager.getStateMachineEngine().registerStateModelFactory("Task", this.taskStateModelFactory);
}
Expand Down
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.cluster;

import java.util.Iterator;

import com.google.common.base.Optional;

import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateTracker;
import org.apache.gobblin.source.workunit.WorkUnit;


/**
 * Builds {@link GobblinMultiTaskAttempt} instances from a fixed {@link TaskStateTracker} and
 * {@link TaskExecutor}, plus an optional container id and an optional task state store.
 *
 * <p>The container id and the task state store may be left unset (or set to {@code null}); in that case
 * the corresponding value is passed to the attempt as an absent {@link Optional}.
 */
public class TaskAttemptBuilder {
  private final TaskStateTracker taskStateTracker;
  private final TaskExecutor taskExecutor;
  private String containerId;
  private StateStore<TaskState> taskStateStore;

  /**
   * @param taskStateTracker the tracker passed to every attempt created by this builder
   * @param taskExecutor the executor passed to every attempt created by this builder
   */
  public TaskAttemptBuilder(TaskStateTracker taskStateTracker, TaskExecutor taskExecutor) {
    this.taskStateTracker = taskStateTracker;
    this.taskExecutor = taskExecutor;
  }

  /**
   * Sets the container id passed to attempts built afterwards.
   *
   * @param containerId the container id; may be {@code null}, which is treated as absent
   * @return this builder
   */
  public TaskAttemptBuilder setContainerId(String containerId) {
    this.containerId = containerId;
    return this;
  }

  /**
   * Sets the task state store passed to attempts built afterwards.
   *
   * @param taskStateStore the task state store; may be {@code null}, which is treated as absent
   * @return this builder
   */
  public TaskAttemptBuilder setTaskStateStore(StateStore<TaskState> taskStateStore) {
    this.taskStateStore = taskStateStore;
    return this;
  }

  /**
   * Creates a new {@link GobblinMultiTaskAttempt} from the given per-job arguments and the values
   * currently configured on this builder.
   *
   * @param workUnits the work units handed to the attempt
   * @param jobId the job id handed to the attempt
   * @param jobState the job state handed to the attempt
   * @param jobBroker the job-level broker handed to the attempt
   * @return a newly constructed attempt
   */
  public GobblinMultiTaskAttempt build(Iterator<WorkUnit> workUnits, String jobId, JobState jobState,
      SharedResourcesBroker<GobblinScopeTypes> jobBroker) {
    // Unset (null) values become absent Optionals rather than being passed through as null.
    Optional<String> containerIdOptional = Optional.fromNullable(this.containerId);
    Optional<StateStore<TaskState>> taskStateStoreOptional = Optional.fromNullable(this.taskStateStore);

    return new GobblinMultiTaskAttempt(workUnits, jobId, jobState, this.taskStateTracker, this.taskExecutor,
        containerIdOptional, taskStateStoreOptional, jobBroker);
  }
}
Expand Up @@ -146,7 +146,7 @@ public void testPrepareTask() throws IOException {

GobblinHelixTaskFactory gobblinHelixTaskFactory =
new GobblinHelixTaskFactory(Optional.<ContainerMetrics>absent(), this.taskExecutor, this.taskStateTracker,
this.localFs, this.appWorkDir, ConfigFactory.empty());
this.localFs, this.appWorkDir, ConfigFactory.empty(), this.helixManager);
this.gobblinHelixTask = (GobblinHelixTask) gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
}

Expand Down

0 comments on commit ba909f1

Please sign in to comment.