Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GOBBLIN-1052: Create a spec consumer path if it does not exist in FS … #2892

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,15 +17,14 @@
package org.apache.gobblin.cluster;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileSystem;

import com.google.common.base.Optional;
import com.google.common.eventbus.EventBus;
Expand All @@ -38,7 +37,6 @@
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.api.SpecConsumer;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;

Expand All @@ -55,88 +53,73 @@ public class FsJobConfigurationManager extends JobConfigurationManager {
private static final long DEFAULT_JOB_SPEC_REFRESH_INTERVAL = 60;

private final long refreshIntervalInSeconds;

private final ScheduledExecutorService fetchJobSpecExecutor;
private final Optional<MutableJobCatalog> jobCatalogOptional;
private final SpecConsumer specConsumer;

protected final SpecConsumer _specConsumer;

private final ClassAliasResolver<SpecConsumer> aliasResolver;

private final Optional<MutableJobCatalog> _jobCatalogOptional;

public FsJobConfigurationManager(EventBus eventBus, Config config) {
this(eventBus, config, null);
public FsJobConfigurationManager(EventBus eventBus, Config config, FileSystem fs) {
this(eventBus, config, null, fs);
}

public FsJobConfigurationManager(EventBus eventBus, Config config, MutableJobCatalog jobCatalog) {
public FsJobConfigurationManager(EventBus eventBus, Config config, MutableJobCatalog jobCatalog, FileSystem fs) {
super(eventBus, config);
this._jobCatalogOptional = jobCatalog != null ? Optional.of(jobCatalog) : Optional.absent();
this.jobCatalogOptional = jobCatalog != null ? Optional.of(jobCatalog) : Optional.absent();
this.refreshIntervalInSeconds = ConfigUtils.getLong(config, GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL,
DEFAULT_JOB_SPEC_REFRESH_INTERVAL);

this.fetchJobSpecExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchJobSpecExecutor")));

this.aliasResolver = new ClassAliasResolver<>(SpecConsumer.class);
try {
String specConsumerClassName = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY,
GobblinClusterConfigurationKeys.DEFAULT_SPEC_CONSUMER_CLASS);
log.info("Using SpecConsumer ClassNameclass name/alias " + specConsumerClassName);
this._specConsumer = (SpecConsumer) ConstructorUtils
.invokeConstructor(Class.forName(this.aliasResolver.resolve(specConsumerClassName)), config);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException
| ClassNotFoundException e) {
throw new RuntimeException(e);
}
this.specConsumer = new FsSpecConsumer(fs, config);
}

protected void startUp() throws Exception{
protected void startUp() throws Exception {
super.startUp();
// Schedule the job config fetch task
this.fetchJobSpecExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
fetchJobSpecs();
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to fetch job specs", e);
throw new RuntimeException("Failed to fetch specs", e);
} catch (Exception e) {
//Log error and swallow exception to allow executor service to continue scheduling the thread
log.error("Failed to fetch job specs due to: ", e);
}
}
}, 0, this.refreshIntervalInSeconds, TimeUnit.SECONDS);
}

void fetchJobSpecs() throws ExecutionException, InterruptedException {
List<Pair<SpecExecutor.Verb, JobSpec>> jobSpecs =
(List<Pair<SpecExecutor.Verb, JobSpec>>) this._specConsumer.changedSpecs().get();
(List<Pair<SpecExecutor.Verb, JobSpec>>) this.specConsumer.changedSpecs().get();

log.info("Fetched {} job specs", jobSpecs.size());
for (Pair<SpecExecutor.Verb, JobSpec> entry : jobSpecs) {
JobSpec jobSpec = entry.getValue();
SpecExecutor.Verb verb = entry.getKey();
if (verb.equals(SpecExecutor.Verb.ADD)) {
// Handle addition
if (this._jobCatalogOptional.isPresent()) {
this._jobCatalogOptional.get().put(jobSpec);
if (this.jobCatalogOptional.isPresent()) {
this.jobCatalogOptional.get().put(jobSpec);
}
postNewJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
} else if (verb.equals(SpecExecutor.Verb.UPDATE)) {
//Handle update.
if (this._jobCatalogOptional.isPresent()) {
this._jobCatalogOptional.get().put(jobSpec);
if (this.jobCatalogOptional.isPresent()) {
this.jobCatalogOptional.get().put(jobSpec);
}
postUpdateJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
} else if (verb.equals(SpecExecutor.Verb.DELETE)) {
// Handle delete
if (this._jobCatalogOptional.isPresent()) {
this._jobCatalogOptional.get().remove(jobSpec.getUri());
if (this.jobCatalogOptional.isPresent()) {
this.jobCatalogOptional.get().remove(jobSpec.getUri());
}
postDeleteJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties());
}

try {
//Acknowledge the successful consumption of the JobSpec back to the SpecConsumer, so that the
//SpecConsumer can delete the JobSpec.
this._specConsumer.commit(jobSpec);
this.specConsumer.commit(jobSpec);
} catch (IOException e) {
log.error("Error when committing to FsSpecConsumer: ", e);
}
Expand Down
Expand Up @@ -157,7 +157,7 @@ public class GobblinClusterConfigurationKeys {
public static final long DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = 300;

public static final String HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "workflowListingTimeoutSeconds";
public static final long DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS = 300;
public static final long DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS = 60;

public static final String CLEAN_ALL_DIST_JOBS = GOBBLIN_CLUSTER_PREFIX + "bootup.clean.dist.jobs";
public static final boolean DEFAULT_CLEAN_ALL_DIST_JOBS = false;
Expand Down
Expand Up @@ -18,7 +18,6 @@
package org.apache.gobblin.cluster;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -396,16 +395,15 @@ private JobConfigurationManager buildJobConfigurationManager(Config config) {

private JobConfigurationManager create(Config config) {
try {
List<Object> argumentList = (this.jobCatalog != null)? ImmutableList.of(this.eventBus, config, this.jobCatalog) :
ImmutableList.of(this.eventBus, config);
List<Object> argumentList = (this.jobCatalog != null)? ImmutableList.of(this.eventBus, config, this.jobCatalog, this.fs) :
ImmutableList.of(this.eventBus, config, this.fs);
if (config.hasPath(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY)) {
return (JobConfigurationManager) GobblinConstructorUtils.invokeFirstConstructor(Class.forName(
config.getString(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY)), argumentList);
return (JobConfigurationManager) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(
config.getString(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY)), argumentList.toArray(new Object[argumentList.size()]));
} else {
return new JobConfigurationManager(this.eventBus, config);
}
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException |
ClassNotFoundException e) {
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}
Expand Down
Expand Up @@ -377,7 +377,7 @@ private void cancelJobIfRequired(DeleteJobConfigArrivalEvent deleteJobArrival) t
Collections.singletonList(deleteJobArrival.getJobName()));
Retryer<Map<String, String>> retryer = RetryerBuilder.<Map<String, String>>newBuilder()
.retryIfException()
.withStopStrategy(StopStrategies.stopAfterAttempt(1))
.withStopStrategy(StopStrategies.stopAfterAttempt(5))
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(this.helixWorkflowListingTimeoutMillis, TimeUnit.MILLISECONDS)).build();
Map<String, String> jobNameToWorkflowIdMap;
try {
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskState;
Expand Down Expand Up @@ -277,7 +278,7 @@ private static void deleteStoppedHelixJob(HelixManager helixManager, String work
}

/**
* Returns the Helix Workflow Ids given {@link Iterable} of Gobblin job names. The method returns a
* Returns the currently running Helix Workflow Ids given an {@link Iterable} of Gobblin job names. The method returns a
* {@link java.util.Map} from Gobblin job name to the corresponding Helix Workflow Id. This method iterates
* over all Helix workflows, and obtains the jobs of each workflow from its jobDag.
*
Expand All @@ -293,6 +294,10 @@ public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager helixM
Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows();
for (String workflow : workflowConfigMap.keySet()) {
WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflow);
//Filter out any stale Helix workflows which are not running.
if (workflowConfig.getTargetState() != TargetState.START) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why filter out stopped jobs if the purpose of the method is to get a list of workflow IDs from Gobblin job names?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this comment. The purpose of this method is two-fold: 1. The caller may want to cancel a Helix workflow given a job name, and 2. The caller may want to get the workflow id and use that to query other information, such as the workunits processed by the job. For both 1 and 2, the caller is only interested in the active workflows. In fact, the stale workflows are auto-purged by Helix based on an expiry interval.

I have modified the javadoc for this method to be more explicit about which workflows are returned.

continue;
}
Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
for (String helixJob : helixJobs) {
Iterator<TaskConfig> taskConfigIterator = taskDriver.getJobConfig(helixJob).getTaskConfigMap().values().iterator();
Expand Down
Expand Up @@ -32,6 +32,7 @@

import com.google.common.eventbus.EventBus;
import com.google.common.io.Files;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
Expand All @@ -50,6 +51,8 @@
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
import org.apache.gobblin.util.filters.HiddenFilter;


@Slf4j
public class FsJobConfigurationManagerTest {
Expand Down Expand Up @@ -96,16 +99,15 @@ public void setUp() throws IOException {

Config config = ConfigFactory.empty()
.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, ConfigValueFactory.fromAnyRef(jobConfDir))
.withValue(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY, ConfigValueFactory.fromAnyRef(FsSpecConsumer.class.getName()))
.withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(fsSpecConsumerPathString))
.withValue(GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL, ConfigValueFactory.fromAnyRef(1));

this._jobCatalog = new NonObservingFSJobCatalog(config);
((NonObservingFSJobCatalog) this._jobCatalog).startAsync().awaitRunning();

jobConfigurationManager = new FsJobConfigurationManager(eventBus, config, this._jobCatalog);
jobConfigurationManager = new FsJobConfigurationManager(eventBus, config, this._jobCatalog, this.fs);

_specProducer = new FsSpecProducer(config);
_specProducer = new FsSpecProducer(this.fs, config);
}

private void addJobSpec(String jobSpecName, String version, String verb)
Expand Down Expand Up @@ -149,7 +151,7 @@ public void testFetchJobSpecs() throws ExecutionException, InterruptedException,

//Ensure the JobSpec is deleted from the FsSpecConsumer path.
Path fsSpecConsumerPath = new Path(fsSpecConsumerPathString);
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath, new HiddenFilter()).length, 0);

//Ensure NewJobConfigArrivalEvent is posted to EventBus
Assert.assertEquals(newJobConfigArrivalEventCount, 1);
Expand All @@ -166,7 +168,7 @@ public void testFetchJobSpecs() throws ExecutionException, InterruptedException,
Assert.assertTrue(jobSpec.getVersion().equals(version2));

//Ensure the updated JobSpec is deleted from the FsSpecConsumer path.
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath, new HiddenFilter()).length, 0);

//Ensure UpdateJobConfigArrivalEvent is posted to EventBus
Assert.assertEquals(newJobConfigArrivalEventCount, 1);
Expand All @@ -179,7 +181,7 @@ public void testFetchJobSpecs() throws ExecutionException, InterruptedException,
this.jobConfigurationManager.fetchJobSpecs();

//Ensure the JobSpec is deleted from the FsSpecConsumer path.
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath, new HiddenFilter()).length, 0);
this._jobCatalog.getJobSpec(new URI(jobSpecUriString));

//Ensure DeleteJobConfigArrivalEvent is posted to EventBus
Expand All @@ -198,13 +200,14 @@ public void testException()

//Add wait to ensure that fetchJobSpecExecutor thread is scheduled at least once.
Thread.sleep(2000);
Mockito.verify(jobConfigurationManager, Mockito.times(1)).fetchJobSpecs();
int numInvocations = Mockito.mockingDetails(jobConfigurationManager).getInvocations().size();
Mockito.verify(jobConfigurationManager, Mockito.atLeast(1)).fetchJobSpecs();

Thread.sleep(2000);
//Verify that there are no new invocations of fetchJobSpecs()
Mockito.verify(jobConfigurationManager, Mockito.times(1)).fetchJobSpecs();
//Ensure that the JobConfigurationManager Service is not running.
Assert.assertFalse(jobConfigurationManager.isRunning());
//Verify that there are new invocations of fetchJobSpecs()
Mockito.verify(jobConfigurationManager, Mockito.atLeast(numInvocations + 1)).fetchJobSpecs();
//Ensure that the JobConfigurationManager Service is running.
Assert.assertTrue(!jobConfigurationManager.state().equals(Service.State.FAILED) && !jobConfigurationManager.state().equals(Service.State.TERMINATED));
}

@AfterClass
Expand Down
Expand Up @@ -24,6 +24,9 @@
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import com.google.common.io.Resources;
Expand Down Expand Up @@ -52,7 +55,8 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite

public IntegrationJobRestartViaSpecSuite() throws IOException {
super();
this._specProducer = new FsSpecProducer(ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR)));
FileSystem fs = FileSystem.getLocal(new Configuration());
this._specProducer = new FsSpecProducer(fs, ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR)));
}

private Config getJobConfig() throws IOException {
Expand Down Expand Up @@ -80,7 +84,6 @@ public Config getManagerConfig() {
Config managerConfig = super.getManagerConfig();
managerConfig = managerConfig.withValue(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY,
ConfigValueFactory.fromAnyRef(FsJobConfigurationManager.class.getName()))
.withValue(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY, ConfigValueFactory.fromAnyRef(FsSpecConsumer.class.getName()))
.withValue(GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL, ConfigValueFactory.fromAnyRef(1L))
.withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR));
return managerConfig;
Expand Down
Expand Up @@ -40,10 +40,13 @@

import com.typesafe.config.Config;

import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.filters.HiddenFilter;


@Slf4j
public class FsSpecConsumer implements SpecConsumer<Spec> {
Expand All @@ -56,9 +59,16 @@ public class FsSpecConsumer implements SpecConsumer<Spec> {


/**
 * Creates an {@link FsSpecConsumer} without an explicit {@link FileSystem}; delegates to the
 * two-arg constructor with a null FileSystem, which will then resolve a FileSystem on its own.
 * Kept for backward compatibility with callers that only have a {@link Config}.
 */
public FsSpecConsumer(Config config) {
this(null, config);
}

public FsSpecConsumer(@Nullable FileSystem fs, Config config) {
this.specDirPath = new Path(config.getString(SPEC_PATH_KEY));
try {
this.fs = this.specDirPath.getFileSystem(new Configuration());
this.fs = (fs == null) ? FileSystem.get(new Configuration()) : fs;
if (!this.fs.exists(specDirPath)) {
this.fs.mkdirs(specDirPath);
}
} catch (IOException e) {
throw new RuntimeException("Unable to detect spec directory file system: " + e, e);
}
Expand All @@ -72,11 +82,12 @@ public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
List<Pair<SpecExecutor.Verb, Spec>> specList = new ArrayList<>();
FileStatus[] fileStatuses;
try {
fileStatuses = this.fs.listStatus(this.specDirPath);
fileStatuses = this.fs.listStatus(this.specDirPath, new HiddenFilter());
} catch (IOException e) {
log.error("Error when listing files at path: {}", this.specDirPath.toString(), e);
return null;
}
log.info("Found {} files at path {}", fileStatuses.length, this.specDirPath.toString());

//Sort the {@link JobSpec}s in increasing order of their modification times.
//This is done so that the {JobSpec}s can be handled in FIFO order by the
Expand Down Expand Up @@ -118,6 +129,7 @@ public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);

JobSpec jobSpec = jobSpecBuilder.build();
log.debug("Successfully built jobspec: {}", jobSpec.getUri().toString());
specList.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb, jobSpec));
this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath());
}
Expand All @@ -130,7 +142,10 @@ public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() {
/**
 * Acknowledges successful consumption of the given {@link Spec} by deleting its backing
 * file from the spec directory, so the same spec is not redelivered on the next poll.
 * If the spec's URI was never recorded in {@code specToPathMap} (e.g. commit called for a
 * spec not returned by changedSpecs()), an error is logged and nothing is deleted.
 */
public void commit(Spec spec) throws IOException {
  Path specFilePath = this.specToPathMap.get(spec.getUri());
  if (specFilePath == null) {
    log.error("No path found for job: {}", spec.getUri().toString());
    return;
  }
  log.debug("Calling delete on path: {}", specFilePath.toString());
  // Non-recursive delete: each spec is stored as a single file.
  this.fs.delete(specFilePath, false);
}
}