Skip to content

Commit

Permalink
[BEAM-59] Register standard FileSystems wherever we register IOChanne…
Browse files Browse the repository at this point in the history
…lFactories

Additionally, drop an unnecessary use of `GcsOptions` in
`PipelineRunner`.
  • Loading branch information
dhalperi committed Apr 18, 2017
1 parent 36e4355 commit 88cc651
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.IOChannelUtils;

Expand Down Expand Up @@ -55,6 +56,7 @@ public PipelineOptions getPipelineOptions() {
pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);

IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
FileSystems.setDefaultConfigInWorkers(pipelineOptions);
} catch (IOException e) {
throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
Expand Down Expand Up @@ -130,6 +131,7 @@ static PipelineOptions getOrInit(String serializedPipelineOptions) {
}
// register IO factories.
IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
FileSystems.setDefaultConfigInWorkers(pipelineOptions);
}
return pipelineOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.InstanceBuilder;

Expand All @@ -41,11 +40,11 @@ public abstract class PipelineRunner<ResultT extends PipelineResult> {
* @return The newly created runner.
*/
public static PipelineRunner<? extends PipelineResult> fromOptions(PipelineOptions options) {
GcsOptions gcsOptions = PipelineOptionsValidator.validate(GcsOptions.class, options);
checkNotNull(options);

// (Re-)register standard IO factories. Clobbers any prior credentials.
IOChannelUtils.registerIOFactoriesAllowOverride(gcsOptions);
IOChannelUtils.registerIOFactoriesAllowOverride(options);
FileSystems.setDefaultConfigInWorkers(options);

@SuppressWarnings("unchecked")
PipelineRunner<? extends PipelineResult> result =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down Expand Up @@ -404,6 +405,7 @@ public static PipelineOptions testingPipelineOptions() {
options.setStableUniqueNames(CheckEnabled.ERROR);

IOChannelUtils.registerIOFactoriesAllowOverride(options);
FileSystems.setDefaultConfigInWorkers(options);
return options;
} catch (IOException e) {
throw new RuntimeException(
Expand Down

0 comments on commit 88cc651

Please sign in to comment.