Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-240] Display data links for input and output files #300

Closed
wants to merge 12 commits into from
31 changes: 27 additions & 4 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -330,9 +331,12 @@ public PCollection<T> apply(PInput input) {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.addIfNotNull(DisplayData.item("filePattern", filepattern))
.addIfNotDefault(DisplayData.item("validation", validate), true);
if (filepattern != null) {
builder.add(DisplayData.item("filePattern", filepattern)
.withLinkUrl(getBrowseUrl(filepattern)));
}

builder.addIfNotDefault(DisplayData.item("validation", validate), true);
}

@Override
Expand Down Expand Up @@ -693,9 +697,16 @@ public PDone apply(PCollection<T> input) {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
if (filenamePrefix != null) {
// Append wildcard to browseUrl input since this is a prefix
// for shardNameTemplate + fileSuffix.
String browseUrl = getBrowseUrl(filenamePrefix + "*");
builder.add(DisplayData.item("filePrefix", filenamePrefix)
.withLinkUrl(browseUrl));
}

builder
.add(DisplayData.item("schema", type))
.addIfNotNull(DisplayData.item("filePrefix", filenamePrefix))
.addIfNotDefault(
DisplayData.item("shardNameTemplate", shardTemplate),
DEFAULT_SHARD_TEMPLATE)
Expand Down Expand Up @@ -760,6 +771,18 @@ private static void validateOutputComponent(String partialFilePattern) {
+ partialFilePattern);
}

private static String getBrowseUrl(String filePattern) {
IOChannelFactory factory;
try {
factory = IOChannelUtils.getFactory(filePattern);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My impression of the current code is that the IOChannelFactory is never interrogated unless validate is true. This should probably stick to that discipline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated AvroIO and TextIO to be resilient to bad file schemes when their validation is disabled. I didn't update sources and sinks, but now I think they probably need the same treatment. I'll work on that now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

} catch (IOException e) {
throw new IllegalStateException(
String.format("Invalid filePattern: %s", filePattern), e);
}

return factory.getBrowseUrl(filePattern);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move into the try block

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason? I prefer to keep try blocks tight so it's clear which operation can throw and to not catch more than expected.

}

/////////////////////////////////////////////////////////////////////////////

/** Disallow construction of utility class. */
Expand Down
Expand Up @@ -140,9 +140,22 @@ public void validate(PipelineOptions options) {}
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

IOChannelFactory factory;
try {
factory = IOChannelUtils.getFactory(baseOutputFilename);
} catch (IOException e) {
throw new IllegalStateException(
String.format("Unrecognized file name format: %s", baseOutputFilename), e);
}

// Append wildcard to browseUrl input since this is a filename prefix
String browseUrl = factory.getBrowseUrl(baseOutputFilename + "*");

String fileNamePattern = String.format("%s%s%s",
baseOutputFilename, fileNamingTemplate, getFileExtension(extension));
builder.add(DisplayData.item("fileNamePattern", fileNamePattern));

builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
.withLinkUrl(browseUrl));
}

/**
Expand Down
Expand Up @@ -277,7 +277,14 @@ private static long getEstimatedSizeOfFilesBySampling(
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("filePattern", getFileOrPatternSpec()));
String fileOrPatternSpec = getFileOrPatternSpec();
try {
builder.add(DisplayData.item("filePattern", fileOrPatternSpec)
.withLinkUrl(IOChannelUtils.getFactory(fileOrPatternSpec).getBrowseUrl(fileOrPatternSpec)));
} catch (IOException e) {
throw new IllegalStateException(
String.format("Invalid file pattern: %s", fileOrPatternSpec), e);
}
}

private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(
Expand Down
30 changes: 27 additions & 3 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -343,10 +344,14 @@ public PCollection<T> apply(PInput input) {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

if (filepattern != null) {
builder.add(DisplayData.item("filePattern", filepattern)
.withLinkUrl(getBrowseUrl(filepattern)));
}

builder
.add(DisplayData.item("compressionType", compressionType.toString()))
.addIfNotDefault(DisplayData.item("validation", validate), true)
.addIfNotNull(DisplayData.item("filePattern", filepattern));
.addIfNotDefault(DisplayData.item("validation", validate), true);
}

@Override
Expand Down Expand Up @@ -648,8 +653,15 @@ public PDone apply(PCollection<T> input) {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

if (filenamePrefix != null) {
// Append wildcard to browseUrl input since this is a filename prefix
String browseUrl = getBrowseUrl(filenamePrefix + "*");

builder.add(DisplayData.item("filePrefix", filenamePrefix)
.withLinkUrl(browseUrl));
}

builder
.addIfNotNull(DisplayData.item("filePrefix", filenamePrefix))
.addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix), "")
.addIfNotDefault(
DisplayData.item("shardNameTemplate", shardTemplate),
Expand Down Expand Up @@ -744,6 +756,18 @@ private static void validateOutputComponent(String partialFilePattern) {
+ partialFilePattern);
}

/**
 * Looks up the {@link IOChannelFactory} registered for {@code filePattern} and asks it for a
 * browsable URL.
 *
 * @param filePattern the file name, prefix or glob to build a link for
 * @return whatever the factory's {@code getBrowseUrl} returns; the interface permits
 *     {@code null} when browse URLs are not supported
 * @throws IllegalStateException if no factory can be obtained for {@code filePattern}
 */
private static String getBrowseUrl(String filePattern) {
  try {
    // Only the factory lookup can raise IOException; getBrowseUrl declares no checked exception.
    return IOChannelUtils.getFactory(filePattern).getBrowseUrl(filePattern);
  } catch (IOException lookupFailure) {
    throw new IllegalStateException(
        String.format("Invalid filePattern: %s", filePattern), lookupFailure);
  }
}

//////////////////////////////////////////////////////////////////////////////

/** Disable construction of utility class. */
Expand Down
Expand Up @@ -43,6 +43,8 @@
import java.util.List;
import java.util.regex.Matcher;

import javax.annotation.Nullable;

/**
* Implements IOChannelFactory for local files.
*/
Expand Down Expand Up @@ -133,4 +135,10 @@ public boolean isReadSeekEfficient(String spec) throws IOException {
/**
 * Resolves {@code other} against the local path {@code path} using
 * {@link java.nio.file.Path#resolve(String)} semantics.
 *
 * @param path the base path
 * @param other the path to resolve against {@code path}
 * @return the string form of the resolved path
 */
public String resolve(String path, String other) throws IOException {
  final String resolved = Paths.get(path).resolve(other).toString();
  return resolved;
}
/**
 * Builds a {@code file:} URI string for the given local path via {@link File#toURI()}.
 *
 * <p>The path is used as given; it is not checked for glob characters and the file is not
 * required to exist.
 *
 * @param path a local file system path
 * @return the string form of the path's {@code file:} URI
 */
@Nullable
@Override
public String getBrowseUrl(String path) {
  final String fileUri = new File(path).toURI().toString();
  return fileUri;
}
}
Expand Up @@ -27,6 +27,8 @@
import java.util.LinkedList;
import java.util.List;

import javax.annotation.Nullable;

/**
* Implements IOChannelFactory for GCS.
*/
Expand Down Expand Up @@ -84,4 +86,9 @@ public boolean isReadSeekEfficient(String spec) throws IOException {
/**
 * Resolves {@code other} against the GCS path parsed from {@code path}.
 *
 * @param path the base path, parsed with {@link GcsPath#fromUri(String)}
 * @param other the path to resolve against {@code path}
 * @return the string form of the resolved {@link GcsPath}
 */
public String resolve(String path, String other) throws IOException {
  final GcsPath base = GcsPath.fromUri(path);
  return base.resolve(other).toString();
}
/**
 * Parses {@code path} as a GCS URI and delegates to {@link GcsPath#getBrowseUrl()}.
 *
 * @param path a GCS path, parsed with {@link GcsPath#fromUri(String)}
 * @return the browse URL computed by the parsed {@link GcsPath}
 */
@Nullable
@Override
public String getBrowseUrl(String path) {
  final GcsPath gcsPath = GcsPath.fromUri(path);
  return gcsPath.getBrowseUrl();
}
}
Expand Up @@ -23,6 +23,8 @@
import java.nio.channels.WritableByteChannel;
import java.util.Collection;

import javax.annotation.Nullable;

/**
* Defines a factory for working with read and write channels.
*
Expand Down Expand Up @@ -98,5 +100,21 @@ public interface IOChannelFactory {
* Where the {@code other} path has a root component then resolution is highly implementation
* dependent and therefore unspecified.
*/
public String resolve(String path, String other) throws IOException;
String resolve(String path, String other) throws IOException;

/**
 * Retrieve a URL where the given {@code path} can be viewed and browsed, or null if browse URLs
 * are not supported.
 *
 * <p>The returned URL should be suitable for a user to enter into a web browser and browse
 * interactively. If the {@code path} refers to a file or data resource, the URL should
 * point to a location where the resource can be viewed. If the {@code path} points to a
 * directory or contains wildcards, the URL should point to a location where the inner resources
 * can be browsed.
 *
 * <p>This method does not validate that a resource exists or is accessible for the given
 * {@code path}.
 *
 * @param path the file, directory or wildcard pattern to produce a browse URL for
 * @return a URL for browsing {@code path}, or {@code null} if this factory does not support
 *     browse URLs
 */
@Nullable
String getBrowseUrl(String path);
}
Expand Up @@ -478,7 +478,6 @@ public WatchKey register(WatchService watcher, WatchEvent.Kind<?>... events)
/**
 * Returns an iterator over the name elements of this path, backed by a new
 * {@code NameIterator}.
 */
public Iterator<Path> iterator() {
  // The flag passed to NameIterator is true exactly when this path has a non-empty bucket.
  final boolean hasBucket = !bucket.isEmpty();
  return new NameIterator(fs, hasBucket, bucketAndObject());
}

private static class NameIterator implements Iterator<Path> {
private final FileSystem fs;
private boolean fullPath;
Expand Down Expand Up @@ -601,6 +600,40 @@ public String toResourceName() {
return sb.toString();
}

/** Matches a path component that contains any of the glob characters {@code * ? [ ]}. */
private static final Pattern HAS_GLOB = Pattern.compile(".*[\\*\\?\\[\\]].*");

/**
 * Returns a URL at which this path can be browsed.
 *
 * <p>The object name is cut at the first component containing a glob character. If what
 * remains ends with {@code '/'} (a bucket, an object "subdirectory", or a path truncated at a
 * glob), the URL points into the Cloud Console storage browser; otherwise it points at the
 * object itself. Path components are appended as-is, without URL encoding.
 *
 * @return the browse URL for this path
 * @see <a href="https://cloud.google.com/storage/docs/cloud-console#_accessing">
 * Accessing Google Cloud Platform Console</a>
 */
public String getBrowseUrl() {
  // GCS serves bucket/"directory" listings and individual objects from different hosts.
  final String bucketPrefix = "https://console.cloud.google.com/storage/browser/";
  final String objectPrefix = "https://storage.cloud.google.com/";

  final String[] segments = object.split("/");
  final boolean endsWithSlash = object.endsWith("/");
  final StringBuilder resource = new StringBuilder(bucket).append('/');

  int seen = 0;
  for (String segment : segments) {
    seen++;
    if (HAS_GLOB.matcher(segment).matches()) {
      // Drop the glob component and everything after it.
      break;
    }

    resource.append(segment);
    // Re-insert the separator between components, and after the last one only when the
    // original object name itself ended with a slash.
    final boolean lastSegment = seen == segments.length;
    if (!lastSegment || endsWithSlash) {
      resource.append('/');
    }
  }

  final boolean browsableAsDirectory = resource.charAt(resource.length() - 1) == '/';
  return (browsableAsDirectory ? bucketPrefix : objectPrefix) + resource;
}

@Override
public URI toUri() {
try {
Expand Down
Expand Up @@ -18,7 +18,11 @@
package org.apache.beam.sdk.io;

import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLinkUrl;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -28,11 +32,14 @@
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.AvroIO.Write.Bound;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.PCollection;

import com.google.common.base.MoreObjects;
Expand All @@ -42,6 +49,7 @@
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.Nullable;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand All @@ -62,6 +70,11 @@ public class AvroIOTest {
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();

/**
 * Registers the standard IO channel factories once for the whole test class, using
 * {@link GcsOptions} created from {@link PipelineOptionsFactory}.
 *
 * <p>Presumably needed so the display-data tests that use {@code gs://} paths can look up a
 * factory for them.
 */
@BeforeClass
public static void setUpClass() {
  GcsOptions gcsOptions = PipelineOptionsFactory.as(GcsOptions.class);
  IOChannelUtils.registerStandardIOFactories(gcsOptions);
}

@Test
public void testReadWithoutValidationFlag() throws Exception {
AvroIO.Read.Bound<GenericRecord> read = AvroIO.Read.from("gs://bucket/foo*/baz");
Expand Down Expand Up @@ -262,18 +275,24 @@ public void testAvroSinkShardedWrite() throws Exception {

@Test
public void testReadDisplayData() {
AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*")
String filePattern = "gs://bucket/foo/bar";
AvroIO.Read.Bound<?> read = AvroIO.Read.from(filePattern)
.withoutValidation();

DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
assertThat(displayData, hasDisplayItem("validation", false));
assertThat(displayData, hasDisplayItem(allOf(
hasKey("filePattern"),
hasValue(filePattern),
hasLinkUrl(GcsPath.fromUri(filePattern).getBrowseUrl()))));
}

@Test
public void testWriteDisplayData() {
String filePattern = "gs://bucket/foo/bar";

AvroIO.Write.Bound<?> write = AvroIO.Write
.to("foo")
.to(filePattern)
.withShardNameTemplate("-SS-of-NN-")
.withSuffix("bar")
.withSchema(GenericClass.class)
Expand All @@ -282,7 +301,11 @@ public void testWriteDisplayData() {

DisplayData displayData = DisplayData.from(write);

assertThat(displayData, hasDisplayItem("filePrefix", "foo"));
assertThat(displayData, hasDisplayItem(allOf(
hasKey("filePrefix"),
hasValue(filePattern),
hasLinkUrl(GcsPath.fromUri(filePattern + "*").getBrowseUrl()))));

assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
assertThat(displayData, hasDisplayItem("schema", GenericClass.class));
Expand Down