diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index db9a7f90707..08b6bfbd01f 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -47,7 +47,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -120,12 +119,15 @@ public Job startJob(Job job) { for (FeatureSet featureSet : job.getFeatureSets()) { featureSetProtos.add(featureSet.toProto()); } - return submitDataflowJob( - job.getId(), - featureSetProtos, - job.getSource().toProto(), - job.getStore().toProto(), - false); + String extId = + submitDataflowJob( + job.getId(), + featureSetProtos, + job.getSource().toProto(), + job.getStore().toProto(), + false); + job.setExtId(extId); + return job; } catch (InvalidProtocolBufferException e) { log.error(e.getMessage()); @@ -150,8 +152,17 @@ public Job updateJob(Job job) { for (FeatureSet featureSet : job.getFeatureSets()) { featureSetProtos.add(featureSet.toProto()); } - return submitDataflowJob( - job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), true); + + String extId = + submitDataflowJob( + job.getId(), + featureSetProtos, + job.getSource().toProto(), + job.getStore().toProto(), + true); + + job.setExtId(extId); + return job; } catch (InvalidProtocolBufferException e) { log.error(e.getMessage()); throw new IllegalArgumentException( @@ -236,7 +247,7 @@ public JobStatus getJobStatus(Job job) { return JobStatus.UNKNOWN; } - private Job submitDataflowJob( + private String submitDataflowJob( String jobName, List featureSetProtos, SourceProto.Source source, @@ -245,17 +256,8 @@ private Job submitDataflowJob( try { ImportOptions pipelineOptions = getPipelineOptions(jobName, featureSetProtos, sink, update); DataflowPipelineJob pipelineResult = runPipeline(pipelineOptions); - List featureSets = - featureSetProtos.stream().map(FeatureSet::fromProto).collect(Collectors.toList()); String jobId = waitForJobToRun(pipelineResult); - return new Job( - jobName, - jobId, - getRunnerType(), - Source.fromProto(source), - Store.fromProto(sink), - featureSets, - JobStatus.PENDING); + return jobId; } catch (Exception e) { log.error("Error submitting job", e); throw new JobExecutionException(String.format("Error running ingestion job: %s", e), e);