diff --git a/core/src/main/java/feast/core/job/JobUpdateTask.java b/core/src/main/java/feast/core/job/JobUpdateTask.java index f3afe84df77..04aab0cff68 100644 --- a/core/src/main/java/feast/core/job/JobUpdateTask.java +++ b/core/src/main/java/feast/core/job/JobUpdateTask.java @@ -144,7 +144,7 @@ private Job startJob( new Job( jobId, "", - jobManager.getRunnerType().name(), + jobManager.getRunnerType(), Source.fromProto(source), Store.fromProto(sinkSpec), featureSets, diff --git a/core/src/main/java/feast/core/job/Runner.java b/core/src/main/java/feast/core/job/Runner.java index 4e2033fed69..acccb70c8b2 100644 --- a/core/src/main/java/feast/core/job/Runner.java +++ b/core/src/main/java/feast/core/job/Runner.java @@ -16,33 +16,37 @@ */ package feast.core.job; +import java.util.NoSuchElementException; + +/** + * An Apache Beam Runner, for which Feast Core supports managing ingestion jobs. + * + * @see Beam Runners + */ public enum Runner { DATAFLOW("DataflowRunner"), FLINK("FlinkRunner"), DIRECT("DirectRunner"); - private final String name; + private final String humanName; - Runner(String name) { - this.name = name; + Runner(String humanName) { + this.humanName = humanName; } - /** - * Get the human readable name of this runner. Returns a human readable name of the runner that - * can be used for logging/config files/etc. - */ + /** Returns the human readable name of this runner, usable in logging, config files, etc. */ @Override public String toString() { - return name; + return humanName; } /** Parses a runner from its human readable name. */ - public static Runner fromString(String runner) { + public static Runner fromString(String humanName) { for (Runner r : Runner.values()) { - if (r.toString().equals(runner)) { + if (r.toString().equals(humanName)) { return r; } } - throw new IllegalArgumentException("Unknown value: " + runner); + throw new NoSuchElementException("Unknown Runner value: " + humanName); } } diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 6002133e828..880dd6c146b 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -220,7 +220,7 @@ public Job restartJob(Job job) { */ @Override public JobStatus getJobStatus(Job job) { - if (!Runner.DATAFLOW.name().equals(job.getRunner())) { + if (job.getRunner() != RUNNER_TYPE) { return job.getStatus(); } @@ -252,7 +252,7 @@ private Job submitDataflowJob( return new Job( jobName, jobId, - getRunnerType().name(), + getRunnerType(), Source.fromProto(source), Store.fromProto(sink), featureSets, diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index 060edefc3b9..95bcd79e6c0 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -19,6 +19,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import feast.core.FeatureSetProto; import feast.core.IngestionJobProto; +import feast.core.job.Runner; import java.util.ArrayList; import java.util.List; import javax.persistence.CascadeType; @@ -55,9 +56,9 @@ public class Job extends AbstractTimestampEntity { private String extId; // Runner type - // Use Runner.name() when converting a Runner to string to assign to this property. + @Enumerated(EnumType.STRING) @Column(name = "runner") - private String runner; + private Runner runner; // Source id @ManyToOne @@ -96,7 +97,7 @@ public Job() { public Job( String id, String extId, - String runner, + Runner runner, Source source, Store sink, List featureSets, diff --git a/core/src/main/java/feast/core/service/JobService.java b/core/src/main/java/feast/core/service/JobService.java index bf74b90e80c..c8fc5caf5e1 100644 --- a/core/src/main/java/feast/core/service/JobService.java +++ b/core/src/main/java/feast/core/service/JobService.java @@ -29,6 +29,7 @@ import feast.core.IngestionJobProto; import feast.core.dao.JobRepository; import feast.core.job.JobManager; +import feast.core.job.Runner; import feast.core.log.Action; import feast.core.log.AuditLogger; import feast.core.log.Resource; @@ -50,13 +51,13 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -/** Defines a Job Managemenent Service that allows users to manage feast ingestion jobs. */ +/** A Job Management Service that allows users to manage Feast ingestion jobs. */ @Slf4j @Service public class JobService { - private JobRepository jobRepository; - private SpecService specService; - private Map jobManagers; + private final JobRepository jobRepository; + private final SpecService specService; + private final Map jobManagers; @Autowired public JobService( @@ -66,13 +67,13 @@ public JobService( this.jobManagers = new HashMap<>(); for (JobManager manager : jobManagerList) { - this.jobManagers.put(manager.getRunnerType().name(), manager); + this.jobManagers.put(manager.getRunnerType(), manager); } } /* Job Service API */ /** - * List Ingestion Jobs in feast matching the given request. See CoreService protobuf documentation + * List Ingestion Jobs in Feast matching the given request. See CoreService protobuf documentation * for more detailed documentation. * * @param request list ingestion jobs request specifying which jobs to include diff --git a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java index 2a1e80994ae..5faf446a948 100644 --- a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java +++ b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java @@ -102,7 +102,7 @@ public void shouldUpdateJobIfPresent() { new Job( "job", "old_ext", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -119,7 +119,7 @@ public void shouldUpdateJobIfPresent() { new Job( "job", "old_ext", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -129,7 +129,7 @@ public void shouldUpdateJobIfPresent() { new Job( "job", "new_ext", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, Source.fromProto(source), Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -163,7 +163,7 @@ public void shouldCreateJobIfNotPresent() { new Job( "job", "", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -173,7 +173,7 @@ public void shouldCreateJobIfNotPresent() { new Job( "job", "ext", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -202,7 +202,7 @@ public void shouldUpdateJobStatusIfNotCreateOrUpdate() { new Job( "job", "ext", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -216,7 +216,7 @@ public void shouldUpdateJobStatusIfNotCreateOrUpdate() { new Job( "job", "ext", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, Source.fromProto(source), Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -248,7 +248,7 @@ public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { new Job( "job", "", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -258,7 +258,7 @@ public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { new Job( "job", "", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), diff --git a/core/src/test/java/feast/core/job/RunnerTest.java b/core/src/test/java/feast/core/job/RunnerTest.java new file mode 100644 index 00000000000..ce1700acbe9 --- /dev/null +++ b/core/src/test/java/feast/core/job/RunnerTest.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.job; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.util.NoSuchElementException; +import org.junit.Test; + +public class RunnerTest { + + @Test + public void toStringReturnsHumanReadableName() { + assertThat(Runner.DATAFLOW.toString(), is("DataflowRunner")); + } + + @Test + public void fromStringLoadsValueFromHumanReadableName() { + var humanName = Runner.DATAFLOW.toString(); + assertThat(Runner.fromString(humanName), is(Runner.DATAFLOW)); + } + + @Test(expected = NoSuchElementException.class) + public void fromStringThrowsNoSuchElementExceptionForUnknownValue() { + Runner.fromString("this is not a valid Runner"); + } +} diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index e610f393732..72b921ef694 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -158,7 +158,7 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { new Job( jobName, "", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, Source.fromProto(source), Store.fromProto(store), Lists.newArrayList(FeatureSet.fromProto(featureSet)), @@ -239,7 +239,7 @@ public void shouldThrowExceptionWhenJobStateTerminal() throws IOException { new Job( "job", "", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, Source.fromProto(source), Store.fromProto(store), Lists.newArrayList(FeatureSet.fromProto(featureSet)), diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java index 76530d9f404..6980450ca4d 100644 --- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java +++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java @@ -144,7 +144,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { new Job( expectedJobId, "", - Runner.DIRECT.name(), + Runner.DIRECT, Source.fromProto(source), Store.fromProto(store), Lists.newArrayList(FeatureSet.fromProto(featureSet)), diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index 52e838c3d9d..59fdc32b20f 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -164,7 +164,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep new Job( "", "", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -174,7 +174,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep new Job( "some_id", extId, - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -264,7 +264,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "name1", "", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source1), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -274,7 +274,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "name1", "extId1", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source1), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -284,7 +284,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "", "extId2", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source2), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet2)), @@ -294,7 +294,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "name2", "extId2", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source2), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet2)), diff --git a/core/src/test/java/feast/core/service/JobServiceTest.java b/core/src/test/java/feast/core/service/JobServiceTest.java index c0e90ca43f4..6f34205bbfd 100644 --- a/core/src/test/java/feast/core/service/JobServiceTest.java +++ b/core/src/test/java/feast/core/service/JobServiceTest.java @@ -179,7 +179,7 @@ private Job newDummyJob(String id, String extId, JobStatus status) { return new Job( id, extId, - Runner.DATAFLOW.name(), + Runner.DATAFLOW, this.dataSource, this.dataStore, Arrays.asList(this.featureSet),