Skip to content
Permalink
Browse files
[GOBBLIN-988][GOBBLIN-897] Implement LocalFSJobStatusRetriever
Closes #2834 from Will-Lo/fs-jobstatus-monitor
  • Loading branch information
Will-Lo authored and htran1 committed Dec 20, 2019
1 parent 313faa5 commit dfef88e5a820f4153dfcacba4e8b5c4d463d31bf
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 33 deletions.
@@ -29,7 +29,7 @@ topologySpecFactory.localGobblinCluster.version="1"
topologySpecFactory.localGobblinCluster.uri="gobblinCluster"
topologySpecFactory.localGobblinCluster.specExecutorInstance.class="org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecExecutor"
topologySpecFactory.localGobblinCluster.specExecInstance.capabilities="source:dest"
topologySpecFactory.localGobblinCluster.gobblin.cluster.localSpecProducer.dir=${gobblin.service.work.dir}/jobs
topologySpecFactory.localGobblinCluster.localFsSpecProducer.dir=${gobblin.service.work.dir}/jobs

# Flow Catalog and Store
flowSpec.store.dir=${gobblin.service.work.dir}/flowSpecStore
@@ -40,12 +40,13 @@ gobblin.service.templateCatalogs.fullyQualifiedPath="file://"
# JobStatusMonitor
gobblin.service.jobStatusMonitor.enabled=false

# FsJobStatusRetriever
fsJobStatusRetriever.state.store.dir=${gobblin.service.work.dir}/state-store
# JobStatusRetriever
jobStatusRetriever.class="org.apache.gobblin.service.monitoring.LocalFsJobStatusRetriever"
localFsJobStatusRetriever.localFsSpecProducer.dir=${gobblin.service.work.dir}/jobs

# DagManager
gobblin.service.dagManager.enabled=true
gobblin.service.dagManager.jobStatusRetriever.class="org.apache.gobblin.service.monitoring.FsJobStatusRetriever"
gobblin.service.dagManager.jobStatusRetriever.class="org.apache.gobblin.service.monitoring.LocalFsJobStatusRetriever"
gobblin.service.dagManager.dagStateStoreClass="org.apache.gobblin.service.modules.orchestration.FSDagStateStore"
gobblin.service.dagManager.dagStateStoreDir=${gobblin.service.work.dir}/dagStateStoreDir

@@ -28,7 +28,7 @@ topologySpecFactory.localGobblinCluster.version="1"
topologySpecFactory.localGobblinCluster.uri="gobblinCluster"
topologySpecFactory.localGobblinCluster.specExecutorInstance.class="org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecExecutor"
topologySpecFactory.localGobblinCluster.specExecInstance.capabilities="source:dest"
topologySpecFactory.localGobblinCluster.gobblin.cluster.localSpecProducer.dir=${gobblin.service.work.dir}/jobs
topologySpecFactory.localGobblinCluster.localFsSpecProducer.dir=${gobblin.service.work.dir}/jobs

# Flow Catalog and Store
flowSpec.store.dir=${gobblin.service.work.dir}/flowSpecStore
@@ -39,12 +39,13 @@ gobblin.service.templateCatalogs.fullyQualifiedPath="file://"
# JobStatusMonitor
gobblin.service.jobStatusMonitor.enabled=false

# FsJobStatusRetriever
fsJobStatusRetriever.state.store.dir=${gobblin.service.work.dir}/state-store
# JobStatusRetriever
jobStatusRetriever.class="org.apache.gobblin.service.monitoring.LocalFsJobStatusRetriever"
localFsJobStatusRetriever.localFsSpecProducer.dir=${gobblin.service.work.dir}/jobs

# DagManager
gobblin.service.dagManager.enabled=true
gobblin.service.dagManager.jobStatusRetriever.class="org.apache.gobblin.service.monitoring.FsJobStatusRetriever"
gobblin.service.dagManager.jobStatusRetriever.class="org.apache.gobblin.service.monitoring.LocalFsJobStatusRetriever"
gobblin.service.dagManager.dagStateStoreClass="org.apache.gobblin.service.modules.orchestration.FSDagStateStore"
gobblin.service.dagManager.dagStateStoreDir=${gobblin.service.work.dir}/dagStateStoreDir

@@ -62,12 +63,3 @@ mysqlSpecStore.state.store.db.table="flow_spec_store"
mysqlSpecStore.state.store.db.url="jdbc:mysql://mysql.default.svc.cluster.local:3306/gaas_db"
mysqlSpecStore.state.store.db.user=${mysqlCredentials.user}
mysqlSpecStore.state.store.db.password=${mysqlCredentials.password}

# MySQL Job Status Retriever
jobStatusRetriever.class="org.apache.gobblin.service.monitoring.MysqlJobStatusRetriever"
mysqlJobStatusRetriever.state.store.db.table="gaas_job_status"

# Assuming default namespace. URL of the service takes the form of <service>.<namespace>.cluster.local
mysqlJobStatusRetriever.state.store.db.url="jdbc:mysql://mysql.default.svc.cluster.local:3306/gaas_db"
mysqlJobStatusRetriever.state.store.db.user=${mysqlCredentials.user}
mysqlJobStatusRetriever.state.store.db.password=${mysqlCredentials.password}
@@ -16,9 +16,6 @@
*/

package org.apache.gobblin.runtime.spec_executorInstance;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.typesafe.config.Config;
import java.io.File;
import java.io.FileOutputStream;
@@ -28,12 +25,12 @@
import java.util.Properties;
import java.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.ConfigUtils;


/**
@@ -42,7 +39,7 @@
@Slf4j
public class LocalFsSpecProducer implements SpecProducer<Spec> {
private String specProducerPath;
public static final String LOCAL_FS_PRODUCER_PATH_KEY = "gobblin.cluster.localSpecProducer.dir";
public static final String LOCAL_FS_PRODUCER_PATH_KEY = "localFsSpecProducer.dir";

public LocalFsSpecProducer(Config config) {
this.specProducerPath = config.getString(LOCAL_FS_PRODUCER_PATH_KEY);
@@ -72,9 +69,9 @@ public Future<?> updateSpec(Spec updatedSpec) {

private Future<?> writeSpec(Spec spec, SpecExecutor.Verb verb) {
if (spec instanceof JobSpec) {
URI specUri = spec.getUri();
// format the JobSpec to have file of <flowGroup>_<flowName>.job
String jobFileName = getJobFileName(specUri);
String flowExecutionId = ((JobSpec) spec).getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
String jobFileName = getJobFileName(spec.getUri(), flowExecutionId);
try (
FileOutputStream fStream = new FileOutputStream(this.specProducerPath + File.separatorChar + jobFileName);
) {
@@ -92,16 +89,26 @@ private Future<?> writeSpec(Spec spec, SpecExecutor.Verb verb) {

/** Delete a {@link Spec} being executed on {@link org.apache.gobblin.runtime.api.SpecExecutor}.
* @param deletedSpecURI
* @param headers*/
* @param headers
*/
@Override
public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
String jobFileName = getJobFileName(deletedSpecURI);
File file = new File(jobFileName);
if (file.delete()) {
log.info("Deleted spec: {}", jobFileName);
return new CompletedFuture<>(Boolean.TRUE, null);
String prefix = String.join("_", deletedSpecURI.getPath().split("/"));
// delete all of the jobs related to the spec
File dir = new File(this.specProducerPath);
File[] foundFiles = dir.listFiles((File file, String name) -> {
// only delete the jobs in progress
return name.startsWith(prefix) && name.endsWith(".job");
});

for (int i = 0; i < foundFiles.length; i++) {
Boolean didDelete = foundFiles[i].delete();
if (!didDelete) {
return new CompletedFuture<>(Boolean.TRUE, new RuntimeException(String.format("Failed to delete file with uri %s", deletedSpecURI)));
}
}
throw new RuntimeException(String.format("Failed to delete file with uri %s", deletedSpecURI));

return new CompletedFuture<>(Boolean.TRUE, null);
}

/** List all {@link Spec} being executed on {@link org.apache.gobblin.runtime.api.SpecExecutor}. */
@@ -110,9 +117,9 @@ public Future<? extends List<Spec>> listSpecs() {
throw new UnsupportedOperationException();
}

private String getJobFileName(URI specUri) {
public static String getJobFileName(URI specUri, String flowExecutionId) {
String[] uriTokens = specUri.getPath().split("/");
return String.join("_", uriTokens) + ".job";
return String.join("_", uriTokens) + "_" + flowExecutionId + ".job";
}

}
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.service.monitoring;

import com.google.common.base.Preconditions;

import com.google.common.collect.Iterators;
import com.typesafe.config.Config;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecProducer;
import org.apache.gobblin.service.ExecutionStatus;

import lombok.extern.slf4j.Slf4j;

/**
 * A job status retriever for jobs completed by a Gobblin Standalone instance running on the same machine.
 * Mainly used for sandboxing/testing.
 *
 * <p>No state store is used. Status is derived only from the job directory that {@link LocalFsSpecProducer}
 * writes to: a job is considered done when its job file carries the ".done" suffix, and in progress when the
 * plain job file exists.
 */
@Slf4j
public class LocalFsJobStatusRetriever extends JobStatusRetriever {

  public static final String CONF_PREFIX = "localFsJobStatusRetriever.";
  private static final String JOB_DONE_SUFFIX = ".done";
  // URIs always use "/", independent of the platform file separator
  private static final String URI_PATH_SEPARATOR = "/";

  private final String specProducerPath;

  /**
   * @param config must contain the job directory under {@link #CONF_PREFIX} +
   *               {@link LocalFsSpecProducer#LOCAL_FS_PRODUCER_PATH_KEY}
   */
  public LocalFsJobStatusRetriever(Config config) {
    this.specProducerPath = config.getString(CONF_PREFIX + LocalFsSpecProducer.LOCAL_FS_PRODUCER_PATH_KEY);
  }

  /**
   * Checks whether the job file for the given flow execution, with {@code suffix} appended, exists in the
   * job directory.
   *
   * @return {@code true} if the file exists; {@code false} if it does not or if the flow group/name cannot be
   *         turned into a URI (the failure is logged)
   */
  private boolean doesJobExist(String flowName, String flowGroup, long flowExecutionId, String suffix) {
    // Local FS has no monitor to update job state yet, so the presence of the job file is the only signal.
    try {
      // Build the URI path with "/" rather than File.separatorChar: a backslash is not legal in a URI and
      // getJobFileName splits the path on "/".
      URI specUri = new URI(URI_PATH_SEPARATOR + flowGroup + URI_PATH_SEPARATOR + flowName);
      String fileName = LocalFsSpecProducer.getJobFileName(specUri, String.valueOf(flowExecutionId)) + suffix;
      return new File(this.specProducerPath + File.separatorChar + fileName).exists();
    } catch (URISyntaxException e) {
      log.error("URISyntaxException occurred when retrieving job status for flow: {},{}", flowGroup, flowName, e);
      return false;
    }
  }

  /**
   * Returns the job statuses of a flow execution, using the flow name and group as the job name and group.
   */
  @Override
  public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId) {
    Preconditions.checkArgument(flowName != null, "FlowName cannot be null");
    Preconditions.checkArgument(flowGroup != null, "FlowGroup cannot be null");

    // For the FS use case, JobExecutionID == FlowExecutionID
    return getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, flowName, flowGroup);
  }

  /**
   * Returns at most one {@link JobStatus}: COMPLETE when the ".done" job file exists, PENDING when only the plain
   * job file exists, and an empty iterator when neither is present.
   */
  @Override
  public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId,
      String jobName, String jobGroup) {
    Preconditions.checkArgument(flowName != null, "flowName cannot be null");
    Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
    Preconditions.checkArgument(jobName != null, "jobName cannot be null");
    Preconditions.checkArgument(jobGroup != null, "jobGroup cannot be null");

    List<JobStatus> jobStatuses = new ArrayList<>(1);

    ExecutionStatus status;
    if (this.doesJobExist(flowName, flowGroup, flowExecutionId, JOB_DONE_SUFFIX)) {
      status = ExecutionStatus.COMPLETE;
    } else if (this.doesJobExist(flowName, flowGroup, flowExecutionId, "")) {
      status = ExecutionStatus.PENDING;
    } else {
      // no job file at all: nothing to report
      return jobStatuses.iterator();
    }

    jobStatuses.add(JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId)
        .jobName(jobName).jobGroup(jobGroup).jobExecutionId(flowExecutionId).eventName(status.name()).build());
    return jobStatuses.iterator();
  }

  /**
   * Intended to return the last <code>count</code> flow execution ids with the given flowName and flowGroup.
   *
   * <p>Not implemented yet: after validating its arguments this method always returns {@code null}.
   *
   * @param flowName name of the flow, must not be null
   * @param flowGroup group of the flow, must not be null
   * @param count number of execution ids requested, must be at least 1
   * @return currently always {@code null}
   */
  @Override
  public List<Long> getLatestExecutionIdsForFlow(String flowName, String flowGroup, int count) {
    Preconditions.checkArgument(flowName != null, "flowName cannot be null");
    Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
    Preconditions.checkArgument(count > 0, "Number of execution ids must be at least 1.");

    //TODO: implement this

    return null;
  }

  /**
   * @return always {@code null}; this job status retriever does not have a state store
   */
  public StateStore<State> getStateStore() {
    // this jobstatus retriever does not have a state store
    // only used in tests so this is okay
    return null;
  }
}

0 comments on commit dfef88e

Please sign in to comment.