Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class DataflowRunnerConfig extends RunnerConfig {
public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
this.project = runnerConfigOptions.getProject();
this.region = runnerConfigOptions.getRegion();
this.zone = runnerConfigOptions.getZone();
this.workerZone = runnerConfigOptions.getWorkerZone();
this.serviceAccount = runnerConfigOptions.getServiceAccount();
this.network = runnerConfigOptions.getNetwork();
this.subnetwork = runnerConfigOptions.getSubnetwork();
Expand All @@ -44,6 +44,8 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
this.deadLetterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
this.diskSizeGb = runnerConfigOptions.getDiskSizeGb();
this.labels = runnerConfigOptions.getLabelsMap();
this.enableStreamingEngine = runnerConfigOptions.getEnableStreamingEngine();
this.workerDiskType = runnerConfigOptions.getWorkerDiskType();
validate();
}

Expand All @@ -54,7 +56,7 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
@NotBlank public String region;

/* GCP availability zone for launching worker instances. */
@NotBlank public String zone;
@NotBlank public String workerZone;

/* Run the job as a specific service account, instead of the default GCE robot. */
public String serviceAccount;
Expand Down Expand Up @@ -91,6 +93,12 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {

public Map<String, String> labels;

/* If true, the job will run on Dataflow Streaming Engine instead of worker VMs. */
public Boolean enableStreamingEngine;

/* Type of persistent disk to be used by workers. */
public String workerDiskType;

/** Validates Dataflow runner configuration options */
public void validate() {
ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ feast:
options:
project: my_gcp_project
region: asia-east1
zone: asia-east1-a
workerZone: asia-east1-a
tempLocation: gs://bucket/tempLocation
network: default
subnetwork: regions/asia-east1/subnetworks/mysubnetwork
maxNumWorkers: 1
enableStreamingEngine: false
workerDiskType: compute.googleapis.com/projects/asia-east1-a/diskTypes/pd-ssd
autoscalingAlgorithm: THROUGHPUT_BASED
usePublicIps: false
workerMachineType: n1-standard-1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void setUp() {
Builder optionsBuilder = DataflowRunnerConfigOptions.newBuilder();
optionsBuilder.setProject("project");
optionsBuilder.setRegion("region");
optionsBuilder.setZone("zone");
optionsBuilder.setWorkerZone("zone");
optionsBuilder.setTempLocation("tempLocation");
optionsBuilder.setNetwork("network");
optionsBuilder.setSubnetwork("subnetwork");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
DataflowRunnerConfigOptions.newBuilder()
.setProject("my-project")
.setRegion("asia-east1")
.setZone("asia-east1-a")
.setWorkerZone("asia-east1-a")
.setEnableStreamingEngine(true)
.setWorkerDiskType("pd-ssd")
.setTempLocation("gs://bucket/tempLocation")
.setNetwork("default")
.setSubnetwork("regions/asia-east1/subnetworks/mysubnetwork")
Expand All @@ -52,7 +54,7 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
Arrays.asList(
"--project=my-project",
"--region=asia-east1",
"--zone=asia-east1-a",
"--workerZone=asia-east1-a",
"--tempLocation=gs://bucket/tempLocation",
"--network=default",
"--subnetwork=regions/asia-east1/subnetworks/mysubnetwork",
Expand All @@ -62,7 +64,9 @@ public void shouldConvertToPipelineArgs() throws IllegalAccessException {
"--workerMachineType=n1-standard-1",
"--deadLetterTableSpec=project_id:dataset_id.table_id",
"--diskSizeGb=100",
"--labels={\"key\":\"value\"}")
"--labels={\"key\":\"value\"}",
"--enableStreamingEngine=true",
"--workerDiskType=pd-ssd")
.toArray(String[]::new);
assertThat(args.size(), equalTo(expectedArgs.length));
assertThat(args, containsInAnyOrder(expectedArgs));
Expand All @@ -74,7 +78,7 @@ public void shouldIgnoreOptionalArguments() throws IllegalAccessException {
DataflowRunnerConfigOptions.newBuilder()
.setProject("my-project")
.setRegion("asia-east1")
.setZone("asia-east1-a")
.setWorkerZone("asia-east1-a")
.setTempLocation("gs://bucket/tempLocation")
.setNetwork("default")
.setSubnetwork("regions/asia-east1/subnetworks/mysubnetwork")
Expand All @@ -90,15 +94,16 @@ public void shouldIgnoreOptionalArguments() throws IllegalAccessException {
Arrays.asList(
"--project=my-project",
"--region=asia-east1",
"--zone=asia-east1-a",
"--workerZone=asia-east1-a",
"--tempLocation=gs://bucket/tempLocation",
"--network=default",
"--subnetwork=regions/asia-east1/subnetworks/mysubnetwork",
"--maxNumWorkers=1",
"--autoscalingAlgorithm=THROUGHPUT_BASED",
"--usePublicIps=false",
"--workerMachineType=n1-standard-1",
"--labels={}")
"--labels={}",
"--enableStreamingEngine=false")
.toArray(String[]::new);
assertThat(args.size(), equalTo(expectedArgs.length));
assertThat(args, containsInAnyOrder(expectedArgs));
Expand Down
2 changes: 1 addition & 1 deletion infra/charts/feast/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ feast-core:
options:
project: <google_project_id>
region: <dataflow_regional_endpoint e.g. asia-east1>
zone: <google_zone e.g. asia-east1-a>
workerZone: <google_zone e.g. asia-east1-a>
tempLocation: <gcs_path_for_temp_files e.g. gs://bucket/tempLocation>
network: <google_cloud_network_name>
subnetwork: <google_cloud_subnetwork_path e.g. regions/asia-east1/subnetworks/mysubnetwork>
Expand Down
2 changes: 1 addition & 1 deletion infra/charts/feast/README.md.gotmpl
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ feast-core:
options:
project: <google_project_id>
region: <dataflow_regional_endpoint e.g. asia-east1>
zone: <google_zone e.g. asia-east1-a>
workerZone: <google_zone e.g. asia-east1-a>
tempLocation: <gcs_path_for_temp_files e.g. gs://bucket/tempLocation>
network: <google_cloud_network_name>
subnetwork: <google_cloud_subnetwork_path e.g. regions/asia-east1/subnetworks/mysubnetwork>
Expand Down
4 changes: 3 additions & 1 deletion infra/charts/feast/values-dataflow-runner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ feast-core:
options:
project: <google_project_id>
region: <dataflow_regional_endpoint e.g. asia-east1>
zone: <google_zone e.g. asia-east1-a>
workerZone: <google_zone e.g. asia-east1-a>
tempLocation: <gcs_path_for_temp_files e.g. gs://bucket/tempLocation>
network: <google_cloud_network_name>
subnetwork: <google_cloud_subnetwork_path e.g. regions/asia-east1/subnetworks/mysubnetwork>
enableStreamingEngine: false
workerDiskType: <disk_type e.g. compute.googleapis.com/projects/asia-east1-a/diskTypes/pd-ssd>
maxNumWorkers: 1
autoscalingAlgorithm: THROUGHPUT_BASED
usePublicIps: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ feast-core:
options:
project: $GCLOUD_PROJECT
region: $GCLOUD_REGION
zone: $GCLOUD_REGION-a
workerZone: $GCLOUD_REGION-a
tempLocation: gs://$TEMP_BUCKET/tempLocation
network: $GCLOUD_NETWORK
subnetwork: regions/$GCLOUD_REGION/subnetworks/$GCLOUD_SUBNET
Expand Down
7 changes: 6 additions & 1 deletion protos/feast/core/Runner.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ message DataflowRunnerConfigOptions {
string region = 2;

/* GCP availability zone for launching worker instances. */
string zone = 3;
string workerZone = 3;

/* Run the job as a specific service account, instead of the default GCE robot. */
string serviceAccount = 4;
Expand Down Expand Up @@ -81,4 +81,9 @@ message DataflowRunnerConfigOptions {
/* Disk size to use on each remote Compute Engine worker instance */
int32 diskSizeGb = 14;

/* Run the job on Dataflow Streaming Engine instead of creating worker VMs. */
bool enableStreamingEngine = 15;

/* Type of persistent disk to be used by workers. */
string workerDiskType = 16;
}