Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-240] Display data links for input and output files #300

Closed
wants to merge 12 commits into from
43 changes: 39 additions & 4 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -330,9 +331,12 @@ public PCollection<T> apply(PInput input) {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.addIfNotNull(DisplayData.item("filePattern", filepattern))
.addIfNotDefault(DisplayData.item("validation", validate), true);
if (filepattern != null) {
builder.add(DisplayData.item("filePattern", filepattern)
.withLinkUrl(getBrowseUrl(filepattern, validate)));
}

builder.addIfNotDefault(DisplayData.item("validation", validate), true);
}

@Override
Expand Down Expand Up @@ -693,12 +697,13 @@ public PDone apply(PCollection<T> input) {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder
.add(DisplayData.item("schema", type))
.addIfNotNull(DisplayData.item("filePrefix", filenamePrefix))
.addIfNotDefault(
DisplayData.item("shardNameTemplate", shardTemplate),
DEFAULT_SHARD_TEMPLATE)
.addIfNotNull(DisplayData.item("filePrefix", filenamePrefix))
.addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix), "")
.addIfNotDefault(DisplayData.item("numShards", numShards), 0)
.addIfNotDefault(DisplayData.item("validation", validate), true);
Expand Down Expand Up @@ -760,6 +765,36 @@ private static void validateOutputComponent(String partialFilePattern) {
+ partialFilePattern);
}

/**
* Retrieve the browse URL for a file pattern.
*
* @param validate Whether validation errors should cause an exception to be thrown.
* @return The browse URL, or null if the filePattern is invalid and validation is disabled.
*/
@Nullable
private static String getBrowseUrl(String filePattern, boolean validate) {
if (!IOChannelUtils.hasFactory(filePattern)) {
// Browse URLs are only used for display data and shouldn't throw unless validation is
// enabled.
if (validate) {
throw new IllegalStateException(String.format("Invalid filePattern: %s", filePattern));
} else {
return null;
}
}

IOChannelFactory factory;
try {
factory = IOChannelUtils.getFactory(filePattern);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My impression of the current code is that the IOChannelFactory is never interrogated unless validate is true. This should probably stick to that discipline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated AvroIO and TextIO to be resilient to bad file schemes when their validation is disabled. I didn't update sources and sinks, but now I think they probably need the same treatment. I'll work on that now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

} catch (IOException e) {
// hasFactory checked above, should not throw
throw new AssertionError(String.format(
"Unexpected error while retrieving browse url for file pattern: %s", filePattern), e);
Copy link
Member

@tgroh tgroh May 12, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this sequence of calls is guaranteed to never throw, we should consider removing the checked exception and throwing an IllegalArgumentException inside IOChannelUtils if it is called improperly. If it can change from under you (which it probably shouldn't be able to), then this is probably more of a ConcurrentModificationException or the like.

You don't need to make this change now, but maybe file a JIRA issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. I've opened BEAM-281 to track.

}

return factory.getBrowseUrl(filePattern);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move into the try block

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason? I prefer to keep try blocks tight so it's clear which operation can throw and to not catch more than expected.

}

/////////////////////////////////////////////////////////////////////////////

/** Disallow construction of utility class. */
Expand Down
Expand Up @@ -140,9 +140,33 @@ public void validate(PipelineOptions options) {}
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

String fileNamePattern = String.format("%s%s%s",
baseOutputFilename, fileNamingTemplate, getFileExtension(extension));
builder.add(DisplayData.item("fileNamePattern", fileNamePattern));
String browseUrl = null;
String browseFilePattern = formatFilePattern(
baseOutputFilename, globTemplate(fileNamingTemplate), extension);
if (IOChannelUtils.hasFactory(browseFilePattern)) {
IOChannelFactory factory;
try {
factory = IOChannelUtils.getFactory(browseFilePattern);
} catch (IOException e) {
throw new AssertionError(String.format(
"Unexpected error while retrieving browse url for file pattern: %s", browseFilePattern),
e);
}

browseUrl = factory.getBrowseUrl(browseFilePattern);
}

String fileNamePattern = formatFilePattern(baseOutputFilename, fileNamingTemplate, extension);
builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
.withLinkUrl(browseUrl));
}

/**
 * Builds a file pattern by joining, in order, the base name, the naming template and the
 * result of {@code getFileExtension} applied to the given extension.
 */
private static String formatFilePattern(String base, String template, String extension) {
  final String layout = "%s%s%s";
  return String.format(
      layout,
      base,
      template,
      getFileExtension(extension));
}

/**
 * Turns a file naming template into a glob by substituting {@code *} for every {@code N}
 * and every {@code S} character; all other characters are copied unchanged.
 */
private static String globTemplate(String fileNamingTemplate) {
  StringBuilder glob = new StringBuilder(fileNamingTemplate.length());
  for (char c : fileNamingTemplate.toCharArray()) {
    boolean isPlaceholder = c == 'N' || c == 'S';
    glob.append(isPlaceholder ? '*' : c);
  }
  return glob.toString();
}

/**
Expand Down
Expand Up @@ -277,7 +277,22 @@ private static long getEstimatedSizeOfFilesBySampling(
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("filePattern", getFileOrPatternSpec()));


String fileOrPatternSpec = getFileOrPatternSpec();
String browseUrl = null;
if (IOChannelUtils.hasFactory(fileOrPatternSpec)) {
try {
browseUrl = IOChannelUtils.getFactory(fileOrPatternSpec).getBrowseUrl(fileOrPatternSpec);
} catch (IOException e) {
throw new AssertionError(String.format(
"Unexpected error while retrieving browse url for file pattern: %s", fileOrPatternSpec),
e);
}
}

builder.add(DisplayData.item("filePattern", fileOrPatternSpec)
.withLinkUrl(browseUrl));
}

private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(
Expand Down
38 changes: 36 additions & 2 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -343,10 +344,14 @@ public PCollection<T> apply(PInput input) {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

if (filepattern != null) {
builder.add(DisplayData.item("filePattern", filepattern)
.withLinkUrl(getBrowseUrl(filepattern, validate)));
}

builder
.add(DisplayData.item("compressionType", compressionType.toString()))
.addIfNotDefault(DisplayData.item("validation", validate), true)
.addIfNotNull(DisplayData.item("filePattern", filepattern));
.addIfNotDefault(DisplayData.item("validation", validate), true);
}

@Override
Expand Down Expand Up @@ -744,6 +749,35 @@ private static void validateOutputComponent(String partialFilePattern) {
+ partialFilePattern);
}

/**
 * Looks up the browse URL for a file pattern so it can be attached to display data as a link.
 *
 * @param filePattern The file pattern to produce a browse URL for.
 * @param validate Whether an unsupported file pattern should cause an exception to be thrown.
 * @return The browse URL reported by the pattern's {@link IOChannelFactory}, or null if no
 *     factory handles the filePattern and validation is disabled.
 * @throws IllegalStateException if no factory handles the filePattern and validation is
 *     enabled.
 */
private static String getBrowseUrl(String filePattern, boolean validate) {
  if (IOChannelUtils.hasFactory(filePattern)) {
    IOChannelFactory channelFactory;
    try {
      channelFactory = IOChannelUtils.getFactory(filePattern);
    } catch (IOException unexpected) {
      // hasFactory succeeded just above, so this lookup is not expected to fail.
      throw new AssertionError(String.format(
          "Unexpected error while retrieving browse url for file pattern: %s", filePattern),
          unexpected);
    }
    return channelFactory.getBrowseUrl(filePattern);
  }

  // Browse URLs only feed display data, so an unsupported pattern is fatal only when
  // validation is enabled.
  if (!validate) {
    return null;
  }
  throw new IllegalStateException(String.format("Invalid filePattern: %s", filePattern));
}

//////////////////////////////////////////////////////////////////////////////

/** Disable construction of utility class. */
Expand Down
Expand Up @@ -43,6 +43,8 @@
import java.util.List;
import java.util.regex.Matcher;

import javax.annotation.Nullable;

/**
* Implements IOChannelFactory for local files.
*/
Expand Down Expand Up @@ -133,4 +135,10 @@ public boolean isReadSeekEfficient(String spec) throws IOException {
public String resolve(String path, String other) throws IOException {
// Resolve {@code other} against {@code path} and return the resulting path as a string.
return Paths.get(path).resolve(other).toString();
}
/**
 * Returns the URI string that {@link File#toURI()} produces for the given local path.
 */
@Nullable
@Override
public String getBrowseUrl(String path) {
  return new File(path).toURI().toString();
}
}
Expand Up @@ -27,6 +27,8 @@
import java.util.LinkedList;
import java.util.List;

import javax.annotation.Nullable;

/**
* Implements IOChannelFactory for GCS.
*/
Expand Down Expand Up @@ -84,4 +86,9 @@ public boolean isReadSeekEfficient(String spec) throws IOException {
public String resolve(String path, String other) throws IOException {
// Parse {@code path} as a GCS URI, resolve {@code other} against it, and return the string form.
return GcsPath.fromUri(path).resolve(other).toString();
}
/**
 * Parses the given path as a GCS URI and returns the browse URL reported by the resulting
 * {@link GcsPath}.
 */
@Nullable
@Override
public String getBrowseUrl(String path) {
  GcsPath gcsPath = GcsPath.fromUri(path);
  return gcsPath.getBrowseUrl();
}
}
Expand Up @@ -23,6 +23,8 @@
import java.nio.channels.WritableByteChannel;
import java.util.Collection;

import javax.annotation.Nullable;

/**
* Defines a factory for working with read and write channels.
*
Expand Down Expand Up @@ -98,5 +100,21 @@ public interface IOChannelFactory {
* Where the {@code other} path has a root component then resolution is highly implementation
* dependent and therefore unspecified.
*/
public String resolve(String path, String other) throws IOException;
String resolve(String path, String other) throws IOException;

/**
 * Retrieve a URL where the given {@code path} can be viewed and browsed, or null if browse URLs
 * are not supported.
 *
 * <p>The returned URL should be suitable for a user to enter into a web browser and browse
 * interactively. If the {@code path} refers to a file or data resource, the URL should
 * point to a location where the resource can be viewed. If the {@code path} points to a
 * directory or contains wildcards, the URL should point to a location where the inner resources
 * can be browsed.
 *
 * <p>This method does not validate that a resource exists or is accessible for the given
 * {@code path}.
 *
 * @param path the file, directory, or wildcard specification to produce a browse URL for.
 * @return the browse URL for {@code path}, or null if browse URLs are not supported.
 */
@Nullable
String getBrowseUrl(String path);
}
Expand Up @@ -37,6 +37,9 @@
* Provides utilities for creating read and write channels.
*/
public class IOChannelUtils {
/** Private so that this utility class cannot be instantiated from outside. */
private IOChannelUtils() {
}

// TODO: add registration mechanism for adding new schemes.
private static final Map<String, IOChannelFactory> FACTORY_MAP =
Collections.synchronizedMap(new HashMap<String, IOChannelFactory>());
Expand Down Expand Up @@ -160,13 +163,33 @@ public static String constructName(String prefix,
return sb.toString();
}

/**
 * Reports whether an {@link IOChannelFactory} is registered for the given input specification.
 *
 * <p>New factories are registered through {@link #setIOFactory}.
 *
 * @param spec the input specification to look up.
 * @return true if a factory is associated with {@code spec}, false otherwise.
 */
public static boolean hasFactory(String spec) {
  IOChannelFactory registered = tryGetFactory(spec);
  return registered != null;
}

// Matches specs of the form "<scheme>://<rest>". The scheme must start with a letter and may
// continue with letters, digits, '-', '+' or '.'; it is exposed through the named group
// "scheme".
private static final Pattern URI_SCHEME_PATTERN = Pattern.compile(
"(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*");

/**
* Returns the IOChannelFactory associated with an input specification.
* Returns the {@link IOChannelFactory} associated with an input specification.
*
* @throws IOException if no {@link IOChannelFactory} is registered to handle the given file spec.
*/
public static IOChannelFactory getFactory(String spec) throws IOException {
  IOChannelFactory factory = tryGetFactory(spec);
  if (factory == null) {
    // Nothing is registered for this spec; report it to the caller as a checked exception.
    throw new IOException("Unable to find handler for " + spec);
  }
  return factory;
}

private static IOChannelFactory tryGetFactory(String spec) {
// The spec is almost, but not quite, a URI. In particular,
// the reserved characters '[', ']', and '?' have meanings that differ
// from their use in the URI spec. ('*' is not reserved).
Expand All @@ -179,12 +202,7 @@ public static IOChannelFactory getFactory(String spec) throws IOException {
}

String scheme = matcher.group("scheme");
IOChannelFactory ioFactory = FACTORY_MAP.get(scheme);
if (ioFactory != null) {
return ioFactory;
}

throw new IOException("Unable to find handler for " + spec);
return FACTORY_MAP.get(scheme);
}

/**
Expand Down
Expand Up @@ -478,7 +478,6 @@ public WatchKey register(WatchService watcher, WatchEvent.Kind<?>... events)
public Iterator<Path> iterator() {
  // The second constructor argument is true exactly when this path has a non-empty bucket.
  boolean hasBucket = !bucket.isEmpty();
  return new NameIterator(fs, hasBucket, bucketAndObject());
}

private static class NameIterator implements Iterator<Path> {
private final FileSystem fs;
private boolean fullPath;
Expand Down Expand Up @@ -601,6 +600,40 @@ public String toResourceName() {
return sb.toString();
}

/**
 * Finds a glob metacharacter ({@code *}, {@code ?}, {@code [} or {@code ]}) anywhere in a path
 * component. Intended for use with {@code Matcher.find()}.
 */
private static final Pattern HAS_GLOB = Pattern.compile("[*?\\[\\]]");

/**
 * Returns a URL at which this path can be browsed interactively.
 *
 * <p>Path components are appended to the bucket name until the first component containing a
 * glob character; that component and everything after it are dropped. If the remaining
 * resource ends with {@code /} it is treated as a directory and linked through the cloud
 * console storage browser; otherwise it is linked as an individual object.
 *
 * <p>Path components are appended verbatim; no URL encoding is applied.
 *
 * @return the browse URL for the non-glob prefix of this path.
 * @see <a href="https://cloud.google.com/storage/docs/cloud-console#_accessing">
 *      Accessing Google Cloud Platform Console</a>
 */
public String getBrowseUrl() {
  // GCS uses different URLs for browsing buckets vs. objects. Object "subdirectories" can
  // be treated as buckets for browsing.
  final String bucketPrefix = "https://console.cloud.google.com/storage/browser/";
  final String objectPrefix = "https://storage.cloud.google.com/";

  // Walk the object path and stop at the first component that contains a glob character.
  StringBuilder resourceBuilder = new StringBuilder(bucket).append("/");
  String[] components = object.split("/");
  for (int i = 0; i < components.length; i++) {
    String component = components[i];
    // find() rather than matches(): a glob anywhere in the component counts, including in
    // components that contain line terminators, which '.' would refuse to span.
    if (HAS_GLOB.matcher(component).find()) {
      break;
    }

    resourceBuilder.append(component);
    // split() drops the trailing separator, so restore it when the object itself ends in "/".
    if (i + 1 < components.length || object.endsWith("/")) {
      resourceBuilder.append("/");
    }
  }

  final boolean isDirectoryPath = resourceBuilder.charAt(resourceBuilder.length() - 1) == '/';
  String prefix = isDirectoryPath ? bucketPrefix : objectPrefix;
  return prefix + resourceBuilder;
}

@Override
public URI toUri() {
try {
Expand Down