2 changes: 1 addition & 1 deletion core/pom.xml
@@ -175,7 +175,7 @@
<artifactId>spring-kafka</artifactId>
</dependency>

<!--compileOnly 'org.projectlombok:lombok:1.18.2'-->
<!--compileOnly 'org.projectlombok:lombok:1.18.12'-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
core/src/main/java/feast/core/config/FeatureStreamConfig.java
@@ -21,6 +21,7 @@
import feast.core.util.KafkaSerialization;
import feast.proto.core.FeatureSetProto;
import feast.proto.core.IngestionJobProto;
import feast.proto.core.SourceProto;
import feast.proto.core.SourceProto.KafkaSourceConfig;
import feast.proto.core.SourceProto.SourceType;
import java.util.HashMap;
@@ -145,7 +146,12 @@ public Source getDefaultSource(FeastProperties feastProperties) {
.setBootstrapServers(bootstrapServers)
.setTopic(topicName)
.build();
return new Source(featureStreamType, sourceConfig, true);
SourceProto.Source source =
SourceProto.Source.newBuilder()
.setType(featureStreamType)
.setKafkaSourceConfig(sourceConfig)
.build();
return Source.fromProto(source, true);
default:
throw new RuntimeException("Unsupported source stream, only [KAFKA] is supported");
}
18 changes: 14 additions & 4 deletions core/src/main/java/feast/core/dao/JobRepository.java
@@ -18,18 +18,28 @@

import feast.core.model.FeatureSetJobStatus;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.proto.core.SourceProto;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

/** JPA repository supplying Job objects keyed by ID. */
@Repository
public interface JobRepository extends JpaRepository<Job, String> {
List<Job> findBySourceIdAndStoreNameOrderByLastUpdatedDesc(String sourceId, String storeName);
Optional<Job>
findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
SourceProto.SourceType sourceType,
String sourceConfig,
String storeName,
Collection<JobStatus> statuses);

// find jobs by feast store name
List<Job> findByStoreName(String storeName);
List<Job> findByStatus(JobStatus status);

// find jobs by featureset
List<Job> findByFeatureSetJobStatusesIn(List<FeatureSetJobStatus> featureSetsJobStatuses);

// find jobs by feast store name
List<Job> findByStoreName(String storeName);
}
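
For context, a minimal sketch of how a coordinator can use the new finder to resolve the single most recent non-terminated job per source/store pair. This is an assumption-laden illustration: JobStatus.getTerminalStates() and the getConfig()/getName() accessors are inferred from the rest of the diff, not confirmed by these hunks.

import feast.core.dao.JobRepository;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import feast.proto.core.SourceProto;
import java.util.Optional;

class JobLookupSketch {
  // Hypothetical lookup: the most recently updated job for this Kafka
  // source/store pair that has not already terminated.
  static Optional<Job> activeJob(JobRepository repo, Source source, Store store) {
    return repo
        .findFirstBySourceTypeAndSourceConfigAndStoreNameAndStatusNotInOrderByLastUpdatedDesc(
            SourceProto.SourceType.KAFKA,
            source.getConfig(),             // serialized source config string
            store.getName(),
            JobStatus.getTerminalStates()); // assumed helper: statuses to exclude
  }
}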
5 changes: 4 additions & 1 deletion core/src/main/java/feast/core/dao/SourceRepository.java
@@ -17,7 +17,10 @@
package feast.core.dao;

import feast.core.model.Source;
import feast.proto.core.SourceProto.SourceType;
import org.springframework.data.jpa.repository.JpaRepository;

/** JPA repository supplying Source objects keyed by id. */
public interface SourceRepository extends JpaRepository<Source, String> {}
public interface SourceRepository extends JpaRepository<Source, String> {
Source findFirstByTypeAndConfigOrderByIdAsc(SourceType type, String config);
}
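
The new finder supports deduplicating sources on write. A sketch of that pattern, assuming Source exposes getType() and getConfig() accessors matching the finder's parameters:

import feast.core.dao.SourceRepository;
import feast.core.model.Source;

class SourceDedupSketch {
  // Hypothetical write path: reuse an equivalent persisted source, if any,
  // instead of inserting a duplicate row.
  static Source getOrCreate(SourceRepository repo, Source candidate) {
    Source existing =
        repo.findFirstByTypeAndConfigOrderByIdAsc(candidate.getType(), candidate.getConfig());
    return existing != null ? existing : repo.save(candidate);
  }
}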
10 changes: 6 additions & 4 deletions core/src/main/java/feast/core/job/JobManager.java
@@ -29,7 +29,7 @@ public interface JobManager {
Runner getRunnerType();

/**
* Start an import job.
* Start an import job. Start should change the status of the Job from PENDING to RUNNING.
*
* @param job job to start
* @return Job
@@ -45,11 +45,13 @@ public interface JobManager {
Job updateJob(Job job);

/**
* Abort a job given runner-specific job ID.
* Abort a job given runner-specific job ID. Abort should change the status of the Job from
* RUNNING to ABORTING.
*
* @param extId runner specific job id.
* @param job to abort.
* @return The aborted Job
*/
void abortJob(String extId);
Job abortJob(Job job);

/**
* Restart a job. If the job is in a terminated state, this will simply start the job. Might cause data to
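
To make the revised lifecycle contract concrete, here is a toy implementation that only flips statuses. It is a sketch under assumptions: Runner.DIRECT and the restartJob/getJobStatus signatures are inferred from calls visible elsewhere in this diff, not from these hunks.

import feast.core.job.JobManager;
import feast.core.job.Runner;
import feast.core.model.Job;
import feast.core.model.JobStatus;

// Toy manager illustrating the revised contract; not one of Feast's real runners.
public class NoopJobManager implements JobManager {
  @Override
  public Runner getRunnerType() {
    return Runner.DIRECT; // assumed enum constant
  }

  @Override
  public Job startJob(Job job) {
    job.setExtId("noop-" + job.getId());
    job.setStatus(JobStatus.RUNNING); // PENDING -> RUNNING
    return job;
  }

  @Override
  public Job updateJob(Job job) {
    return startJob(abortJob(job)); // naive update: drain, then resubmit
  }

  @Override
  public Job abortJob(Job job) {
    job.setStatus(JobStatus.ABORTING); // RUNNING -> ABORTING
    return job;
  }

  @Override
  public Job restartJob(Job job) {
    return startJob(job); // assumed signature
  }

  @Override
  public JobStatus getJobStatus(Job job) {
    return job.getStatus();
  }
}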
90 changes: 63 additions & 27 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
@@ -23,8 +23,10 @@
import feast.core.model.*;
import feast.proto.core.FeatureSetProto;
import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -36,21 +38,30 @@
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;

/**
* JobUpdateTask is a callable that starts or updates a job given a set of featureSetSpecs, as well
* as their source and sink.
as their source and sink, to transition the job to targetStatus.
*
* <p>When complete, the JobUpdateTask returns the updated Job object to be pushed to the db.
*/
@Slf4j
@Getter
public class JobUpdateTask implements Callable<Job> {

/**
* JobTargetStatus enum defines the possible target statuses that JobUpdateTask can transition a
* Job to.
*/
public enum JobTargetStatus {
RUNNING,
ABORTED
}

private final List<FeatureSet> featureSets;
private final Source source;
private final Store store;
private final JobTargetStatus targetStatus;
private final Optional<Job> currentJob;
private final JobManager jobManager;
private final long jobUpdateTimeoutSeconds;
@@ -62,38 +73,44 @@ public JobUpdateTask(
Store store,
Optional<Job> currentJob,
JobManager jobManager,
long jobUpdateTimeoutSeconds) {

long jobUpdateTimeoutSeconds,
JobTargetStatus targetStatus) {
this.featureSets = featureSets;
this.source = source;
this.store = store;
this.currentJob = currentJob;
this.jobManager = jobManager;
this.jobUpdateTimeoutSeconds = jobUpdateTimeoutSeconds;
this.runnerName = jobManager.getRunnerType().toString();
this.targetStatus = targetStatus;
}

@Override
public Job call() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Job> submittedJob;

if (currentJob.isEmpty()) {
if (this.targetStatus.equals(JobTargetStatus.RUNNING) && currentJob.isEmpty()) {
submittedJob = executorService.submit(this::createJob);
} else if (this.targetStatus.equals(JobTargetStatus.RUNNING)
&& currentJob.isPresent()
&& requiresUpdate(currentJob.get())) {
submittedJob = executorService.submit(() -> updateJob(currentJob.get()));
} else if (this.targetStatus.equals(JobTargetStatus.ABORTED)
&& currentJob.isPresent()
&& currentJob.get().getStatus() == JobStatus.RUNNING) {
submittedJob = executorService.submit(() -> stopJob(currentJob.get()));
} else if (this.targetStatus.equals(JobTargetStatus.ABORTED) && currentJob.isEmpty()) {
throw new IllegalArgumentException("Cannot abort a nonexistent ingestion job.");
} else {
Job job = currentJob.get();

if (requiresUpdate(job)) {
submittedJob = executorService.submit(() -> updateJob(job));
} else {
return updateStatus(job);
}
return this.updateStatus(currentJob.get());
}

try {
return submittedJob.get(getJobUpdateTimeoutSeconds(), TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn("Unable to start job for source {} and sink {}: {}", source, store, e.getMessage());
log.warn("Unable to start job for source {} and sink {}:", source, store);
e.printStackTrace();
return null;
} finally {
executorService.shutdownNow();
Expand All @@ -111,24 +128,38 @@ boolean requiresUpdate(Job job) {
}

private Job createJob() {
String jobId = createJobId(source.getId(), store.getName());
String jobId = createJobId(source, store.getName());
return startJob(jobId);
}

/** Start or update the job to ingest data to the sink. */
private Job startJob(String jobId) {
Job job = new Job();
job.setId(jobId);
job.setRunner(jobManager.getRunnerType());
job.setSource(source);
job.setStore(store);
job.setStatus(JobStatus.PENDING);
Job job =
Job.builder()
.setId(jobId)
.setRunner(jobManager.getRunnerType())
.setSource(source)
.setStore(store)
.setStatus(JobStatus.PENDING)
.setFeatureSetJobStatuses(new HashSet<>())
.build();

updateFeatureSets(job);

try {
logAudit(Action.SUBMIT, job, "Building graph and submitting to %s", runnerName);

System.out.println(
job.equals(
Job.builder()
.setId("job")
.setExtId("")
.setRunner(Runner.DATAFLOW)
.setSource(source)
.setStore(store)
.setStatus(JobStatus.PENDING)
.build()));

job = jobManager.startJob(job);
var extId = job.getExtId();
if (extId.isEmpty()) {
@@ -182,6 +213,12 @@ private Job updateJob(Job job) {
return jobManager.updateJob(job);
}

/** Stop the given job */
private Job stopJob(Job job) {
logAudit(Action.ABORT, job, "Aborting job %s for runner %s", job.getId(), runnerName);
return jobManager.abortJob(job);
}

private Job updateStatus(Job job) {
JobStatus currentStatus = job.getStatus();
JobStatus newStatus = jobManager.getJobStatus(job);
@@ -195,14 +232,13 @@ private Job updateStatus(Job job) {
return job;
}

String createJobId(String sourceId, String storeName) {
String createJobId(Source source, String storeName) {
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
String[] sourceParts = sourceId.split("/", 2);
String sourceType = sourceParts[0].toLowerCase();
String sourceHash =
Hashing.murmur3_128().hashUnencodedChars(sourceParts[1]).toString().substring(0, 10);
String jobId = String.format("%s-%s-to-%s-%s", sourceType, sourceHash, storeName, dateSuffix);
return jobId.replaceAll("_", "-");
String jobId =
String.format(
"%s-%d-to-%s-%s",
source.getTypeString(), Objects.hashCode(source.getConfig()), storeName, dateSuffix);
return jobId.replaceAll("_store", "-");
}

private void logAudit(Action action, Job job, String detail, Object... args) {
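
For orientation, a hedged sketch of how a coordinator might drive the reworked task in both directions; the wrapper class, the variables, and the 60-second timeout are illustrative only.

import feast.core.job.JobManager;
import feast.core.job.JobUpdateTask;
import feast.core.job.JobUpdateTask.JobTargetStatus;
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.Source;
import feast.core.model.Store;
import java.util.List;
import java.util.Optional;

class CoordinatorSketch {
  // Hypothetical driver: bring a job up for a (source, store) pair, then
  // hand the same machinery the RUNNING job and ask for it to be aborted.
  static void startThenAbort(
      List<FeatureSet> featureSets, Source source, Store store, JobManager manager) {
    Job started =
        new JobUpdateTask(
                featureSets, source, store, Optional.empty(), manager, 60L,
                JobTargetStatus.RUNNING)
            .call();
    Job aborted =
        new JobUpdateTask(
                featureSets, source, store, Optional.of(started), manager, 60L,
                JobTargetStatus.ABORTED)
            .call();
  }
}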
18 changes: 12 additions & 6 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
@@ -116,6 +116,7 @@ public Job startJob(Job job) {
submitDataflowJob(
job.getId(), job.getSource().toProto(), job.getStore().toProto(), false);
job.setExtId(extId);
job.setStatus(JobStatus.RUNNING);
return job;

} catch (InvalidProtocolBufferException e) {
@@ -136,25 +137,27 @@ public Job startJob(Job job) {
*/
@Override
public Job updateJob(Job job) {
abortJob(job.getExtId());
abortJob(job);
return job;
}

/**
* Abort an existing Dataflow job. Streaming Dataflow jobs are always drained, not cancelled.
*
* @param dataflowJobId Dataflow-specific job id (not the job name)
* @param job to abort.
* @return The aborted Job.
*/
@Override
public void abortJob(String dataflowJobId) {
public Job abortJob(Job job) {
String dataflowJobId = job.getExtId();
try {
com.google.api.services.dataflow.model.Job job =
com.google.api.services.dataflow.model.Job dataflowJob =
dataflow.projects().locations().jobs().get(projectId, location, dataflowJobId).execute();
com.google.api.services.dataflow.model.Job content =
new com.google.api.services.dataflow.model.Job();
if (job.getType().equals(DataflowJobType.JOB_TYPE_BATCH.toString())) {
if (dataflowJob.getType().equals(DataflowJobType.JOB_TYPE_BATCH.toString())) {
content.setRequestedState(DataflowJobState.JOB_STATE_CANCELLED.toString());
} else if (job.getType().equals(DataflowJobType.JOB_TYPE_STREAMING.toString())) {
} else if (dataflowJob.getType().equals(DataflowJobType.JOB_TYPE_STREAMING.toString())) {
content.setRequestedState(DataflowJobState.JOB_STATE_DRAINING.toString());
}
dataflow
@@ -168,6 +171,9 @@ public void abortJob(String dataflowJobId) {
throw new RuntimeException(
Strings.lenientFormat("Unable to drain job with id: %s", dataflowJobId), e);
}

job.setStatus(JobStatus.ABORTING);
return job;
}

/**
core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java
@@ -121,30 +121,32 @@ private ImportOptions getPipelineOptions(
*/
@Override
public Job updateJob(Job job) {
String jobId = job.getExtId();
abortJob(jobId);
try {
return startJob(job);
return startJob(abortJob(job));
} catch (JobExecutionException e) {
throw new JobExecutionException(String.format("Error running ingestion job: %s", e), e);
}
}

/**
* Abort the direct runner job with the given id, then remove it from the direct jobs registry.
* Abort the direct runner job, removing it from the direct jobs registry.
*
* @param extId runner specific job id.
* @param job to abort.
* @return The aborted Job
*/
@Override
public void abortJob(String extId) {
DirectJob job = jobs.get(extId);
public Job abortJob(Job job) {
DirectJob directJob = jobs.get(job.getExtId());
try {
job.abort();
directJob.abort();
} catch (IOException e) {
throw new RuntimeException(
Strings.lenientFormat("Unable to abort DirectRunner job %s", extId), e);
Strings.lenientFormat("Unable to abort DirectRunner job %s", job.getExtId(), e));
}
jobs.remove(extId);
jobs.remove(job.getExtId());

job.setStatus(JobStatus.ABORTING);
return job;
}

public PipelineResult runPipeline(ImportOptions pipelineOptions) throws IOException {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/model/FeatureSet.java
@@ -30,7 +30,7 @@
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.springframework.data.util.Pair;
import org.apache.commons.lang3.tuple.Pair;
import org.tensorflow.metadata.v0.*;

@Getter