[GOBBLIN-336] Encapsulate the non-Helix-specific task execution logic

Put the logic in its own class, SingleHelixTask.

Also changed:
* Use a try-with-resources statement to close the global
broker (see the sketch after this list).
* Fix a Helix warning: the TaskResult status ERROR is
replaced with FAILED.
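
A minimal, self-contained sketch of these two changes, using a stand-in Closeable resource and a stand-in status enum rather than the real Gobblin broker and Helix TaskResult (both stand-ins are assumptions for illustration; the committed code is in the diff below):

    import java.io.Closeable;

    public class TryWithBrokerSketch {

      // Stand-in for the shared-resources broker; the real broker is closeable.
      static class Broker implements Closeable {
        @Override
        public void close() {
          System.out.println("broker closed");
        }
      }

      // Stand-in for the Helix task status; the commit returns FAILED instead of ERROR.
      enum Status { COMPLETED, FAILED }

      static Status runTask() {
        // try-with-resources closes the broker on every exit path,
        // replacing the old explicit null check in a finally block.
        try (Broker broker = new Broker()) {
          // ... run the task attempt against the broker ...
          return Status.COMPLETED;
        } catch (Throwable t) {
          return Status.FAILED;
        }
      }

      public static void main(String[] args) {
        System.out.println(runTask());
      }
    }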

Testing:

The integration test
org.apache.gobblin.cluster.ClusterIntegrationTest
passed.

Also inlined a method. The old code had a bug: the globalBroker
variable stayed null, so the broker was never closed by the
null-checked close in the finally block (see the sketch below).
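
One plausible shape of that bug, as a hedged illustration rather than the exact removed code: in Java, assigning to a method parameter only rebinds the method's own copy of the reference, so a helper that "creates" the broker this way leaves the caller's local null and the guarded close is skipped.

    import java.io.Closeable;

    public class NullBrokerSketch {

      static class Broker implements Closeable {
        @Override
        public void close() {
          System.out.println("broker closed");
        }
      }

      // Hypothetical helper: the assignment rebinds only this method's copy
      // of the reference, so the caller's variable is untouched.
      static void createBroker(Broker broker) {
        broker = new Broker();
      }

      public static void main(String[] args) {
        Broker globalBroker = null;
        try {
          createBroker(globalBroker);
          // ... code that was supposed to use the broker ...
        } finally {
          // globalBroker is still null here, so this close is skipped and the
          // broker leaks; inlining the helper (or try-with-resources, as in
          // this commit) avoids the problem.
          if (globalBroker != null) {
            globalBroker.close();
          }
        }
      }
    }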

Closes apache#2193 from HappyRay/encapsulate-non-helix-job-launch-logic
HappyRay authored and autumnust committed Jan 3, 2018
1 parent 889a77b commit bc973ec
Showing 2 changed files with 175 additions and 87 deletions.
GobblinHelixTask.java
@@ -18,7 +18,7 @@
package org.apache.gobblin.cluster;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -31,26 +31,16 @@
import org.slf4j.MDC;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.typesafe.config.ConfigFactory;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.SerializationUtils;


/**
@@ -67,113 +57,63 @@
* {@link org.apache.gobblin.runtime.Task}(s), it persists the {@link TaskState} of each {@link org.apache.gobblin.runtime.Task} to
* a file that will be collected by the {@link GobblinHelixJobLauncher} later upon completion of the job.
* </p>
*
* @author Yinan Li
*/
@Alpha
public class GobblinHelixTask implements Task {

private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixTask.class);
private static final Logger _logger = LoggerFactory.getLogger(GobblinHelixTask.class);

private final TaskConfig taskConfig;
// An empty JobState instance that will be filled with values read from the serialized JobState
private final JobState jobState = new JobState();
private final String jobName;
private final String jobId;
private final String jobKey;

private final FileSystem fs;
private final StateStores stateStores;
private final TaskAttemptBuilder taskAttemptBuilder;
private String jobName;
private String jobId;
private String jobKey;
private Path workUnitFilePath;

private GobblinMultiTaskAttempt taskAttempt;
private SingleHelixTask task;

public GobblinHelixTask(TaskCallbackContext taskCallbackContext, FileSystem fs, Path appWorkDir,
TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores)
throws IOException {

this.taskConfig = taskCallbackContext.getTaskConfig();
this.stateStores = stateStores;
this.taskAttemptBuilder = taskAttemptBuilder;
this.jobName = this.taskConfig.getConfigMap().get(ConfigurationKeys.JOB_NAME_KEY);
this.jobId = this.taskConfig.getConfigMap().get(ConfigurationKeys.JOB_ID_KEY);
this.jobKey = Long.toString(Id.parse(this.jobId).getSequence());
getInfoFromTaskConfig();

Path jobStateFilePath = constructJobStateFilePath(appWorkDir);

this.fs = fs;
this.task =
new SingleHelixTask(this.jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, stateStores);
}

Path jobStateFilePath = new Path(appWorkDir, this.jobId + "." + AbstractJobLauncher.JOB_STATE_FILE_NAME);
SerializationUtils.deserializeState(this.fs, jobStateFilePath, this.jobState);
private Path constructJobStateFilePath(Path appWorkDir) {
return new Path(appWorkDir, this.jobId + "." + AbstractJobLauncher.JOB_STATE_FILE_NAME);
}

private void getInfoFromTaskConfig() {
Map<String, String> configMap = this.taskConfig.getConfigMap();
this.jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY);
this.jobId = configMap.get(ConfigurationKeys.JOB_ID_KEY);
this.jobKey = Long.toString(Id.parse(this.jobId).getSequence());
this.workUnitFilePath = new Path(configMap.get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH));
}

@Override
public TaskResult run() {
SharedResourcesBroker<GobblinScopeTypes> globalBroker = null;
try (Closer closer = Closer.create()) {
closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY, this.jobName));
closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY, this.jobKey));
Path workUnitFilePath =
new Path(this.taskConfig.getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH));

String fileName = workUnitFilePath.getName();
String storeName = workUnitFilePath.getParent().getName();
WorkUnit workUnit;

if (workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) {
workUnit = stateStores.mwuStateStore.getAll(storeName, fileName).get(0);
} else {
workUnit = stateStores.wuStateStore.getAll(storeName, fileName).get(0);
}

// The list of individual WorkUnits (flattened) to run
List<WorkUnit> workUnits = Lists.newArrayList();

if (workUnit instanceof MultiWorkUnit) {
// Flatten the MultiWorkUnit so the job configuration properties can be added to each individual WorkUnits
List<WorkUnit> flattenedWorkUnits =
JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) workUnit).getWorkUnits());
workUnits.addAll(flattenedWorkUnits);
} else {
workUnits.add(workUnit);
}

globalBroker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
ConfigFactory.parseProperties(this.jobState.getProperties()), GobblinScopeTypes.GLOBAL.defaultScopeInstance());
SharedResourcesBroker<GobblinScopeTypes> jobBroker =
globalBroker.newSubscopedBuilder(new JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId())).build();

this.taskAttempt = this.taskAttemptBuilder.build(workUnits.iterator(), this.jobId, this.jobState, jobBroker);
this.taskAttempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
return new TaskResult(TaskResult.Status.COMPLETED, String.format("completed tasks: %d", workUnits.size()));
int workUnitSize = this.task.run();
return new TaskResult(TaskResult.Status.COMPLETED, String.format("completed tasks: %d", workUnitSize));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return new TaskResult(TaskResult.Status.CANCELED, "");
} catch (Throwable t) {
LOGGER.error("GobblinHelixTask failed due to " + t.getMessage(), t);
return new TaskResult(TaskResult.Status.ERROR, Throwables.getStackTraceAsString(t));
} finally {
if (globalBroker != null) {
try {
globalBroker.close();
} catch (IOException ioe) {
LOGGER.error("Could not close shared resources broker.", ioe);
}
}
_logger.error("GobblinHelixTask failed due to " + t.getMessage(), t);
return new TaskResult(TaskResult.Status.FAILED, Throwables.getStackTraceAsString(t));
}
}

@Override
public void cancel() {
if (this.taskAttempt != null) {
try {
LOGGER.info("Task cancelled: Shutdown starting for tasks with jobId: {}", this.jobId);
this.taskAttempt.shutdownTasks();
LOGGER.info("Task cancelled: Shutdown complete for tasks with jobId: {}", this.jobId);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while shutting down task with jobId: " + this.jobId, e);
}
} else {
LOGGER.error("Task cancelled but taskAttempt is null, so ignoring.");
}
this.task.cancel();
}
}
SingleHelixTask.java (new file)
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.cluster;

import java.io.IOException;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.util.StateStores;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.SerializationUtils;


public class SingleHelixTask {

private static final Logger _logger = LoggerFactory.getLogger(SingleHelixTask.class);

private GobblinMultiTaskAttempt _taskattempt;
private String _jobId;
private Path _workUnitFilePath;
private Path _jobStateFilePath;
private FileSystem _fs;
private TaskAttemptBuilder _taskAttemptBuilder;
private StateStores _stateStores;

SingleHelixTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs,
TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores) {
_jobId = jobId;
_workUnitFilePath = workUnitFilePath;
_jobStateFilePath = jobStateFilePath;
_fs = fs;
_taskAttemptBuilder = taskAttemptBuilder;
_stateStores = stateStores;
}

/**
*
* @return the number of work-units processed
* @throws IOException
* @throws InterruptedException
*/
public int run()
throws IOException, InterruptedException {
List<WorkUnit> workUnits = getWorkUnits();
int workUnitSize = workUnits.size();

JobState jobState = getJobState();
Config jobConfig = getConfigFromJobState(jobState);

try (SharedResourcesBroker<GobblinScopeTypes> globalBroker = SharedResourcesBrokerFactory
.createDefaultTopLevelBroker(jobConfig, GobblinScopeTypes.GLOBAL.defaultScopeInstance())) {
SharedResourcesBroker<GobblinScopeTypes> jobBroker = getJobBroker(jobState, globalBroker);

_taskattempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, jobState, jobBroker);
_taskattempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
return workUnitSize;
}
}

private SharedResourcesBroker<GobblinScopeTypes> getJobBroker(JobState jobState,
SharedResourcesBroker<GobblinScopeTypes> globalBroker) {
return globalBroker.newSubscopedBuilder(new JobScopeInstance(jobState.getJobName(), jobState.getJobId())).build();
}

private Config getConfigFromJobState(JobState jobState) {
Properties jobProperties = jobState.getProperties();
return ConfigFactory.parseProperties(jobProperties);
}

private JobState getJobState()
throws java.io.IOException {
JobState jobState = new JobState();
SerializationUtils.deserializeState(_fs, _jobStateFilePath, jobState);
return jobState;
}

private List<WorkUnit> getWorkUnits()
throws IOException {
String fileName = _workUnitFilePath.getName();
String storeName = _workUnitFilePath.getParent().getName();
WorkUnit workUnit;

if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) {
workUnit = _stateStores.mwuStateStore.getAll(storeName, fileName).get(0);
} else {
workUnit = _stateStores.wuStateStore.getAll(storeName, fileName).get(0);
}

// The list of individual WorkUnits (flattened) to run
List<WorkUnit> workUnits = Lists.newArrayList();

if (workUnit instanceof MultiWorkUnit) {
// Flatten the MultiWorkUnit so the job configuration properties can be added to each individual WorkUnits
List<WorkUnit> flattenedWorkUnits = JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) workUnit).getWorkUnits());
workUnits.addAll(flattenedWorkUnits);
} else {
workUnits.add(workUnit);
}
return workUnits;
}

public void cancel() {
if (_taskattempt != null) {
try {
_logger.info("Task cancelled: Shutdown starting for tasks with jobId: {}", _jobId);
_taskattempt.shutdownTasks();
_logger.info("Task cancelled: Shutdown complete for tasks with jobId: {}", _jobId);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while shutting down task with jobId: " + _jobId, e);
}
} else {
_logger.error("Task cancelled but _taskattempt is null, so ignoring.");
}
}
}
