Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,10 @@ public PCollection<T> apply(PInput input) {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.addIfNotNull("filePattern", filepattern)
.addIfNotDefault("validation", validate, true);
.addIfNotNull(DisplayData.item("filePattern", filepattern)
.withLabel("Input File Pattern"))
.addIfNotDefault(DisplayData.item("validation", validate)
.withLabel("Validation Enabled"), true);
}

@Override
Expand Down Expand Up @@ -692,12 +694,22 @@ public PDone apply(PCollection<T> input) {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.add("schema", type)
.addIfNotNull("filePrefix", filenamePrefix)
.addIfNotDefault("shardNameTemplate", shardTemplate, DEFAULT_SHARD_TEMPLATE)
.addIfNotDefault("fileSuffix", filenameSuffix, "")
.addIfNotDefault("numShards", numShards, 0)
.addIfNotDefault("validation", validate, true);
.add(DisplayData.item("schema", type)
.withLabel("Record Schema"))
.addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)
.withLabel("Output File Prefix"))
.addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
.withLabel("Output Shard Name Template"),
DEFAULT_SHARD_TEMPLATE)
.addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
.withLabel("Output File Suffix"),
"")
.addIfNotDefault(DisplayData.item("numShards", numShards)
.withLabel("Maximum Output Shards"),
0)
.addIfNotDefault(DisplayData.item("validation", validate)
.withLabel("Validation Enabled"),
true);
}

/**
Expand Down
72 changes: 62 additions & 10 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -629,13 +629,18 @@ public void populateDisplayData(DisplayData.Builder builder) {
TableReference table = getTable();

if (table != null) {
builder.add("table", toTableSpec(table));
builder.add(DisplayData.item("table", toTableSpec(table))
.withLabel("Table"));
}

builder
.addIfNotNull("query", query)
.addIfNotNull("flattenResults", flattenResults)
.addIfNotDefault("validation", validate, true);
.addIfNotNull(DisplayData.item("query", query)
.withLabel("Query"))
.addIfNotNull(DisplayData.item("flattenResults", flattenResults)
.withLabel("Flatten Query Results"))
.addIfNotDefault(DisplayData.item("validation", validate)
.withLabel("Validation Enabled"),
true);
}

/**
Expand Down Expand Up @@ -800,6 +805,12 @@ public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws E
protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
// Do nothing.
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("table", jsonTable));
}
}

/**
Expand Down Expand Up @@ -891,6 +902,11 @@ protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("query", query));
}
private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
throws InterruptedException, IOException {
if (dryRunJobStats.get() == null) {
Expand Down Expand Up @@ -1740,17 +1756,23 @@ public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder
.addIfNotNull("table", jsonTableRef)
.addIfNotNull("schema", jsonSchema);
.addIfNotNull(DisplayData.item("table", jsonTableRef)
.withLabel("Table Reference"))
.addIfNotNull(DisplayData.item("schema", jsonSchema)
.withLabel("Table Schema"));

if (tableRefFunction != null) {
builder.add("tableFn", tableRefFunction.getClass());
builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
.withLabel("Table Reference Function"));
}

builder
.add("createDisposition", createDisposition.toString())
.add("writeDisposition", writeDisposition.toString())
.addIfNotDefault("validation", validate, true);
.add(DisplayData.item("createDisposition", createDisposition.toString())
.withLabel("Table CreateDisposition"))
.add(DisplayData.item("writeDisposition", writeDisposition.toString())
.withLabel("Table WriteDisposition"))
.addIfNotDefault(DisplayData.item("validation", validate)
.withLabel("Validation Enabled"), true);
}

/** Returns the create disposition. */
Expand Down Expand Up @@ -1837,6 +1859,17 @@ public FileBasedSink.FileBasedWriteOperation<TableRow> createWriteOperation(
return new BigQueryWriteOperation(this);
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder
.addIfNotNull(DisplayData.item("schema", jsonSchema)
.withLabel("Table Schema"))
.addIfNotNull(DisplayData.item("tableSpec", jsonTable)
.withLabel("Table Specification"));
}

private static class BigQueryWriteOperation extends FileBasedWriteOperation<TableRow> {
// The maximum number of retry load jobs.
private static final int MAX_RETRY_LOAD_JOBS = 3;
Expand Down Expand Up @@ -2070,6 +2103,14 @@ public void finishBundle(Context context) throws Exception {
uniqueIdsForTableRows.clear();
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder.addIfNotNull(DisplayData.item("schema", jsonTableSchema)
.withLabel("Table Schema"));
}

public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
throws IOException {
TableReference tableReference = parseTableSpec(tableSpec);
Expand Down Expand Up @@ -2277,6 +2318,17 @@ public void processElement(ProcessContext context) throws IOException {
new TableRowInfo(context.element(), uniqueId)));
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder.addIfNotNull(DisplayData.item("table", tableSpec));
if (tableRefFunction != null) {
builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
.withLabel("Table Reference Function"));
}
}

private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window) {
if (tableSpec != null) {
return tableSpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ public String getKindString() {
public void populateDisplayData(DisplayData.Builder builder) {
// We explicitly do not register base-class data, instead we use the delegate inner source.
builder
.add("source", source.getClass())
.addIfNotDefault("maxRecords", maxNumRecords, Long.MAX_VALUE)
.addIfNotNull("maxReadTime", maxReadTime)
.add(DisplayData.item("source", source.getClass())
.withLabel("Read Source"))
.addIfNotDefault(DisplayData.item("maxRecords", maxNumRecords)
.withLabel("Maximum Read Records"), Long.MAX_VALUE)
.addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
.withLabel("Maximum Read Time"))
.include(source);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,17 @@ public void populateDisplayData(DisplayData.Builder builder) {
// We explicitly do not register base-class data, instead we use the delegate inner source.
builder
.include(sourceDelegate)
.add("source", sourceDelegate.getClass());
.add(DisplayData.item("source", sourceDelegate.getClass())
.withLabel("Read Source"));

if (channelFactory instanceof Enum) {
// GZIP and BZIP are implemented as enums; Enum classes are anonymous, so use the .name()
// value instead
builder.add("compressionMode", ((Enum) channelFactory).name());
builder.add(DisplayData.item("compressionMode", ((Enum) channelFactory).name())
.withLabel("Compression Mode"));
} else {
builder.add("compressionMode", channelFactory.getClass());
builder.add(DisplayData.item("compressionMode", channelFactory.getClass())
.withLabel("Compression Mode"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public PCollection<Long> apply(PBegin begin) {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add("upTo", numElements);
builder.add(DisplayData.item("upTo", numElements)
.withLabel("Count Up To"));
}
}

Expand Down Expand Up @@ -199,14 +200,17 @@ public PCollection<Long> apply(PBegin begin) {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder.add("timestampFn", timestampFn.getClass());
builder.add(DisplayData.item("timestampFn", timestampFn.getClass())
.withLabel("Timestamp Function"));

if (maxReadTime.isPresent()) {
builder.add("maxReadTime", maxReadTime.get());
builder.add(DisplayData.item("maxReadTime", maxReadTime.get())
.withLabel("Maximum Read Time"));
}

if (maxNumRecords.isPresent()) {
builder.add("maxRecords", maxNumRecords.get());
builder.add(DisplayData.item("maxRecords", maxNumRecords.get())
.withLabel("Maximum Read Records"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,16 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.addIfNotDefault("host", host, DEFAULT_HOST)
.addIfNotNull("dataset", datasetId)
.addIfNotNull("namespace", namespace);
.addIfNotDefault(DisplayData.item("host", host)
.withLabel("Datastore Service"), DEFAULT_HOST)
.addIfNotNull(DisplayData.item("dataset", datasetId)
.withLabel("Input Dataset"))
.addIfNotNull(DisplayData.item("namespace", namespace)
.withLabel("App Engine Namespace"));

if (query != null) {
builder.add("query", query.toString());
builder.add(DisplayData.item("query", query.toString())
.withLabel("Query"));
}
}

Expand Down Expand Up @@ -614,9 +618,12 @@ public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.addIfNotDefault("host", host, DEFAULT_HOST)
.addIfNotNull("dataset", datasetId);
.addIfNotDefault(DisplayData.item("host", host)
.withLabel("Datastore Service"), DEFAULT_HOST)
.addIfNotNull(DisplayData.item("dataset", datasetId)
.withLabel("Output Dataset"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void populateDisplayData(DisplayData.Builder builder) {

String fileNamePattern = String.format("%s%s%s",
baseOutputFilename, fileNamingTemplate, getFileExtension(extension));
builder.add("fileNamePattern", fileNamePattern);
builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
.withLabel("File Name Pattern"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ private static long getEstimatedSizeOfFilesBySampling(
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add("filePattern", getFileOrPatternSpec());
builder.add(DisplayData.item("filePattern", getFileOrPatternSpec())
.withLabel("File Pattern"));
}

private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,12 @@ public boolean allowsDynamicSplitting() {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.add("minBundleSize", minBundleSize)
.addIfNotDefault("startOffset", startOffset, 0)
.addIfNotDefault("endOffset", endOffset, Long.MAX_VALUE);
.addIfNotDefault(DisplayData.item("minBundleSize", minBundleSize)
.withLabel("Minimum Bundle Size"), 1L)
.addIfNotDefault(DisplayData.item("startOffset", startOffset)
.withLabel("Start Read Offset"), 0L)
.addIfNotDefault(DisplayData.item("endOffset", endOffset)
.withLabel("End Read Offset"), Long.MAX_VALUE);
}

/**
Expand Down
47 changes: 30 additions & 17 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,23 @@ protected static Instant assignMessageTimestamp(
return new Instant(millisSinceEpoch);
}

/**
* Populate common {@link DisplayData} between Pubsub source and sink.
*/
private static void populateCommonDisplayData(DisplayData.Builder builder,
String timestampLabel, String idLabel, PubsubTopic topic) {
builder
.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)
.withLabel("Timestamp Label Attribute"))
.addIfNotNull(DisplayData.item("idLabel", idLabel)
.withLabel("ID Label Attribute"));

if (topic != null) {
builder.add(DisplayData.item("topic", topic.asPath())
.withLabel("Pubsub Topic"));
}
}

/**
* Class representing a Cloud Pub/Sub Subscription.
*/
Expand Down Expand Up @@ -691,19 +708,17 @@ public PCollection<T> apply(PInput input) {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
populateCommonDisplayData(builder, timestampLabel, idLabel, topic);

builder
.addIfNotNull("timestampLabel", timestampLabel)
.addIfNotNull("idLabel", idLabel)
.addIfNotNull("maxReadTime", maxReadTime)
.addIfNotDefault("maxNumRecords", maxNumRecords, 0);

if (topic != null) {
builder.add("topic", topic.asPath());
}
.addIfNotNull(DisplayData.item("maxReadTime", maxReadTime)
.withLabel("Maximum Read Time"))
.addIfNotDefault(DisplayData.item("maxNumRecords", maxNumRecords)
.withLabel("Maximum Read Records"), 0);

if (subscription != null) {
builder.add("subscription", subscription.asPath());
builder.add(DisplayData.item("subscription", subscription.asPath())
.withLabel("Pubsub Subscription"));
}
}

Expand Down Expand Up @@ -995,14 +1010,7 @@ public PDone apply(PCollection<T> input) {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);

builder
.addIfNotNull("timestampLabel", timestampLabel)
.addIfNotNull("idLabel", idLabel);

if (topic != null) {
builder.add("topic", topic.asPath());
}
populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
}

@Override
Expand Down Expand Up @@ -1072,6 +1080,11 @@ private void publish() throws IOException {
.execute();
output.clear();
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
Bound.this.populateDisplayData(builder);
}
}
}

Expand Down
Loading