Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
72ec9b8
Make Regex Transform
eljefe6a May 2, 2016
e6f8c95
Merge remote-tracking branch 'upstream/master'
eljefe6a May 2, 2016
587eaae
Fixing checkstyle issues. Added missing Apache license.
eljefe6a May 3, 2016
df3045f
Added distributed replacement functions. Add replaceAll and replaceFi…
eljefe6a May 5, 2016
793d226
Whitespace fixes for check style.
eljefe6a May 5, 2016
9e5a997
Merge remote-tracking branch 'upstream/master'
eljefe6a May 9, 2016
425a4d8
Merge remote-tracking branch 'upstream/master'
eljefe6a May 11, 2016
225b2d0
Merge remote-tracking branch 'upstream/master'
eljefe6a May 14, 2016
c1a3bc5
Changed Word Counts to use TypeDescriptors.
eljefe6a May 16, 2016
adfeb01
Updated complete examples to use TypeDescriptors.
eljefe6a May 16, 2016
2a455f3
Removing Regex transforms from this branch.
eljefe6a May 16, 2016
eac1050
Merge remote-tracking branch 'upstream/master' into TypeDescriptorsEx…
eljefe6a May 17, 2016
0d5d9b6
Merge commit 'bb086b8d367e2c360d965659b094035cf01d9959' into TypeDesc…
eljefe6a May 23, 2016
8ed14ca
Trivial change to kick off another build.
eljefe6a May 23, 2016
f9cd719
Trivial change to kick off another build.
eljefe6a May 23, 2016
8a67ca0
KafkaIO: pin to current working version
dhalperi May 23, 2016
c913900
Closes #374
dhalperi May 23, 2016
6a74143
PubsubIO: make translation to Dataflow service compatible
May 21, 2016
9934a43
Closes #371
dhalperi May 23, 2016
269af8d
Update Flink runner's pom.xml
davorbonaci May 23, 2016
6e97b11
Update Flink runner's pom.xml
davorbonaci May 23, 2016
264ff74
Update Java 8 examples' pom.xml file
davorbonaci May 24, 2016
2376aea
This closes #377
davorbonaci May 24, 2016
ed8b1d5
TypeDescriptor: remove bogus import
dhalperi May 24, 2016
27abf44
Closes #381
dhalperi May 24, 2016
fae3af0
Make Regex Transform
eljefe6a May 2, 2016
f098c5e
Fixing checkstyle issues. Added missing Apache license.
eljefe6a May 3, 2016
78f4f53
Added distributed replacement functions. Add replaceAll and replaceFi…
eljefe6a May 5, 2016
d812294
Whitespace fixes for check style.
eljefe6a May 5, 2016
8c43cb2
Changed Word Counts to use TypeDescriptors.
eljefe6a May 16, 2016
6834cbe
Updated complete examples to use TypeDescriptors.
eljefe6a May 16, 2016
b5ad893
Removing Regex transforms from this branch.
eljefe6a May 16, 2016
3c94292
Trivial change to kick off another build.
eljefe6a May 23, 2016
d7b86d8
Trivial change to kick off another build.
eljefe6a May 23, 2016
089fba1
Merge branch 'TypeDescriptorsExamples' of https://github.com/eljefe6a…
eljefe6a May 24, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions examples/java8/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@

<build>
<plugins>
<!-- Disable javadoc for now.
TODO: this section should be removed as soon as possible. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<executions>
<execution>
<id>attach-javadocs</id>
<phase/>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.util.Arrays;

Expand All @@ -54,12 +54,12 @@ public static void main(String[] args) {

p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
.withOutputType(new TypeDescriptor<String>() {}))
.withOutputType(TypeDescriptors.strings()))
.apply(Filter.byPredicate((String word) -> !word.isEmpty()))
.apply(Count.<String>perElement())
.apply(MapElements
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
.withOutputType(new TypeDescriptor<String>() {}))
.withOutputType(TypeDescriptors.strings()))

// CHANGE 3 of 3: A Google Cloud Storage path is required for writing the results.
.apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
Expand Down Expand Up @@ -255,7 +255,8 @@ public static void main(String[] args) throws Exception {
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply("ExtractUserScore",
MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
.withOutputType(
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())));

// Calculate the total score per user over fixed windows, and
// cumulative updates for late data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

import org.apache.avro.reflect.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -168,7 +168,8 @@ public PCollection<KV<String, Integer>> apply(
return gameInfo
.apply(MapElements
.via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
.withOutputType(
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))
.apply(Sum.<String>integersPerKey());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

import com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -65,12 +65,12 @@ public void testMinimalWordCountJava8() throws Exception {

p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
.withOutputType(new TypeDescriptor<String>() {}))
.withOutputType(TypeDescriptors.strings()))
.apply(Filter.byPredicate((String word) -> !word.isEmpty()))
.apply(Count.<String>perElement())
.apply(MapElements
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
.withOutputType(new TypeDescriptor<String>() {}))
.withOutputType(TypeDescriptors.strings()))
.apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

import org.joda.time.Instant;
import org.junit.Test;
Expand Down Expand Up @@ -102,7 +102,8 @@ public void testUserScoresFilter() throws Exception {
// run a map to access the fields in the result.
.apply(MapElements
.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
.withOutputType(
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())));

PAssert.that(output).containsInAnyOrder(FILTERED_EVENTS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -146,7 +146,8 @@ public void testUserScoresBadInput() throws Exception {
.apply(ParDo.of(new ParseEventFn()))
.apply(
MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
.withOutputType(new TypeDescriptor<KV<String, Integer>>() {}));
.withOutputType(
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())));

PAssert.that(extract).empty();

Expand Down
13 changes: 13 additions & 0 deletions runners/flink/examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,19 @@
</plugin>
-->

<!-- Disable javadoc for now.
TODO: this section should be removed as soon as possible. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<executions>
<execution>
<id>attach-javadocs</id>
<phase/>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
Expand Down
15 changes: 14 additions & 1 deletion runners/flink/runner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
<version>${flink.version}</version>
</dependency>

<!--- Beam -->
<!-- Beam -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>java-sdk-all</artifactId>
Expand Down Expand Up @@ -159,6 +159,19 @@
</plugin>
-->

<!-- Disable javadoc for now.
TODO: this section should be removed as soon as possible. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<executions>
<execution>
<id>attach-javadocs</id>
<phase/>
</execution>
</executions>
</plugin>

<!-- Integration Tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.PubsubApiaryClient;
import org.apache.beam.sdk.util.PubsubClient;
import org.apache.beam.sdk.util.PubsubClient.IncomingMessage;
import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.util.PubsubClient.TopicPath;
import org.apache.beam.sdk.util.PubsubJsonClient;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
Expand Down Expand Up @@ -71,7 +71,7 @@ public class PubsubIO {
private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);

/** Factory for creating pubsub client to manage transport. */
private static final PubsubClient.PubsubClientFactory FACTORY = PubsubApiaryClient.FACTORY;
private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY;

/** The default {@link Coder} used to translate to/from Cloud Pub/Sub messages. */
public static final Coder<String> DEFAULT_PUBSUB_CODER = StringUtf8Coder.of();
Expand Down Expand Up @@ -646,7 +646,8 @@ public PCollection<T> apply(PInput input) {
if (boundedOutput) {
return input.getPipeline().begin()
.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
.apply(ParDo.of(new PubsubBoundedReader())).setCoder(coder);
.apply(ParDo.of(new PubsubBoundedReader()))
.setCoder(coder);
} else {
@Nullable ProjectPath projectPath =
topic == null ? null : PubsubClient.projectPathFromId(topic.project);
Expand All @@ -655,8 +656,8 @@ public PCollection<T> apply(PInput input) {
@Nullable SubscriptionPath subscriptionPath =
subscription == null
? null
: PubsubClient
.subscriptionPathFromName(subscription.project, subscription.subscription);
: PubsubClient.subscriptionPathFromName(
subscription.project, subscription.subscription);
return input.getPipeline().begin()
.apply(new PubsubUnboundedSource<T>(
FACTORY, projectPath, topicPath, subscriptionPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@
import com.google.common.hash.Hashing;

import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -89,8 +87,6 @@
* <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow service.
*/
public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSink.class);

/**
* Default maximum number of messages per publish.
*/
Expand Down Expand Up @@ -249,9 +245,8 @@ private static class WriterFn
*/
private void publishBatch(List<OutgoingMessage> messages, int bytes)
throws IOException {
long nowMsSinceEpoch = System.currentTimeMillis();
int n = pubsubClient.publish(topic, messages);
checkState(n == messages.size(), "Attempted to publish %d messages but %d were successful",
checkState(n == messages.size(), "Attempted to publish %s messages but %s were successful",
messages.size(), n);
batchCounter.addValue(1L);
elementCounter.addValue((long) messages.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@
* {@link #sampleUpdateMs}.
*/
public class MovingFunction {
/**
* How far back to retain samples, in ms.
*/
private final long samplePeriodMs;

/**
* How frequently to update the moving function, in ms.
*/
Expand Down Expand Up @@ -77,7 +72,6 @@ public class MovingFunction {
public MovingFunction(long samplePeriodMs, long sampleUpdateMs,
int numSignificantBuckets, int numSignificantSamples,
Combine.BinaryCombineLongFn function) {
this.samplePeriodMs = samplePeriodMs;
this.sampleUpdateMs = sampleUpdateMs;
this.numSignificantBuckets = numSignificantBuckets;
this.numSignificantSamples = numSignificantSamples;
Expand Down Expand Up @@ -123,7 +117,7 @@ public void add(long nowMsSinceEpoch, long value) {
}

/**
* Return the minimum/maximum/sum of all retained values within {@link #samplePeriodMs}
* Return the minimum/maximum/sum of all retained values within {@code samplePeriodMs}
* of {@code nowMsSinceEpoch}.
*/
public long get(long nowMsSinceEpoch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ PubsubClient newClient(
PubsubOptions options) throws IOException;

/**
* Return the display name for this factory. Eg "Apiary", "gRPC".
* Return the display name for this factory, e.g. "Json" or "gRPC".
*/
String getKind();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import org.apache.beam.sdk.options.PubsubOptions;

import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.Pubsub.Builder;
import com.google.api.services.pubsub.model.AcknowledgeRequest;
Expand Down Expand Up @@ -50,39 +52,48 @@
import javax.annotation.Nullable;

/**
* A Pubsub client using Apiary.
* A Pubsub client using JSON transport.
*/
public class PubsubApiaryClient extends PubsubClient {
public class PubsubJsonClient extends PubsubClient {

private static class PubsubJsonClientFactory implements PubsubClientFactory {
/**
 * Combines an optional credential with a request initializer.
 *
 * @param credential the credential to chain with the initializer; may be {@code null}
 * @param httpRequestInitializer the initializer that is always part of the result
 * @return {@code httpRequestInitializer} itself when {@code credential} is {@code null};
 *     otherwise a new {@link ChainingHttpRequestInitializer} built from both arguments
 */
private static HttpRequestInitializer chainHttpRequestInitializer(
    Credential credential, HttpRequestInitializer httpRequestInitializer) {
  // Only wrap when there is a credential to chain; otherwise pass the initializer through.
  if (credential != null) {
    return new ChainingHttpRequestInitializer(credential, httpRequestInitializer);
  }
  return httpRequestInitializer;
}

private static class PubsubApiaryClientFactory implements PubsubClientFactory {
@Override
public PubsubClient newClient(
@Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options)
throws IOException {
Pubsub pubsub = new Builder(
Transport.getTransport(),
Transport.getJsonFactory(),
new ChainingHttpRequestInitializer(
chainHttpRequestInitializer(
options.getGcpCredential(),
// Do not log 404. It clutters the output and is possibly even required by the caller.
new RetryHttpRequestInitializer(ImmutableList.of(404))))
.setRootUrl(options.getPubsubRootUrl())
.setApplicationName(options.getAppName())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace())
.build();
return new PubsubApiaryClient(timestampLabel, idLabel, pubsub);
return new PubsubJsonClient(timestampLabel, idLabel, pubsub);
}

@Override
public String getKind() {
return "Apiary";
return "Json";
}
}

/**
* Factory for creating Pubsub clients using Apiary transport.
* Factory for creating Pubsub clients using JSON transport.
*/
public static final PubsubClientFactory FACTORY = new PubsubApiaryClientFactory();
public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory();

/**
* Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time
Expand All @@ -98,12 +109,12 @@ public String getKind() {
private final String idLabel;

/**
* Underlying Apiary client.
* Underlying JSON transport.
*/
private Pubsub pubsub;

@VisibleForTesting
PubsubApiaryClient(
PubsubJsonClient(
@Nullable String timestampLabel,
@Nullable String idLabel,
Pubsub pubsub) {
Expand Down
Loading