Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,15 @@ public CheckEnabled getStableUniqueNames() {
@Override
public void setStableUniqueNames(CheckEnabled enabled) {
}

@Override
public String getTempLocation() {
return null;
}

@Override
public void setTempLocation(String tempLocation) {
}
};
}
return options;
Expand Down Expand Up @@ -628,4 +637,4 @@ public void restoreState(StreamTaskState taskState, long recoveryTimestamp) thro
// restore the timerInternals.
this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ public interface DataflowPipelineOptions extends
GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
DataflowProfilingOptions {

static final String DATAFLOW_STORAGE_LOCATION = "Dataflow Storage Location";

@Description("Project id. Required when running a Dataflow in the cloud. "
+ "See https://cloud.google.com/storage/docs/projects for further details.")
@Override
Expand All @@ -45,37 +43,19 @@ public interface DataflowPipelineOptions extends
@Override
void setProject(String value);

/**
* GCS path for temporary files, e.g. gs://bucket/object
*
* <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
*
* <p>At least one of {@link #getTempLocation()} or {@link #getStagingLocation()} must be set. If
* {@link #getTempLocation()} is not set, then the Dataflow pipeline defaults to using
* {@link #getStagingLocation()}.
*/
@Description("GCS path for temporary files, eg \"gs://bucket/object\". "
+ "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
+ "At least one of tempLocation or stagingLocation must be set. If tempLocation is unset, "
+ "defaults to using stagingLocation.")
@Validation.Required(groups = {DATAFLOW_STORAGE_LOCATION})
String getTempLocation();
void setTempLocation(String value);

/**
* GCS path for staging local files, e.g. gs://bucket/object
*
* <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
*
* <p>At least one of {@link #getTempLocation()} or {@link #getStagingLocation()} must be set. If
* {@link #getTempLocation()} is not set, then the Dataflow pipeline defaults to using
* {@link #getStagingLocation()}.
* <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()}
* must be set. If {@link #getStagingLocation()} is not set, then the Dataflow
* pipeline defaults to using {@link PipelineOptions#getTempLocation()}.
*/
@Description("GCS path for staging local files, e.g. \"gs://bucket/object\". "
+ "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
+ "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
+ "defaults to using tempLocation.")
@Validation.Required(groups = {DATAFLOW_STORAGE_LOCATION})
String getStagingLocation();
void setStagingLocation(String value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,18 @@ public static enum CheckEnabled {
@Default.Enum("WARNING")
CheckEnabled getStableUniqueNames();
void setStableUniqueNames(CheckEnabled enabled);

/**
* A pipeline level default location for storing temporary files.
*
* <p>This can be a path of any file system.
*
* <p>{@link #getTempLocation()} can be used as a default location in other
* {@link PipelineOptions}.
*
* <p>If it is unset, {@link PipelineRunner} can override it.
*/
@Description("A pipeline level default location for storing temporary files.")
String getTempLocation();
void setTempLocation(String value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ public static DataflowPipelineRunner fromOptions(PipelineOptions options) {
}

PathValidator validator = dataflowOptions.getPathValidator();
Preconditions.checkArgument(!(Strings.isNullOrEmpty(dataflowOptions.getTempLocation())
&& Strings.isNullOrEmpty(dataflowOptions.getStagingLocation())),
"Missing required value: at least one of tempLocation or stagingLocation must be set.");

if (dataflowOptions.getStagingLocation() != null) {
validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,10 +649,8 @@ public void testNoStagingLocationAndNoTempLocationFails() {
options.setProject("foo-project");

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Missing required value for group");
thrown.expectMessage(DataflowPipelineOptions.DATAFLOW_STORAGE_LOCATION);
thrown.expectMessage("getStagingLocation");
thrown.expectMessage("getTempLocation");
thrown.expectMessage(
"Missing required value: at least one of tempLocation or stagingLocation must be set.");

DataflowPipelineRunner.fromOptions(options);
}
Expand Down