From 867c4f817e9c8788e9665b3a973e51ee28517978 Mon Sep 17 00:00:00 2001 From: Chris Hebert Date: Thu, 10 Aug 2017 14:56:32 -0500 Subject: [PATCH 01/10] [BEAM-2750][BEAM-2751] Implement WholeFileIO --- .../org/apache/beam/sdk/io/WholeFileIO.java | 224 ++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java new file mode 100644 index 000000000000..94c885254b7f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java @@ -0,0 +1,224 @@ +package org.apache.beam.sdk.io; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY; +import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE; +import static org.apache.beam.sdk.util.MimeTypes.BINARY; + +import com.google.auto.value.AutoValue; +import com.google.protobuf.ByteString; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Collections; +import java.util.List; + +import javax.annotation.Nullable; + +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import 
org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PDone; + +/** + * WholeFileIO. + */ +public class WholeFileIO { + + public static Read read() { + return new AutoValue_WholeFileIO_Read.Builder().build(); + } + + public static Write write() { + return new AutoValue_WholeFileIO_Write.Builder().build(); + } + + /** + * Implements read(). + */ + @AutoValue + public abstract static class Read extends PTransform>> { + @Nullable + abstract ValueProvider getFilePattern(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setFilePattern(ValueProvider filePattern); + + abstract Read build(); + } + + public Read from(String filePattern) { + checkNotNull(filePattern, "FilePattern cannot be empty."); + return from(ValueProvider.StaticValueProvider.of(filePattern)); + } + + public Read from(ValueProvider filePattern) { + checkNotNull(filePattern, "FilePattern cannot be empty."); + return toBuilder().setFilePattern(filePattern).build(); + } + + @Override + public PCollection> expand(PBegin input) { + checkNotNull( + getFilePattern(), + "Need to set the filePattern of a WholeFileIO.Read transform." 
+ ); + + String filePattern = getFilePattern().get(); + + PCollection filePatternPCollection = input.apply(Create.of(filePattern)); + + PCollection resourceIds = filePatternPCollection.apply( + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + String filePattern = c.element(); + try { + List matchResults = FileSystems.match( + Collections.singletonList(filePattern)); + for (MatchResult matchResult : matchResults) { + List metadataList = matchResult.metadata(); + for (MatchResult.Metadata metadata : metadataList) { + ResourceId resourceId = metadata.resourceId(); + c.output(resourceId); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } + ) + ); + + PCollection> files = resourceIds.apply( + ParDo.of( + new DoFn>() { + @ProcessElement + public void processElement(ProcessContext c) { + ResourceId resourceId = c.element(); + + try { + ReadableByteChannel channel = FileSystems.open(resourceId); + ByteBuffer byteBuffer = ByteBuffer.allocate(8192); + ByteString byteString = ByteString.EMPTY; + + while (channel.read(byteBuffer) != -1) { + byteBuffer.flip(); + byteString = byteString.concat(ByteString.copyFrom(byteBuffer)); + byteBuffer.clear(); + } + + KV kv = KV.of( + resourceId.getFilename(), + byteString.toByteArray() + ); + + c.output(kv); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + ) + ); + + return files; + } + } + + /** + * Implements write(). 
+ */ + @AutoValue + public abstract static class Write extends PTransform>, PDone> { + + @Nullable abstract ValueProvider getOutputDir(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setOutputDir(ValueProvider outputDir); + + abstract Write build(); + } + + public Write to(String outputDir) { + return to(FileBasedSink.convertToFileResourceIfPossible(outputDir)); + } + + public Write to(ResourceId outputDir) { + return toResource(ValueProvider.StaticValueProvider.of(outputDir)); + } + + public Write toResource(ValueProvider outputDir) { + return toBuilder().setOutputDir(outputDir).build(); + } + + @Override + public PDone expand(PCollection> input) { + checkNotNull( + getOutputDir(), + "Need to set the output directory of a WholeFileIO.Write transform." + ); + + ResourceId outputDir = getOutputDir().get(); + if (!outputDir.isDirectory()) { + outputDir = outputDir.getCurrentDirectory() + .resolve(outputDir.getFilename(), RESOLVE_DIRECTORY); + } + final PCollectionView outputDirView = input.getPipeline() + .apply(Create.of(outputDir)) + .apply(View.asSingleton()); + + input.apply( + ParDo.of( + new DoFn, Void>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV kv = c.element(); + + ResourceId outputDir = c.sideInput(outputDirView); + String filename = kv.getKey(); + ResourceId outputFile = outputDir.resolve(filename, RESOLVE_FILE); + + byte[] bytes = kv.getValue(); + try { + WritableByteChannel channel = FileSystems.create(outputFile, BINARY); + OutputStream os = Channels.newOutputStream(channel); + os.write(bytes); + os.flush(); + os.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + ).withSideInputs(outputDirView) + ); + + return PDone.in(input.getPipeline()); + } + } + + /** Disable construction of utility class. 
*/ + private WholeFileIO() {} +} From 0380df7f2938ecee4abf4ea45f6ac16f26c5d36d Mon Sep 17 00:00:00 2001 From: Chris Hebert Date: Thu, 10 Aug 2017 22:18:38 -0500 Subject: [PATCH 02/10] [BEAM-2750][BEAM-2751] Add license to WholeFileIO --- .../org/apache/beam/sdk/io/WholeFileIO.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java index 94c885254b7f..752e0fc0b890 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkNotNull; From c296a65ade7e66cbf6d1f6fc4a5ccf937418138d Mon Sep 17 00:00:00 2001 From: Chris Hebert Date: Wed, 16 Aug 2017 12:11:42 -0500 Subject: [PATCH 03/10] [BEAM-2750][BEAM-2751] Improve WholeFileIO --- .../org/apache/beam/sdk/io/WholeFileIO.java | 110 ++++++------------ 1 file changed, 37 insertions(+), 73 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java index 752e0fc0b890..a7b8511d9135 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java @@ -18,24 +18,19 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY; import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE; import static org.apache.beam.sdk.util.MimeTypes.BINARY; import com.google.auto.value.AutoValue; -import com.google.protobuf.ByteString; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; -import java.util.Collections; -import java.util.List; import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; @@ -43,13 +38,14 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.util.StreamUtils; import 
org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; +// TODO: Write comment on what this is and how to use it. +// TODO: Mention that if multiple files have the same filename at write, only one will survive. /** * WholeFileIO. */ @@ -59,6 +55,8 @@ public static Read read() { return new AutoValue_WholeFileIO_Read.Builder().build(); } + // TODO: Add a readAll() like TextIO. + public static Write write() { return new AutoValue_WholeFileIO_Write.Builder().build(); } @@ -97,59 +95,28 @@ public PCollection> expand(PBegin input) { "Need to set the filePattern of a WholeFileIO.Read transform." ); - String filePattern = getFilePattern().get(); - - PCollection filePatternPCollection = input.apply(Create.of(filePattern)); + PCollection filePatternPCollection = input.apply( + Create.ofProvider(getFilePattern(), StringUtf8Coder.of())); - PCollection resourceIds = filePatternPCollection.apply( - ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - String filePattern = c.element(); - try { - List matchResults = FileSystems.match( - Collections.singletonList(filePattern)); - for (MatchResult matchResult : matchResults) { - List metadataList = matchResult.metadata(); - for (MatchResult.Metadata metadata : metadataList) { - ResourceId resourceId = metadata.resourceId(); - c.output(resourceId); - } - } - } catch (IOException e) { - e.printStackTrace(); - } - } - } - ) - ); + PCollection matchResultMetaData = filePatternPCollection.apply( + Match.filepatterns()); - PCollection> files = resourceIds.apply( + PCollection> files = matchResultMetaData.apply( ParDo.of( - new DoFn>() { + new DoFn>() { @ProcessElement public void processElement(ProcessContext c) { - ResourceId resourceId = c.element(); - - try { - ReadableByteChannel channel = FileSystems.open(resourceId); - ByteBuffer 
byteBuffer = ByteBuffer.allocate(8192); - ByteString byteString = ByteString.EMPTY; - - while (channel.read(byteBuffer) != -1) { - byteBuffer.flip(); - byteString = byteString.concat(ByteString.copyFrom(byteBuffer)); - byteBuffer.clear(); - } - - KV kv = KV.of( - resourceId.getFilename(), - byteString.toByteArray() - ); - + MatchResult.Metadata metadata = c.element(); + ResourceId resourceId = metadata.resourceId(); + + try ( + InputStream inStream = Channels.newInputStream(FileSystems.open(resourceId)) + ) { + byte[] bytes = StreamUtils.getBytes(inStream); + KV kv = KV.of(resourceId.getFilename(), bytes); c.output(kv); } catch (IOException e) { + // TODO: Don't do this. See PTransform style guide section on errors. e.printStackTrace(); } } @@ -179,7 +146,7 @@ abstract static class Builder { } public Write to(String outputDir) { - return to(FileBasedSink.convertToFileResourceIfPossible(outputDir)); + return to(FileSystems.matchNewResource(outputDir, true)); } public Write to(ResourceId outputDir) { @@ -197,15 +164,6 @@ public PDone expand(PCollection> input) { "Need to set the output directory of a WholeFileIO.Write transform." ); - ResourceId outputDir = getOutputDir().get(); - if (!outputDir.isDirectory()) { - outputDir = outputDir.getCurrentDirectory() - .resolve(outputDir.getFilename(), RESOLVE_DIRECTORY); - } - final PCollectionView outputDirView = input.getPipeline() - .apply(Create.of(outputDir)) - .apply(View.asSingleton()); - input.apply( ParDo.of( new DoFn, Void>() { @@ -213,23 +171,29 @@ public PDone expand(PCollection> input) { public void processElement(ProcessContext c) { KV kv = c.element(); - ResourceId outputDir = c.sideInput(outputDirView); + ResourceId outputDir = getOutputDir().get(); String filename = kv.getKey(); + // TODO: Write to tmp files. Once tmp file write finished, rename to filename. 
+ // TODO: ^ Alternative (faster): setup() create tmp dir, processElement() + // write each file to tmp dir, teardown() rename tmp dir to outputDir + // (Or, instead of setup() and teardown(), use startBundle() and finishBundle() + // except that you mv all files inside tmp dir to inside the outputDir ResourceId outputFile = outputDir.resolve(filename, RESOLVE_FILE); byte[] bytes = kv.getValue(); - try { - WritableByteChannel channel = FileSystems.create(outputFile, BINARY); - OutputStream os = Channels.newOutputStream(channel); - os.write(bytes); - os.flush(); - os.close(); + try ( + OutputStream outStream = + Channels.newOutputStream(FileSystems.create(outputFile, BINARY)) + ) { + outStream.write(bytes); + outStream.flush(); } catch (IOException e) { + // TODO: Don't do this. See PTransform style guide section on errors. e.printStackTrace(); } } } - ).withSideInputs(outputDirView) + ) ); return PDone.in(input.getPipeline()); From 63d12201fe3a926f76b94c8391c8a085f5ce2552 Mon Sep 17 00:00:00 2001 From: Chris Hebert Date: Wed, 16 Aug 2017 12:46:23 -0500 Subject: [PATCH 04/10] [BEAM-2750][BEAM-2751] Throw errors in WholeFileIO Formerly, WholeFileIO catch-statements suppressed IOExceptions. This commit makes WholeFileIO throw IOExceptions in @ProcessElement methods of DoFns so they fail properly. 
--- .../java/org/apache/beam/sdk/io/WholeFileIO.java | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java index a7b8511d9135..00eb44476086 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java @@ -105,19 +105,14 @@ public PCollection> expand(PBegin input) { ParDo.of( new DoFn>() { @ProcessElement - public void processElement(ProcessContext c) { + public void processElement(ProcessContext c) throws IOException { MatchResult.Metadata metadata = c.element(); ResourceId resourceId = metadata.resourceId(); try ( InputStream inStream = Channels.newInputStream(FileSystems.open(resourceId)) ) { - byte[] bytes = StreamUtils.getBytes(inStream); - KV kv = KV.of(resourceId.getFilename(), bytes); - c.output(kv); - } catch (IOException e) { - // TODO: Don't do this. See PTransform style guide section on errors. - e.printStackTrace(); + c.output(KV.of(resourceId.getFilename(), StreamUtils.getBytes(inStream))); } } } @@ -168,7 +163,7 @@ public PDone expand(PCollection> input) { ParDo.of( new DoFn, Void>() { @ProcessElement - public void processElement(ProcessContext c) { + public void processElement(ProcessContext c) throws IOException { KV kv = c.element(); ResourceId outputDir = getOutputDir().get(); @@ -187,9 +182,6 @@ public void processElement(ProcessContext c) { ) { outStream.write(bytes); outStream.flush(); - } catch (IOException e) { - // TODO: Don't do this. See PTransform style guide section on errors. - e.printStackTrace(); } } } From aa1710c5cdd6689ea332a0e8207874847f445342 Mon Sep 17 00:00:00 2001 From: Chris Hebert Date: Wed, 16 Aug 2017 14:15:10 -0500 Subject: [PATCH 05/10] [BEAM-2750][BEAM-2751] Use tmp dirs in WholeFileIO Formerly, WholeFileIO wrote directly to intended output files. 
This commit makes WholeFileIO write to a temporary directory first, then rename the temporary directory to the intended name. This required copying a TemporaryDirectoryBuilder marked private in FileBasedSink.WriteOperation. --- .../org/apache/beam/sdk/io/WholeFileIO.java | 80 +++++++++++++++++-- 1 file changed, 72 insertions(+), 8 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java index 00eb44476086..1e1a12541896 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java @@ -27,22 +27,32 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.channels.Channels; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MoveOptions; +import org.apache.beam.sdk.io.fs.ResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.StreamUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.joda.time.Instant; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; // TODO: Write comment on what this is and how to use it. // TODO: Mention that if multiple files have the same filename at write, only one will survive. 
@@ -128,6 +138,7 @@ public void processElement(ProcessContext c) throws IOException { */ @AutoValue public abstract static class Write extends PTransform>, PDone> { + private static final Logger LOG = LoggerFactory.getLogger(Write.class); @Nullable abstract ValueProvider getOutputDir(); @@ -162,26 +173,57 @@ public PDone expand(PCollection> input) { input.apply( ParDo.of( new DoFn, Void>() { + ValueProvider tmpDir; + + @Setup + public void setup() { + tmpDir = ValueProvider.NestedValueProvider.of( + getOutputDir(), + new TemporaryDirectoryBuilder() + ); + } + @ProcessElement public void processElement(ProcessContext c) throws IOException { KV kv = c.element(); - ResourceId outputDir = getOutputDir().get(); String filename = kv.getKey(); - // TODO: Write to tmp files. Once tmp file write finished, rename to filename. - // TODO: ^ Alternative (faster): setup() create tmp dir, processElement() - // write each file to tmp dir, teardown() rename tmp dir to outputDir - // (Or, instead of setup() and teardown(), use startBundle() and finishBundle() - // except that you mv all files inside tmp dir to inside the outputDir - ResourceId outputFile = outputDir.resolve(filename, RESOLVE_FILE); + ResourceId tmpFile = tmpDir.get().resolve(filename, RESOLVE_FILE); byte[] bytes = kv.getValue(); try ( OutputStream outStream = - Channels.newOutputStream(FileSystems.create(outputFile, BINARY)) + Channels.newOutputStream(FileSystems.create(tmpFile, BINARY)) ) { outStream.write(bytes); outStream.flush(); + } catch (IOException e) { + LOG.error( + "Failed to write to temporary file [{}] for [{}].", + tmpFile, + getOutputDir().get().resolve(filename, RESOLVE_FILE) + ); + FileSystems.delete( + Collections.singletonList(tmpFile), + MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES + ); + throw e; + } + } + + @Teardown + public void teardown() throws IOException { + try { + FileSystems.rename( + Collections.singletonList(tmpDir.get()), + Collections.singletonList(getOutputDir().get()) + 
); + } catch (IOException e) { + LOG.error( + "Failed to rename temporary directory [{}] to [{}].", + tmpDir.get(), getOutputDir().get() + ); + throw e; } } } @@ -190,6 +232,28 @@ public void processElement(ProcessContext c) throws IOException { return PDone.in(input.getPipeline()); } + + private static class TemporaryDirectoryBuilder + implements SerializableFunction { + private static final AtomicLong TEMP_COUNT = new AtomicLong(0); + private static final DateTimeFormatter TEMPDIR_TIMESTAMP = + DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss"); + // The intent of the code is to have a consistent value of tempDirectory across + // all workers, which wouldn't happen if now() was called inline. + private final String timestamp = Instant.now().toString(TEMPDIR_TIMESTAMP); + // Multiple different sinks may be used in the same output directory; use tempId to create a + // separate temp directory for each. + private final Long tempId = TEMP_COUNT.getAndIncrement(); + + @Override + public ResourceId apply(ResourceId tempDirectory) { + // Temp directory has a timestamp and a unique ID + String tempDirName = String.format(".temp-beam-%s-%s", timestamp, tempId); + return tempDirectory + .getCurrentDirectory() + .resolve(tempDirName, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY); + } + } } /** Disable construction of utility class. 
*/ From 6ae3a683bf9586865d06364c0cfccfe7057ded32 Mon Sep 17 00:00:00 2001 From: Chris Hebert Date: Wed, 16 Aug 2017 15:24:45 -0500 Subject: [PATCH 06/10] [BEAM-2750][BEAM-2751] Add comments to WholeFileIO --- .../org/apache/beam/sdk/io/WholeFileIO.java | 61 ++++++++++++++++++- 1 file changed, 58 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java index 1e1a12541896..7318a672950c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java @@ -54,19 +54,74 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// TODO: Write comment on what this is and how to use it. -// TODO: Mention that if multiple files have the same filename at write, only one will survive. /** - * WholeFileIO. + * {@link PTransform}s for reading and writing files as {@link KV} pairs of filename {@link String}s + * and byte arrays. + * + *

To read a {@link PCollection} of one or more files as {@link KV}s, use + * {@code WholeFileIO.read()} to instantiate a transform and use + * {@link WholeFileIO.Read#from(String)} to specify the path of the file(s) to be read.

+ * + *

Method {@link #read} returns a {@link PCollection} of {@code KV}s, + * each corresponding to one file's filename and contents.

+ * + *

The filepatterns are expanded only once. + * + *

Example 1: reading a file or filepattern (or file glob). + * + *

{@code
+ * Pipeline p = ...;
+ *
+ * // A Read of a local file (only runs locally):
+ * PCollection<KV<String, byte[]>> oneFile = p.apply(
+ *                                              WholeFileIO.read().from("/local/path/to/file.txt"));
+ *
+ * // A Read of local files in a directory (only runs locally):
+ * PCollection<KV<String, byte[]>> manyFiles = p.apply(
+ *                                               WholeFileIO.read().from("/local/path/to/files/*"));
+ *
+ * // A Read of local files in nested directories (only runs locally):
+ * PCollection<KV<String, byte[]>> nestedFiles = p.apply(
+ *                                        WholeFileIO.read().from("/local/path/to/nested/dirs/**"));
+ * // ^ The KV's String corresponding to filename retains only the last term of the file path
+ * //   (i.e. it retains the filename and ignores intermediate directory names)
+ * }
+ * + *

To write the byte array of a {@link PCollection} of {@code KV} to an output + * directory with the KV's String as filename, use {@code WholeFileIO.write()} with + * {@link WholeFileIO.Write#to(String)} to specify the output directory of the files to write. + * + *

For example: + * + *

{@code
+ * // A simple Write to a local file (only runs locally):
+ * PCollection<KV<String, byte[]>> files = ...;
+ * lines.apply(WholeFileIO.write().to("/path/to/output/dir/"));
+ * }
+ * + *

Any existing files with the same names as generated output files will be overwritten. + * Similarly, if multiple KV's in the incoming {@link PCollection} have the same String (i.e. + * filename), then duplicates will be overwritten by the other such named elements of the + * {@link PCollection}. In other words, only one {@link KV} of a certain filename will write out + * successfully. */ public class WholeFileIO { + /** + * A {@link PTransform} that reads from one or more files and returns a bounded + * {@link PCollection} containing one {@link KV} element for each input file. + */ public static Read read() { return new AutoValue_WholeFileIO_Read.Builder().build(); } // TODO: Add a readAll() like TextIO. + /** + * A {@link PTransform} that takes a {@link PCollection} of {@link KV {@code KV}} + * and writes each {@code byte[]} to the corresponding filename (i.e. the {@link String} of the + * {@link KV}). + */ public static Write write() { return new AutoValue_WholeFileIO_Write.Builder().build(); } From 97dc61bb0919c7cf8f001d1c562adcb0916e10f2 Mon Sep 17 00:00:00 2001 From: Chris Hebert Date: Wed, 16 Aug 2017 16:05:12 -0500 Subject: [PATCH 07/10] [BEAM-2750][BEAM-2751] Add readAll() to WholeFileIO --- .../org/apache/beam/sdk/io/WholeFileIO.java | 60 ++++++++++++------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java index 7318a672950c..e684736e3bc2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java @@ -115,7 +115,9 @@ public static Read read() { return new AutoValue_WholeFileIO_Read.Builder().build(); } - // TODO: Add a readAll() like TextIO. 
+ public static ReadAll readAll() { + return new AutoValue_WholeFileIO_ReadAll.Builder().build(); + } /** * A {@link PTransform} that takes a {@link PCollection} of {@link KV {@code KV}} @@ -159,32 +161,44 @@ public PCollection> expand(PBegin input) { getFilePattern(), "Need to set the filePattern of a WholeFileIO.Read transform." ); + WholeFileIO.ReadAll readAll = readAll(); + return input.apply(Create.ofProvider(getFilePattern(), StringUtf8Coder.of())) + .apply("Via ReadAll", readAll); + } + } - PCollection filePatternPCollection = input.apply( - Create.ofProvider(getFilePattern(), StringUtf8Coder.of())); - - PCollection matchResultMetaData = filePatternPCollection.apply( - Match.filepatterns()); + /** Implementation of {@link #readAll()}. */ + @AutoValue + public abstract static class ReadAll extends PTransform, + PCollection>> { + abstract Builder toBuilder(); - PCollection> files = matchResultMetaData.apply( - ParDo.of( - new DoFn>() { - @ProcessElement - public void processElement(ProcessContext c) throws IOException { - MatchResult.Metadata metadata = c.element(); - ResourceId resourceId = metadata.resourceId(); + @AutoValue.Builder + abstract static class Builder { + abstract ReadAll build(); + } - try ( - InputStream inStream = Channels.newInputStream(FileSystems.open(resourceId)) - ) { - c.output(KV.of(resourceId.getFilename(), StreamUtils.getBytes(inStream))); - } - } - } - ) - ); + @Override + public PCollection> expand(PCollection input) { + return input.apply(Match.filepatterns()) + .apply(ParDo.of(new ReadWholeFileFn())); + } - return files; + /** + * A {@link DoFn} that takes in {@link MatchResult.Metadata} from which it acquires a + * {@link ResourceId} that it reads into a {@code byte[]} paired in a {@link KV} with the + * filename as a {@link String}. 
+ */ + public static class ReadWholeFileFn extends DoFn> { + @ProcessElement + public void processElement(ProcessContext c) throws IOException { + MatchResult.Metadata metadata = c.element(); + ResourceId resourceId = metadata.resourceId(); + + try (InputStream inStream = Channels.newInputStream(FileSystems.open(resourceId))) { + c.output(KV.of(resourceId.getFilename(), StreamUtils.getBytes(inStream))); + } + } } } From 21b142d27f0cd5b70dd700ebb47c320b9704f51a Mon Sep 17 00:00:00 2001 From: Chris Hebert Date: Wed, 16 Aug 2017 16:12:27 -0500 Subject: [PATCH 08/10] [BEAM-2750][BEAM-2751] Add readAll() comments in WholeFileIO --- .../org/apache/beam/sdk/io/WholeFileIO.java | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java index e684736e3bc2..a3c5df8d5e9c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java @@ -60,7 +60,9 @@ * *

To read a {@link PCollection} of one or more files as {@link KV}s, use * {@code WholeFileIO.read()} to instantiate a transform and use - * {@link WholeFileIO.Read#from(String)} to specify the path of the file(s) to be read.

+ * {@link WholeFileIO.Read#from(String)} to specify the path of the file(s) to be read. + * Alternatively, if the filenames to be read are themselves in a {@link + * PCollection}, apply {@link WholeFileIO#readAll()}. * *

Method {@link #read} returns a {@link PCollection} of {@code KV}s, * each corresponding to one file's filename and contents.

@@ -87,6 +89,19 @@ * // (i.e. it retains the filename and ignores intermediate directory names) * } * + *

Example 2: reading a PCollection of filenames (whole paths, not just the last term). + * + *

{@code
+ * Pipeline p = ...;
+ *
+ * // E.g. the filenames might be computed from other data in the pipeline, or
+ * // read from a data source.
+ * PCollection<String> filenames = ...;
+ *
+ * // Read all files in the collection.
+ * PCollection<KV<String, byte[]>> files = filenames.apply(WholeFileIO.readAll());
+ * }
+ * *

To write the byte array of a {@link PCollection} of {@code KV} to an output * directory with the KV's String as filename, use {@code WholeFileIO.write()} with * {@link WholeFileIO.Write#to(String)} to specify the output directory of the files to write. @@ -128,9 +143,7 @@ public static Write write() { return new AutoValue_WholeFileIO_Write.Builder().build(); } - /** - * Implements read(). - */ + /** Implementation of {@link #read()}. */ @AutoValue public abstract static class Read extends PTransform>> { @Nullable @@ -202,9 +215,7 @@ public void processElement(ProcessContext c) throws IOException { } } - /** - * Implements write(). - */ + /** Implementation of {@Link #write()}. */ @AutoValue public abstract static class Write extends PTransform>, PDone> { private static final Logger LOG = LoggerFactory.getLogger(Write.class); From cc8ab27371dda45270c1d1e6ac9047a7c53db801 Mon Sep 17 00:00:00 2001 From: Chris Hebert Date: Wed, 16 Aug 2017 16:42:22 -0500 Subject: [PATCH 09/10] [BEAM-2750][BEAM-2751] Fix WholeFileIO bug WholeFileIO.Write failed in teardown() because FileSystems.rename(src, dst) fails when src and dst are directories and dst is not empty. 
--- .../org/apache/beam/sdk/io/WholeFileIO.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java index a3c5df8d5e9c..7f7ecc7465ed 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY; import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE; import static org.apache.beam.sdk.util.MimeTypes.BINARY; @@ -114,11 +115,12 @@ * lines.apply(WholeFileIO.write().to("/path/to/output/dir/")); * } * - *

Any existing files with the same names as generated output files will be overwritten. - * Similarly, if multiple KV's in the incoming {@link PCollection} have the same String (i.e. - * filename), then duplicates will be overwritten by the other such named elements of the - * {@link PCollection}. In other words, only one {@link KV} of a certain filename will write out - * successfully. + *

Warnings: 1) Specified output directory may either not yet exist or be empty, but may not have + * contents prior to {@link Write}. 2) Any existing files with the same names as generated output + * files will be overwritten. 3) Similarly, if multiple KV's in the incoming {@link PCollection} + * have the same String (i.e. filename), then duplicates will be overwritten by the other such named + * elements of the {@link PCollection}. In other words, only one {@link KV} of a certain filename + * will write out successfully. */ public class WholeFileIO { @@ -294,6 +296,8 @@ public void processElement(ProcessContext c) throws IOException { @Teardown public void teardown() throws IOException { try { + // TODO: Replace FileSystems.rename() with a copy and delete to eliminate + // empty directory restriction on Write. FileSystems.rename( Collections.singletonList(tmpDir.get()), Collections.singletonList(getOutputDir().get()) @@ -313,6 +317,8 @@ public void teardown() throws IOException { return PDone.in(input.getPipeline()); } + // TODO: Either fully customize this tool for WholeFileIO or change + // FileBasedSink.WriteOperation.TemporaryDirectoryBuilder from private to public (or protected). 
private static class TemporaryDirectoryBuilder implements SerializableFunction { private static final AtomicLong TEMP_COUNT = new AtomicLong(0); @@ -330,7 +336,7 @@ public ResourceId apply(ResourceId tempDirectory) { // Temp directory has a timestamp and a unique ID String tempDirName = String.format(".temp-beam-%s-%s", timestamp, tempId); return tempDirectory - .getCurrentDirectory() + .resolve("..", RESOLVE_DIRECTORY) // getParent() .resolve(tempDirName, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY); } } From b393eae9325eec5a0a45b595c660f06296bc98c6 Mon Sep 17 00:00:00 2001 From: Chris Hebert Date: Wed, 16 Aug 2017 17:02:40 -0500 Subject: [PATCH 10/10] [BEAM-2750][BEAM-2751] Fix WholeFileIO comments --- .../src/main/java/org/apache/beam/sdk/io/WholeFileIO.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java index 7f7ecc7465ed..8560e052fe7d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WholeFileIO.java @@ -137,7 +137,7 @@ public static ReadAll readAll() { } /** - * A {@link PTransform} that takes a {@link PCollection} of {@link KV {@code KV}} + * A {@link PTransform} that takes a {@link PCollection} of {@code KV} * and writes each {@code byte[]} to the corresponding filename (i.e. the {@link String} of the * {@link KV}). */ @@ -217,7 +217,7 @@ public void processElement(ProcessContext c) throws IOException { } } - /** Implementation of {@Link #write()}. */ + /** Implementation of {@link #write()}. */ @AutoValue public abstract static class Write extends PTransform>, PDone> { private static final Logger LOG = LoggerFactory.getLogger(Write.class);