From 72ec9b82fb21ee9040b1bee6615fecfb9916e470 Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Mon, 2 May 2016 11:39:26 -0700 Subject: [PATCH 01/24] Make Regex Transform --- .../beam/sdk/transforms/RegexTransform.java | 316 ++++++++++++++++++ .../sdk/transforms/RegexTransformTest.java | 155 +++++++++ 2 files changed, 471 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java new file mode 100644 index 000000000000..027967c4e4ae --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java @@ -0,0 +1,316 @@ +package org.apache.beam.sdk.transforms; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * {@code PTransorm}s to use Regular Expressions to process elements in a + * {@link PCollection}. + * + *

+ * {@link RegexTransform#matches()} can be used to see if an entire line matches + * a Regex. {@link RegexTransform#matchesKV()} can be used to see if an entire + * line matches a Regex and output certain groups as a {@link KV}. + *

+ * + *

+ * {@link RegexTransform#find()} can be used to see if a portion of a line + * matches a Regex. {@link RegexTransform#matchesKV()} can be used to see if a + * portion of a line matches a Regex and output certain groups as a {@link KV}. + *

+ * + *

+ * Lines that do not match the Regex will not be output. + *

+ */ +public class RegexTransform { + private RegexTransform() { + // do not instantiate + } + + /** + * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if + * the entire line matches the Regex. Returns the entire line (group 0) as a + * {@link PCollection}. + * + * @param regex + * The regular expression to run + */ + public static Matches matches(String regex) { + return matches(regex, 0); + } + + /** + * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if + * the entire line matches the Regex. Returns the group as a + * {@link PCollection}. + * + * @param regex + * The regular expression to run + * @param group + * The Regex group to return as a PCollection + */ + public static Matches matches(String regex, int group) { + return new Matches(regex, group); + } + + /** + * Returns a {@link RegexTransform.MatchesKV} {@link PTransform} that checks + * if the entire line matches the Regex. Returns the specified groups as the + * key and value as a {@link PCollection}. + * + * @param regex + * The regular expression to run + * @param keyGroup + * The Regex group to use as the key + * @param valueGroup + * The Regex group to use the value + */ + public static MatchesKV matchesKV(String regex, int keyGroup, + int valueGroup) { + return new MatchesKV(regex, keyGroup, valueGroup); + } + + /** + * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a + * portion of the line matches the Regex. Returns the entire line (group 0) as + * a {@link PCollection}. + * + * @param regex + * The regular expression to run + */ + public static Find find(String regex) { + return find(regex, 0); + } + + /** + * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a + * portion of the line matches the Regex. Returns the group as a + * {@link PCollection}. 
+ * + * @param regex + * The regular expression to run + * @param group + * The Regex group to return as a PCollection + */ + public static Find find(String regex, int group) { + return new Find(regex, group); + } + + /** + * Returns a {@link RegexTransform.FindKV} {@link PTransform} that checks if a + * portion of the line matches the Regex. Returns the specified groups as the + * key and value as a {@link PCollection}. + * + * @param regex + * The regular expression to run + * @param keyGroup + * The Regex group to use as the key + * @param valueGroup + * The Regex group to use the value + */ + public static FindKV findKV(String regex, int keyGroup, int valueGroup) { + return new FindKV(regex, keyGroup, valueGroup); + } + + /** + * {@code RegexTransform.Matches} takes a {@code PCollection} + * and returns a {@code PCollection>} representing the key + * and value extracted from the Regex groups of the input {@code PCollection} + * to the number of times that element occurs in the input. + * + *

+ * This transform runs a Regex on the entire input line. If the entire line + * does not match the Regex, the line will not be output. If it does match the + * entire line, the group in the Regex will be used. The output will be the + * Regex group. + * + *

+ * Example of use: + * + *

+   *  {@code
+   * PCollection words = ...;
+   * PCollection values =
+   *     words.apply(RegexTransform.matches("myregex (mygroup)", 1));
+   * }
+   * 
+ */ + public static class Matches + extends PTransform, PCollection> { + Pattern pattern; + int group; + + public Matches(String regex, int group) { + this.pattern = Pattern.compile(regex); + this.group = group; + } + + public PCollection apply(PCollection in) { + return in + .apply(ParDo.named("MatchesRegex").of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + + if (m.matches()) { + c.output(m.group(group)); + } + } + })); + } + } + + /** + * {@code RegexTransform.MatchesKV>} takes a + * {@code PCollection} and returns a + * {@code PCollection>} representing the key and value + * extracted from the Regex groups of the input {@code PCollection} to the + * number of times that element occurs in the input. + * + *

+ * This transform runs a Regex on the entire input line. If the entire line + * does not match the Regex, the line will not be output. If it does match the + * entire line, the groups in the Regex will be used. The key will be the + * key's group and the value will be the value's group. + * + *

+ * Example of use: + * + *

+   *  {@code
+   * PCollection words = ...;
+   * PCollection> keysAndValues =
+   *     words.apply(RegexTransform.matchesKV("myregex (mykeygroup) (myvaluegroup)", 1, 2));
+   * }
+   * 
+ */ + public static class MatchesKV + extends PTransform, PCollection>> { + Pattern pattern; + int keyGroup, valueGroup; + + public MatchesKV(String regex, int keyGroup, int valueGroup) { + this.pattern = Pattern.compile(regex); + this.keyGroup = keyGroup; + this.valueGroup = valueGroup; + } + + public PCollection> apply(PCollection in) { + return in.apply(ParDo.named("MatchesKVRegex") + .of(new DoFn>() { + @Override + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + + if (m.find()) { + c.output(KV.of(m.group(keyGroup), m.group(valueGroup))); + } + } + })); + } + } + + /** + * {@code RegexTransform.Find} takes a {@code PCollection} and + * returns a {@code PCollection>} representing the key and + * value extracted from the Regex groups of the input {@code PCollection} to + * the number of times that element occurs in the input. + * + *

+ * This transform runs a Regex on the entire input line. If a portion of the + * line does not match the Regex, the line will not be output. If it does + * match a portion of the line, the group in the Regex will be used. The + * output will be the Regex group. + * + *

+ * Example of use: + * + *

+   *  {@code
+   * PCollection words = ...;
+   * PCollection values =
+   *     words.apply(RegexTransform.find("myregex (mygroup)", 1));
+   * }
+   * 
+ */ + public static class Find + extends PTransform, PCollection> { + Pattern pattern; + int group; + + public Find(String regex, int group) { + this.pattern = Pattern.compile(regex); + this.group = group; + } + + public PCollection apply(PCollection in) { + return in.apply(ParDo.named("FindRegex").of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + + if (m.find()) { + c.output(m.group(group)); + } + } + })); + } + } + + /** + * {@code RegexTransform.MatchesKV>} takes a + * {@code PCollection} and returns a + * {@code PCollection>} representing the key and value + * extracted from the Regex groups of the input {@code PCollection} to the + * number of times that element occurs in the input. + * + *

+ * This transform runs a Regex on the entire input line. If a portion of the + * line does not match the Regex, the line will not be output. If it does + * match a portion of the line, the groups in the Regex will be used. The key + * will be the key's group and the value will be the value's group. + * + *

+ * Example of use: + * + *

+   *  {@code
+   * PCollection words = ...;
+   * PCollection> keysAndValues =
+   *     words.apply(RegexTransform.findKV("myregex (mykeygroup) (myvaluegroup)", 1, 2));
+   * }
+   * 
+ */ + public static class FindKV + extends PTransform, PCollection>> { + Pattern pattern; + int keyGroup, valueGroup; + + public FindKV(String regex, int keyGroup, int valueGroup) { + this.pattern = Pattern.compile(regex); + this.keyGroup = keyGroup; + this.valueGroup = valueGroup; + } + + public PCollection> apply(PCollection in) { + return in.apply( + ParDo.named("FindKVRegex").of(new DoFn>() { + @Override + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + + if (m.find()) { + c.output(KV.of(m.group(keyGroup), m.group(valueGroup))); + } + } + })); + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java new file mode 100644 index 000000000000..a64626a10e75 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import java.io.Serializable; + +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link RegexTransform}. + */ +@RunWith(JUnit4.class) +public class RegexTransformTest implements Serializable { + @Test + public void testFind() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("aj", "xj", "yj", "zj")) + .apply(RegexTransform.find("[xyz]")); + + PAssert.that(output).containsInAnyOrder("x", "y", "z"); + p.run(); + } + + @Test + public void testFindGroup() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("aj", "xj", "yj", "zj")) + .apply(RegexTransform.find("([xyz])", 1)); + + PAssert.that(output).containsInAnyOrder("x", "y", "z"); + p.run(); + } + + @Test + public void testFindNone() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("a", "b", "c", "d")) + .apply(RegexTransform.find("[xyz]")); + + PAssert.that(output).empty(); + p.run(); + } + + @Test + public void testKVFind() { + TestPipeline p = TestPipeline.create(); + + PCollection> output = p + .apply(Create.of("a b c")) + .apply(RegexTransform.findKV("a (b) (c)", 1, 2)); + + PAssert.that(output).containsInAnyOrder(KV.of("b", "c")); + p.run(); + } + + @Test + public void testKVFindNone() { + TestPipeline p = TestPipeline.create(); + + PCollection> output = p + .apply(Create.of("x y z")) + .apply(RegexTransform.findKV("a (b) (c)", 1, 2)); + + PAssert.that(output).empty(); + p.run(); + } + + @Test + public void testMatches() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("a", "x", "y", "z")) + 
.apply(RegexTransform.matches("[xyz]")); + + PAssert.that(output).containsInAnyOrder("x", "y", "z"); + p.run(); + } + + @Test + public void testMatchesNone() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("a", "b", "c", "d")) + .apply(RegexTransform.matches("[xyz]")); + + PAssert.that(output).empty(); + p.run(); + } + + @Test + public void testMatchesGroup() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("a", "x xxx", "x yyy", "x zzz")) + .apply(RegexTransform.matches("x ([xyz]*)", 1)); + + PAssert.that(output).containsInAnyOrder("xxx", "yyy", "zzz"); + p.run(); + } + + @Test + public void testKVMatches() { + TestPipeline p = TestPipeline.create(); + + PCollection> output = p + .apply(Create.of("a b c")) + .apply(RegexTransform.matchesKV("a (b) (c)", 1, 2)); + + PAssert.that(output).containsInAnyOrder(KV.of("b", "c")); + p.run(); + } + + @Test + public void testKVMatchesNone() { + TestPipeline p = TestPipeline.create(); + + PCollection> output = p + .apply(Create.of("x y z")) + .apply(RegexTransform.matchesKV("a (b) (c)", 1, 2)); + + PAssert.that(output).empty(); + p.run(); + } +} From 587eaaec106829002df5df1b38753f811649aa51 Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Mon, 2 May 2016 18:08:13 -0700 Subject: [PATCH 02/24] Fixing checkstyle issues. Added missing Apache license. 
--- .../beam/sdk/transforms/RegexTransform.java | 48 ++++++++++--------- .../sdk/transforms/RegexTransformTest.java | 23 +++++---- 2 files changed, 36 insertions(+), 35 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java index 027967c4e4ae..8421f6aec439 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java @@ -1,30 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.sdk.transforms; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + /** * {@code PTransorm}s to use Regular Expressions to process elements in a * {@link PCollection}. * *

- * {@link RegexTransform#matches()} can be used to see if an entire line matches - * a Regex. {@link RegexTransform#matchesKV()} can be used to see if an entire + * {@link RegexTransform#matches(String, int)} can be used to see if an entire line matches + * a Regex. {@link RegexTransform#matchesKV(String, int, int)} can be used to see if an entire * line matches a Regex and output certain groups as a {@link KV}. *

- * *

- * {@link RegexTransform#find()} can be used to see if a portion of a line - * matches a Regex. {@link RegexTransform#matchesKV()} can be used to see if a + * {@link RegexTransform#find(String, int)} can be used to see if a portion of a line + * matches a Regex. {@link RegexTransform#matchesKV(String, int, int)} can be used to see if a * portion of a line matches a Regex and output certain groups as a {@link KV}. *

- * *

* Lines that do not match the Regex will not be output. *

@@ -38,7 +50,6 @@ private RegexTransform() { * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if * the entire line matches the Regex. Returns the entire line (group 0) as a * {@link PCollection}. - * * @param regex * The regular expression to run */ @@ -50,7 +61,6 @@ public static Matches matches(String regex) { * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if * the entire line matches the Regex. Returns the group as a * {@link PCollection}. - * * @param regex * The regular expression to run * @param group @@ -64,7 +74,6 @@ public static Matches matches(String regex, int group) { * Returns a {@link RegexTransform.MatchesKV} {@link PTransform} that checks * if the entire line matches the Regex. Returns the specified groups as the * key and value as a {@link PCollection}. - * * @param regex * The regular expression to run * @param keyGroup @@ -81,19 +90,17 @@ public static MatchesKV matchesKV(String regex, int keyGroup, * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a * portion of the line matches the Regex. Returns the entire line (group 0) as * a {@link PCollection}. - * * @param regex * The regular expression to run */ public static Find find(String regex) { return find(regex, 0); } - + /** * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a * portion of the line matches the Regex. Returns the group as a * {@link PCollection}. - * * @param regex * The regular expression to run * @param group @@ -107,7 +114,6 @@ public static Find find(String regex, int group) { * Returns a {@link RegexTransform.FindKV} {@link PTransform} that checks if a * portion of the line matches the Regex. Returns the specified groups as the * key and value as a {@link PCollection}. - * * @param regex * The regular expression to run * @param keyGroup @@ -133,7 +139,6 @@ public static FindKV findKV(String regex, int keyGroup, int valueGroup) { * *

* Example of use: - * *

    *  {@code
    * PCollection words = ...;
@@ -182,7 +187,6 @@ public void processElement(ProcessContext c) throws Exception {
    *
    * 

* Example of use: - * *

    *  {@code
    * PCollection words = ...;
@@ -231,7 +235,6 @@ public void processElement(ProcessContext c) throws Exception {
    *
    * 

* Example of use: - * *

    *  {@code
    * PCollection words = ...;
@@ -279,7 +282,6 @@ public void processElement(ProcessContext c) throws Exception {
    *
    * 

* Example of use: - * *

    *  {@code
    * PCollection words = ...;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java
index a64626a10e75..d916246ed38e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java
@@ -17,17 +17,17 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import java.io.Serializable;
-
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.Serializable;
+
 /**
  * Tests for {@link RegexTransform}.
  */
@@ -44,7 +44,7 @@ public void testFind() {
     PAssert.that(output).containsInAnyOrder("x", "y", "z");
     p.run();
   }
-  
+
   @Test
   public void testFindGroup() {
     TestPipeline p = TestPipeline.create();
@@ -64,11 +64,11 @@ public void testFindNone() {
     PCollection output = p
         .apply(Create.of("a", "b", "c", "d"))
         .apply(RegexTransform.find("[xyz]"));
-    
+
     PAssert.that(output).empty();
     p.run();
   }
-  
+
   @Test
   public void testKVFind() {
     TestPipeline p = TestPipeline.create();
@@ -88,11 +88,11 @@ public void testKVFindNone() {
     PCollection> output = p
         .apply(Create.of("x y z"))
         .apply(RegexTransform.findKV("a (b) (c)", 1, 2));
-    
+
     PAssert.that(output).empty();
     p.run();
   }
-  
+
   @Test
   public void testMatches() {
     TestPipeline p = TestPipeline.create();
@@ -112,11 +112,11 @@ public void testMatchesNone() {
     PCollection output = p
         .apply(Create.of("a", "b", "c", "d"))
         .apply(RegexTransform.matches("[xyz]"));
-    
+
     PAssert.that(output).empty();
     p.run();
   }
-  
+
   @Test
   public void testMatchesGroup() {
     TestPipeline p = TestPipeline.create();
@@ -128,7 +128,7 @@ public void testMatchesGroup() {
     PAssert.that(output).containsInAnyOrder("xxx", "yyy", "zzz");
     p.run();
   }
-  
+
   @Test
   public void testKVMatches() {
     TestPipeline p = TestPipeline.create();
@@ -148,7 +148,6 @@ public void testKVMatchesNone() {
     PCollection> output = p
         .apply(Create.of("x y z"))
         .apply(RegexTransform.matchesKV("a (b) (c)", 1, 2));
-    
     PAssert.that(output).empty();
     p.run();
   }

From df3045f62c939ef3a777ffbf658088f193144983 Mon Sep 17 00:00:00 2001
From: Jesse Anderson 
Date: Thu, 5 May 2016 08:46:14 -0700
Subject: [PATCH 03/24] Added distributed replacement functions. Add replaceAll
 and replaceFirst. Fixed some JavaDocs.

---
 .../beam/sdk/transforms/RegexTransform.java   | 118 +++++++++++++++++-
 .../sdk/transforms/RegexTransformTest.java    |  48 +++++++
 2 files changed, 162 insertions(+), 4 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java
index 8421f6aec439..9102bbabc380 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java
@@ -125,10 +125,36 @@ public static FindKV findKV(String regex, int keyGroup, int valueGroup) {
     return new FindKV(regex, keyGroup, valueGroup);
   }
 
+  /**
+   * Returns a {@link RegexTransform.ReplaceAll} {@link PTransform} that checks if a
+   * portion of the line matches the Regex and replaces all matches with the replacement
+   * String. Returns the resulting line as a {@link PCollection}.
+   * @param regex
+   *          The regular expression to run
+   * @param replacement
+   *          The string to be substituted for each match
+   */
+  public static ReplaceAll replaceAll(String regex, String replacement) {
+    return new ReplaceAll(regex, replacement);
+  }
+
+  /**
+   * Returns a {@link RegexTransform.ReplaceFirst} {@link PTransform} that checks if a
+   * portion of the line matches the Regex and replaces the first match with the replacement
+   * String. Returns the resulting line as a {@link PCollection}.
+   * @param regex
+   *          The regular expression to run
+   * @param replacement
+   *          The string to be substituted for the first match
+   */
+  public static ReplaceFirst replaceFirst(String regex, String replacement) {
+    return new ReplaceFirst(regex, replacement);
+  }
+
   /**
    * {@code RegexTransform.Matches} takes a {@code PCollection}
-   * and returns a {@code PCollection>} representing the key
-   * and value extracted from the Regex groups of the input {@code PCollection}
+   * and returns a {@code PCollection} representing the value
+   * extracted from the Regex groups of the input {@code PCollection}
    * to the number of times that element occurs in the input.
    *
    * 

@@ -223,8 +249,8 @@ public void processElement(ProcessContext c) throws Exception { /** * {@code RegexTransform.Find} takes a {@code PCollection} and - * returns a {@code PCollection>} representing the key and - * value extracted from the Regex groups of the input {@code PCollection} to + * returns a {@code PCollection} representing the value extracted + * from the Regex groups of the input {@code PCollection} to * the number of times that element occurs in the input. * *

@@ -315,4 +341,88 @@ public void processElement(ProcessContext c) throws Exception { })); } } + + /** + * {@code RegexTransform.ReplaceAll} takes a {@code PCollection} and + * returns a {@code PCollection} with all Strings that matched the + * Regex being replaced with the replacement string. + * + *

+ * This transform runs a Regex on the entire input line. If a portion of the + * line does not match the Regex, the line will be output without changes. If it does + * match a portion of the line, all portions matching the Regex will be replaced + * with the replacement String. + * + *

+ * Example of use: + *

+   *  {@code
+   * PCollection words = ...;
+   * PCollection values =
+   *     words.apply(RegexTransform.replaceAll("myregex", "myreplacement"));
+   * }
+   * 
+ */ + public static class ReplaceAll + extends PTransform, PCollection> { + Pattern pattern; + String replacement; + + public ReplaceAll(String regex, String replacement) { + this.pattern = Pattern.compile(regex); + this.replacement = replacement; + } + + public PCollection apply(PCollection in) { + return in.apply(ParDo.named("ReplaceAllRegex").of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + c.output(m.replaceAll(replacement)); + } + })); + } + } + + /** + * {@code RegexTransform.ReplaceFirst} takes a {@code PCollection} and + * returns a {@code PCollection} with the first Strings that matched the + * Regex being replaced with the replacement string. + * + *

+ * This transform runs a Regex on the entire input line. If a portion of the + * line does not match the Regex, the line will be output without changes. If it does + * match a portion of the line, the first portion matching the Regex will be replaced + * with the replacement String. + * + *

+ * Example of use: + *

+   *  {@code
+   * PCollection words = ...;
+   * PCollection values =
+   *     words.apply(RegexTransform.replaceFirst("myregex", "myreplacement"));
+   * }
+   * 
+ */ + public static class ReplaceFirst + extends PTransform, PCollection> { + Pattern pattern; + String replacement; + + public ReplaceFirst(String regex, String replacement) { + this.pattern = Pattern.compile(regex); + this.replacement = replacement; + } + + public PCollection apply(PCollection in) { + return in.apply(ParDo.named("ReplaceAllRegex").of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + c.output(m.replaceFirst(replacement)); + } + })); + } + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java index d916246ed38e..a9a5efbda268 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java @@ -151,4 +151,52 @@ public void testKVMatchesNone() { PAssert.that(output).empty(); p.run(); } + + @Test + public void testReplaceAll() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("xj", "yj", "zj")) + .apply(RegexTransform.replaceAll("[xyz]", "new")); + + PAssert.that(output).containsInAnyOrder("newj", "newj", "newj"); + p.run(); + } + + @Test + public void testReplaceAllMixed() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("abc", "xj", "yj", "zj", "def")) + .apply(RegexTransform.replaceAll("[xyz]", "new")); + + PAssert.that(output).containsInAnyOrder("abc", "newj", "newj", "newj", "def"); + p.run(); + } + + @Test + public void testReplaceFirst() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("xjx", "yjy", "zjz")) + .apply(RegexTransform.replaceFirst("[xyz]", "new")); + + PAssert.that(output).containsInAnyOrder("newjx", "newjy", "newjz"); + p.run(); + } + + @Test + public void 
testReplaceFirstMixed() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("abc", "xjx", "yjy", "zjz", "def")) + .apply(RegexTransform.replaceFirst("[xyz]", "new")); + + PAssert.that(output).containsInAnyOrder("abc", "newjx", "newjy", "newjz", "def"); + p.run(); + } } From 793d22667f485a5cdd49a7d36553c96e6898391c Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Thu, 5 May 2016 08:55:58 -0700 Subject: [PATCH 04/24] Whitespace fixes for check style. --- .../apache/beam/sdk/transforms/RegexTransform.java | 12 ++++++------ .../beam/sdk/transforms/RegexTransformTest.java | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java index 9102bbabc380..99a2f708ec61 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java @@ -341,16 +341,16 @@ public void processElement(ProcessContext c) throws Exception { })); } } - + /** * {@code RegexTransform.ReplaceAll} takes a {@code PCollection} and - * returns a {@code PCollection} with all Strings that matched the + * returns a {@code PCollection} with all Strings that matched the * Regex being replaced with the replacement string. * *

* This transform runs a Regex on the entire input line. If a portion of the * line does not match the Regex, the line will be output without changes. If it does - * match a portion of the line, all portions matching the Regex will be replaced + * match a portion of the line, all portions matching the Regex will be replaced * with the replacement String. * *

@@ -383,16 +383,16 @@ public void processElement(ProcessContext c) throws Exception { })); } } - + /** * {@code RegexTransform.ReplaceFirst} takes a {@code PCollection} and - * returns a {@code PCollection} with the first Strings that matched the + * returns a {@code PCollection} with the first Strings that matched the * Regex being replaced with the replacement string. * *

* This transform runs a Regex on the entire input line. If a portion of the * line does not match the Regex, the line will be output without changes. If it does - * match a portion of the line, the first portion matching the Regex will be replaced + * match a portion of the line, the first portion matching the Regex will be replaced * with the replacement String. * *

diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java index a9a5efbda268..393a34da8c7d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java @@ -151,7 +151,7 @@ public void testKVMatchesNone() { PAssert.that(output).empty(); p.run(); } - + @Test public void testReplaceAll() { TestPipeline p = TestPipeline.create(); @@ -163,7 +163,7 @@ public void testReplaceAll() { PAssert.that(output).containsInAnyOrder("newj", "newj", "newj"); p.run(); } - + @Test public void testReplaceAllMixed() { TestPipeline p = TestPipeline.create(); @@ -175,7 +175,7 @@ public void testReplaceAllMixed() { PAssert.that(output).containsInAnyOrder("abc", "newj", "newj", "newj", "def"); p.run(); } - + @Test public void testReplaceFirst() { TestPipeline p = TestPipeline.create(); @@ -187,7 +187,7 @@ public void testReplaceFirst() { PAssert.that(output).containsInAnyOrder("newjx", "newjy", "newjz"); p.run(); } - + @Test public void testReplaceFirstMixed() { TestPipeline p = TestPipeline.create(); From c1a3bc55b47e5dbd11c2bcb360a2a7f2c4aacfb9 Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Mon, 16 May 2016 13:58:08 -0700 Subject: [PATCH 05/24] Changed Word Counts to use TypeDescriptors. 
--- .../org/apache/beam/examples/MinimalWordCountJava8.java | 6 +++--- .../org/apache/beam/examples/MinimalWordCountJava8Test.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java index 398d517baa54..d4917418402d 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import java.util.Arrays; @@ -54,12 +54,12 @@ public static void main(String[] args) { p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) - .withOutputType(new TypeDescriptor() {})) + .withOutputType(TypeDescriptors.strings())) .apply(Filter.byPredicate((String word) -> !word.isEmpty())) .apply(Count.perElement()) .apply(MapElements .via((KV wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()) - .withOutputType(new TypeDescriptor() {})) + .withOutputType(TypeDescriptors.strings())) // CHANGE 3 of 3: The Google Cloud Storage path is required for outputting the results to. 
.apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java index ae925592504e..f73250fd386a 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import com.google.common.collect.ImmutableList; @@ -65,12 +65,12 @@ public void testMinimalWordCountJava8() throws Exception { p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) - .withOutputType(new TypeDescriptor() {})) + .withOutputType(TypeDescriptors.strings())) .apply(Filter.byPredicate((String word) -> !word.isEmpty())) .apply(Count.perElement()) .apply(MapElements .via((KV wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()) - .withOutputType(new TypeDescriptor() {})) + .withOutputType(TypeDescriptors.strings())) .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); } From adfeb01bafb1c17c6c65b30c45ea3e42473a80e6 Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Mon, 16 May 2016 14:09:18 -0700 Subject: [PATCH 06/24] Updated complete examples to use TypeDescriptors. 
--- .../org/apache/beam/examples/complete/game/GameStats.java | 5 +++-- .../org/apache/beam/examples/complete/game/UserScore.java | 5 +++-- .../beam/examples/complete/game/HourlyTeamScoreTest.java | 5 +++-- .../apache/beam/examples/complete/game/UserScoreTest.java | 5 +++-- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index 7814eb196c21..c5579134850a 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -44,7 +44,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.joda.time.DateTimeZone; import org.joda.time.Duration; @@ -255,7 +255,8 @@ public static void main(String[] args) throws Exception { PCollection> userEvents = rawEvents.apply("ExtractUserScore", MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())) - .withOutputType(new TypeDescriptor>() {})); + .withOutputType( + TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))); // Calculate the total score per user over fixed windows, and // cumulative updates for late data. 
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java index 866adefdb886..de049e83ea13 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.avro.reflect.Nullable; import org.slf4j.Logger; @@ -168,7 +168,8 @@ public PCollection> apply( return gameInfo .apply(MapElements .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())) - .withOutputType(new TypeDescriptor>() {})) + .withOutputType( + TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))) .apply(Sum.integersPerKey()); } } diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java index aa11c6c34644..5ff615ad93b5 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.joda.time.Instant; import org.junit.Test; @@ -102,7 +102,8 @@ public void testUserScoresFilter() throws Exception { // run a map to access the fields in the result. 
.apply(MapElements .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())) - .withOutputType(new TypeDescriptor>() {})); + .withOutputType( + TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))); PAssert.that(output).containsInAnyOrder(FILTERED_EVENTS); diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java index 40740a6bc2a3..b38e4ecf324c 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java @@ -31,7 +31,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.junit.Assert; import org.junit.Test; @@ -146,7 +146,8 @@ public void testUserScoresBadInput() throws Exception { .apply(ParDo.of(new ParseEventFn())) .apply( MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())) - .withOutputType(new TypeDescriptor>() {})); + .withOutputType( + TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))); PAssert.that(extract).empty(); From 2a455f3ae029e30644cc7b6c96105f761f903e74 Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Mon, 16 May 2016 15:11:57 -0700 Subject: [PATCH 07/24] Removing Regex transforms from this branch. 
--- .../beam/sdk/transforms/RegexTransform.java | 428 ------------------ .../sdk/transforms/RegexTransformTest.java | 202 --------- 2 files changed, 630 deletions(-) delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java delete mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java deleted file mode 100644 index 99a2f708ec61..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java +++ /dev/null @@ -1,428 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * {@code PTransorm}s to use Regular Expressions to process elements in a - * {@link PCollection}. - * - *

- * {@link RegexTransform#matches(String, int)} can be used to see if an entire line matches - * a Regex. {@link RegexTransform#matchesKV(String, int, int)} can be used to see if an entire - * line matches a Regex and output certain groups as a {@link KV}. - *

- *

- * {@link RegexTransform#find(String, int)} can be used to see if a portion of a line - * matches a Regex. {@link RegexTransform#matchesKV(String, int, int)} can be used to see if a - * portion of a line matches a Regex and output certain groups as a {@link KV}. - *

- *

- * Lines that do not match the Regex will not be output. - *

- */ -public class RegexTransform { - private RegexTransform() { - // do not instantiate - } - - /** - * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if - * the entire line matches the Regex. Returns the entire line (group 0) as a - * {@link PCollection}. - * @param regex - * The regular expression to run - */ - public static Matches matches(String regex) { - return matches(regex, 0); - } - - /** - * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if - * the entire line matches the Regex. Returns the group as a - * {@link PCollection}. - * @param regex - * The regular expression to run - * @param group - * The Regex group to return as a PCollection - */ - public static Matches matches(String regex, int group) { - return new Matches(regex, group); - } - - /** - * Returns a {@link RegexTransform.MatchesKV} {@link PTransform} that checks - * if the entire line matches the Regex. Returns the specified groups as the - * key and value as a {@link PCollection}. - * @param regex - * The regular expression to run - * @param keyGroup - * The Regex group to use as the key - * @param valueGroup - * The Regex group to use the value - */ - public static MatchesKV matchesKV(String regex, int keyGroup, - int valueGroup) { - return new MatchesKV(regex, keyGroup, valueGroup); - } - - /** - * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a - * portion of the line matches the Regex. Returns the entire line (group 0) as - * a {@link PCollection}. - * @param regex - * The regular expression to run - */ - public static Find find(String regex) { - return find(regex, 0); - } - - /** - * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a - * portion of the line matches the Regex. Returns the group as a - * {@link PCollection}. 
- * @param regex - * The regular expression to run - * @param group - * The Regex group to return as a PCollection - */ - public static Find find(String regex, int group) { - return new Find(regex, group); - } - - /** - * Returns a {@link RegexTransform.FindKV} {@link PTransform} that checks if a - * portion of the line matches the Regex. Returns the specified groups as the - * key and value as a {@link PCollection}. - * @param regex - * The regular expression to run - * @param keyGroup - * The Regex group to use as the key - * @param valueGroup - * The Regex group to use the value - */ - public static FindKV findKV(String regex, int keyGroup, int valueGroup) { - return new FindKV(regex, keyGroup, valueGroup); - } - - /** - * Returns a {@link RegexTransform.ReplaceAll} {@link PTransform} that checks if a - * portion of the line matches the Regex and replaces all matches with the replacement - * String. Returns the group as a {@link PCollection}. - * @param regex - * The regular expression to run - * @param replacement - * The string to be substituted for each match - */ - public static ReplaceAll replaceAll(String regex, String replacement) { - return new ReplaceAll(regex, replacement); - } - - /** - * Returns a {@link RegexTransform.ReplaceAll} {@link PTransform} that checks if a - * portion of the line matches the Regex and replaces the first match with the replacement - * String. Returns the group as a {@link PCollection}. - * @param regex - * The regular expression to run - * @param replacement - * The string to be substituted for each match - */ - public static ReplaceFirst replaceFirst(String regex, String replacement) { - return new ReplaceFirst(regex, replacement); - } - - /** - * {@code RegexTransform.Matches} takes a {@code PCollection} - * and returns a {@code PCollection} representing the value - * extracted from the Regex groups of the input {@code PCollection} - * to the number of times that element occurs in the input. - * - *

- * This transform runs a Regex on the entire input line. If the entire line - * does not match the Regex, the line will not be output. If it does match the - * entire line, the group in the Regex will be used. The output will be the - * Regex group. - * - *

- * Example of use: - *

-   *  {@code
-   * PCollection<String> words = ...;
-   * PCollection<String> values =
-   *     words.apply(RegexTransform.matches("myregex (mygroup)", 1));
-   * }
-   * 
- */ - public static class Matches - extends PTransform, PCollection> { - Pattern pattern; - int group; - - public Matches(String regex, int group) { - this.pattern = Pattern.compile(regex); - this.group = group; - } - - public PCollection apply(PCollection in) { - return in - .apply(ParDo.named("MatchesRegex").of(new DoFn() { - @Override - public void processElement(ProcessContext c) throws Exception { - Matcher m = pattern.matcher((String) c.element()); - - if (m.matches()) { - c.output(m.group(group)); - } - } - })); - } - } - - /** - * {@code RegexTransform.MatchesKV>} takes a - * {@code PCollection} and returns a - * {@code PCollection>} representing the key and value - * extracted from the Regex groups of the input {@code PCollection} to the - * number of times that element occurs in the input. - * - *

- * This transform runs a Regex on the entire input line. If the entire line - * does not match the Regex, the line will not be output. If it does match the - * entire line, the groups in the Regex will be used. The key will be the - * key's group and the value will be the value's group. - * - *

- * Example of use: - *

-   *  {@code
-   * PCollection<String> words = ...;
-   * PCollection<KV<String, String>> keysAndValues =
-   *     words.apply(RegexTransform.matchesKV("myregex (mykeygroup) (myvaluegroup)", 1, 2));
-   * }
-   * 
- */ - public static class MatchesKV - extends PTransform, PCollection>> { - Pattern pattern; - int keyGroup, valueGroup; - - public MatchesKV(String regex, int keyGroup, int valueGroup) { - this.pattern = Pattern.compile(regex); - this.keyGroup = keyGroup; - this.valueGroup = valueGroup; - } - - public PCollection> apply(PCollection in) { - return in.apply(ParDo.named("MatchesKVRegex") - .of(new DoFn>() { - @Override - public void processElement(ProcessContext c) throws Exception { - Matcher m = pattern.matcher((String) c.element()); - - if (m.find()) { - c.output(KV.of(m.group(keyGroup), m.group(valueGroup))); - } - } - })); - } - } - - /** - * {@code RegexTransform.Find} takes a {@code PCollection} and - * returns a {@code PCollection} representing the value extracted - * from the Regex groups of the input {@code PCollection} to - * the number of times that element occurs in the input. - * - *

- * This transform runs a Regex on the entire input line. If a portion of the - * line does not match the Regex, the line will not be output. If it does - * match a portion of the line, the group in the Regex will be used. The - * output will be the Regex group. - * - *

- * Example of use: - *

-   *  {@code
-   * PCollection<String> words = ...;
-   * PCollection<String> values =
-   *     words.apply(RegexTransform.find("myregex (mygroup)", 1));
-   * }
-   * 
- */ - public static class Find - extends PTransform, PCollection> { - Pattern pattern; - int group; - - public Find(String regex, int group) { - this.pattern = Pattern.compile(regex); - this.group = group; - } - - public PCollection apply(PCollection in) { - return in.apply(ParDo.named("FindRegex").of(new DoFn() { - @Override - public void processElement(ProcessContext c) throws Exception { - Matcher m = pattern.matcher((String) c.element()); - - if (m.find()) { - c.output(m.group(group)); - } - } - })); - } - } - - /** - * {@code RegexTransform.MatchesKV>} takes a - * {@code PCollection} and returns a - * {@code PCollection>} representing the key and value - * extracted from the Regex groups of the input {@code PCollection} to the - * number of times that element occurs in the input. - * - *

- * This transform runs a Regex on the entire input line. If a portion of the - * line does not match the Regex, the line will not be output. If it does - * match a portion of the line, the groups in the Regex will be used. The key - * will be the key's group and the value will be the value's group. - * - *

- * Example of use: - *

-   *  {@code
-   * PCollection<String> words = ...;
-   * PCollection<KV<String, String>> keysAndValues =
-   *     words.apply(RegexTransform.findKV("myregex (mykeygroup) (myvaluegroup)", 1, 2));
-   * }
-   * 
- */ - public static class FindKV - extends PTransform, PCollection>> { - Pattern pattern; - int keyGroup, valueGroup; - - public FindKV(String regex, int keyGroup, int valueGroup) { - this.pattern = Pattern.compile(regex); - this.keyGroup = keyGroup; - this.valueGroup = valueGroup; - } - - public PCollection> apply(PCollection in) { - return in.apply( - ParDo.named("FindKVRegex").of(new DoFn>() { - @Override - public void processElement(ProcessContext c) throws Exception { - Matcher m = pattern.matcher((String) c.element()); - - if (m.find()) { - c.output(KV.of(m.group(keyGroup), m.group(valueGroup))); - } - } - })); - } - } - - /** - * {@code RegexTransform.ReplaceAll} takes a {@code PCollection} and - * returns a {@code PCollection} with all Strings that matched the - * Regex being replaced with the replacement string. - * - *

- * This transform runs a Regex on the entire input line. If a portion of the - * line does not match the Regex, the line will be output without changes. If it does - * match a portion of the line, all portions matching the Regex will be replaced - * with the replacement String. - * - *

- * Example of use: - *

-   *  {@code
-   * PCollection<String> words = ...;
-   * PCollection<String> values =
-   *     words.apply(RegexTransform.replaceAll("myregex", "myreplacement"));
-   * }
-   * 
- */ - public static class ReplaceAll - extends PTransform, PCollection> { - Pattern pattern; - String replacement; - - public ReplaceAll(String regex, String replacement) { - this.pattern = Pattern.compile(regex); - this.replacement = replacement; - } - - public PCollection apply(PCollection in) { - return in.apply(ParDo.named("ReplaceAllRegex").of(new DoFn() { - @Override - public void processElement(ProcessContext c) throws Exception { - Matcher m = pattern.matcher((String) c.element()); - c.output(m.replaceAll(replacement)); - } - })); - } - } - - /** - * {@code RegexTransform.ReplaceFirst} takes a {@code PCollection} and - * returns a {@code PCollection} with the first Strings that matched the - * Regex being replaced with the replacement string. - * - *

- * This transform runs a Regex on the entire input line. If a portion of the - * line does not match the Regex, the line will be output without changes. If it does - * match a portion of the line, the first portion matching the Regex will be replaced - * with the replacement String. - * - *

- * Example of use: - *

-   *  {@code
-   * PCollection<String> words = ...;
-   * PCollection<String> values =
-   *     words.apply(RegexTransform.replaceFirst("myregex", "myreplacement"));
-   * }
-   * 
- */ - public static class ReplaceFirst - extends PTransform, PCollection> { - Pattern pattern; - String replacement; - - public ReplaceFirst(String regex, String replacement) { - this.pattern = Pattern.compile(regex); - this.replacement = replacement; - } - - public PCollection apply(PCollection in) { - return in.apply(ParDo.named("ReplaceAllRegex").of(new DoFn() { - @Override - public void processElement(ProcessContext c) throws Exception { - Matcher m = pattern.matcher((String) c.element()); - c.output(m.replaceFirst(replacement)); - } - })); - } - } -} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java deleted file mode 100644 index 393a34da8c7d..000000000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.transforms; - -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; - -/** - * Tests for {@link RegexTransform}. - */ -@RunWith(JUnit4.class) -public class RegexTransformTest implements Serializable { - @Test - public void testFind() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("aj", "xj", "yj", "zj")) - .apply(RegexTransform.find("[xyz]")); - - PAssert.that(output).containsInAnyOrder("x", "y", "z"); - p.run(); - } - - @Test - public void testFindGroup() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("aj", "xj", "yj", "zj")) - .apply(RegexTransform.find("([xyz])", 1)); - - PAssert.that(output).containsInAnyOrder("x", "y", "z"); - p.run(); - } - - @Test - public void testFindNone() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("a", "b", "c", "d")) - .apply(RegexTransform.find("[xyz]")); - - PAssert.that(output).empty(); - p.run(); - } - - @Test - public void testKVFind() { - TestPipeline p = TestPipeline.create(); - - PCollection> output = p - .apply(Create.of("a b c")) - .apply(RegexTransform.findKV("a (b) (c)", 1, 2)); - - PAssert.that(output).containsInAnyOrder(KV.of("b", "c")); - p.run(); - } - - @Test - public void testKVFindNone() { - TestPipeline p = TestPipeline.create(); - - PCollection> output = p - .apply(Create.of("x y z")) - .apply(RegexTransform.findKV("a (b) (c)", 1, 2)); - - PAssert.that(output).empty(); - p.run(); - } - - @Test - public void testMatches() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("a", "x", "y", "z")) - .apply(RegexTransform.matches("[xyz]")); - - 
PAssert.that(output).containsInAnyOrder("x", "y", "z"); - p.run(); - } - - @Test - public void testMatchesNone() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("a", "b", "c", "d")) - .apply(RegexTransform.matches("[xyz]")); - - PAssert.that(output).empty(); - p.run(); - } - - @Test - public void testMatchesGroup() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("a", "x xxx", "x yyy", "x zzz")) - .apply(RegexTransform.matches("x ([xyz]*)", 1)); - - PAssert.that(output).containsInAnyOrder("xxx", "yyy", "zzz"); - p.run(); - } - - @Test - public void testKVMatches() { - TestPipeline p = TestPipeline.create(); - - PCollection> output = p - .apply(Create.of("a b c")) - .apply(RegexTransform.matchesKV("a (b) (c)", 1, 2)); - - PAssert.that(output).containsInAnyOrder(KV.of("b", "c")); - p.run(); - } - - @Test - public void testKVMatchesNone() { - TestPipeline p = TestPipeline.create(); - - PCollection> output = p - .apply(Create.of("x y z")) - .apply(RegexTransform.matchesKV("a (b) (c)", 1, 2)); - PAssert.that(output).empty(); - p.run(); - } - - @Test - public void testReplaceAll() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("xj", "yj", "zj")) - .apply(RegexTransform.replaceAll("[xyz]", "new")); - - PAssert.that(output).containsInAnyOrder("newj", "newj", "newj"); - p.run(); - } - - @Test - public void testReplaceAllMixed() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("abc", "xj", "yj", "zj", "def")) - .apply(RegexTransform.replaceAll("[xyz]", "new")); - - PAssert.that(output).containsInAnyOrder("abc", "newj", "newj", "newj", "def"); - p.run(); - } - - @Test - public void testReplaceFirst() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("xjx", "yjy", "zjz")) - .apply(RegexTransform.replaceFirst("[xyz]", "new")); - - 
PAssert.that(output).containsInAnyOrder("newjx", "newjy", "newjz"); - p.run(); - } - - @Test - public void testReplaceFirstMixed() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("abc", "xjx", "yjy", "zjz", "def")) - .apply(RegexTransform.replaceFirst("[xyz]", "new")); - - PAssert.that(output).containsInAnyOrder("abc", "newjx", "newjy", "newjz", "def"); - p.run(); - } -} From 8ed14ca1986d45d49a43ac43a7fdf47a377103aa Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Mon, 23 May 2016 12:08:12 -0700 Subject: [PATCH 08/24] Trivial change to kick off another build. --- .../java/org/apache/beam/examples/MinimalWordCountJava8.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java index d4917418402d..00fe7302f6b6 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java @@ -52,7 +52,7 @@ public static void main(String[] args) { Pipeline p = Pipeline.create(options); - p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) + p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/* ")) .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) .withOutputType(TypeDescriptors.strings())) .apply(Filter.byPredicate((String word) -> !word.isEmpty())) From f9cd71989cf23d12ba0462c38d2e212a54902a4b Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Mon, 23 May 2016 12:08:34 -0700 Subject: [PATCH 09/24] Trivial change to kick off another build. 
--- .../java/org/apache/beam/examples/MinimalWordCountJava8.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java index 00fe7302f6b6..d4917418402d 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java @@ -52,7 +52,7 @@ public static void main(String[] args) { Pipeline p = Pipeline.create(options); - p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/* ")) + p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) .withOutputType(TypeDescriptors.strings())) .apply(Filter.byPredicate((String word) -> !word.isEmpty())) From 8a67ca013d32eb9486b9c30ea0d2df68d096e419 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 23 May 2016 13:20:36 -0700 Subject: [PATCH 10/24] KafkaIO: pin to current working version We have started picking up snapshot versions of the next version of Kafka, which cause build breaks. Pin to 0.9.0.1 which is the version we've been building with successfully. 
--- sdks/java/io/kafka/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml index bb9baaa1f178..fdc4d6d3833a 100644 --- a/sdks/java/io/kafka/pom.xml +++ b/sdks/java/io/kafka/pom.xml @@ -61,7 +61,7 @@ org.apache.kafka kafka-clients - [0.9,) + 0.9.0.1 From 6a74143a4df8e03e71cc0197653d52407fee1d2f Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Fri, 20 May 2016 21:22:08 -0700 Subject: [PATCH 11/24] PubsubIO: make translation to Dataflow service compatible --- .../java/org/apache/beam/sdk/io/PubsubIO.java | 11 ++++--- .../beam/sdk/io/PubsubUnboundedSink.java | 7 +---- .../apache/beam/sdk/util/MovingFunction.java | 8 +---- .../apache/beam/sdk/util/PubsubClient.java | 2 +- ...piaryClient.java => PubsubJsonClient.java} | 31 +++++++++++++------ .../sdk/io/PubsubUnboundedSourceTest.java | 2 -- ...entTest.java => PubsubJsonClientTest.java} | 6 ++-- 7 files changed, 33 insertions(+), 34 deletions(-) rename sdks/java/core/src/main/java/org/apache/beam/sdk/util/{PubsubApiaryClient.java => PubsubJsonClient.java} (91%) rename sdks/java/core/src/test/java/org/apache/beam/sdk/util/{PubsubApiaryClientTest.java => PubsubJsonClientTest.java} (97%) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 23a11401dedb..77c0b35ad81b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -32,13 +32,13 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.PubsubApiaryClient; import org.apache.beam.sdk.util.PubsubClient; import org.apache.beam.sdk.util.PubsubClient.IncomingMessage; import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; import 
org.apache.beam.sdk.util.PubsubClient.ProjectPath; import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.util.PubsubClient.TopicPath; +import org.apache.beam.sdk.util.PubsubJsonClient; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; @@ -71,7 +71,7 @@ public class PubsubIO { private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class); /** Factory for creating pubsub client to manage transport. */ - private static final PubsubClient.PubsubClientFactory FACTORY = PubsubApiaryClient.FACTORY; + private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY; /** The default {@link Coder} used to translate to/from Cloud Pub/Sub messages. */ public static final Coder DEFAULT_PUBSUB_CODER = StringUtf8Coder.of(); @@ -646,7 +646,8 @@ public PCollection apply(PInput input) { if (boundedOutput) { return input.getPipeline().begin() .apply(Create.of((Void) null)).setCoder(VoidCoder.of()) - .apply(ParDo.of(new PubsubBoundedReader())).setCoder(coder); + .apply(ParDo.of(new PubsubBoundedReader())) + .setCoder(coder); } else { @Nullable ProjectPath projectPath = topic == null ? null : PubsubClient.projectPathFromId(topic.project); @@ -655,8 +656,8 @@ public PCollection apply(PInput input) { @Nullable SubscriptionPath subscriptionPath = subscription == null ? 
null - : PubsubClient - .subscriptionPathFromName(subscription.project, subscription.subscription); + : PubsubClient.subscriptionPathFromName( + subscription.project, subscription.subscription); return input.getPipeline().begin() .apply(new PubsubUnboundedSource( FACTORY, projectPath, topicPath, subscriptionPath, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index 6ff9b40d39d0..a165c9188a23 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -57,8 +57,6 @@ import com.google.common.hash.Hashing; import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; @@ -89,8 +87,6 @@ *

NOTE: This is not the implementation used when running on the Google Cloud Dataflow service. */ public class PubsubUnboundedSink extends PTransform, PDone> { - private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSink.class); - /** * Default maximum number of messages per publish. */ @@ -249,9 +245,8 @@ private static class WriterFn */ private void publishBatch(List messages, int bytes) throws IOException { - long nowMsSinceEpoch = System.currentTimeMillis(); int n = pubsubClient.publish(topic, messages); - checkState(n == messages.size(), "Attempted to publish %d messages but %d were successful", + checkState(n == messages.size(), "Attempted to publish %s messages but %s were successful", messages.size(), n); batchCounter.addValue(1L); elementCounter.addValue((long) messages.size()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java index 84ba8b8437de..96802ae27822 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java @@ -29,11 +29,6 @@ * {@link #sampleUpdateMs}. */ public class MovingFunction { - /** - * How far back to retain samples, in ms. - */ - private final long samplePeriodMs; - /** * How frequently to update the moving function, in ms. 
*/ @@ -77,7 +72,6 @@ public class MovingFunction { public MovingFunction(long samplePeriodMs, long sampleUpdateMs, int numSignificantBuckets, int numSignificantSamples, Combine.BinaryCombineLongFn function) { - this.samplePeriodMs = samplePeriodMs; this.sampleUpdateMs = sampleUpdateMs; this.numSignificantBuckets = numSignificantBuckets; this.numSignificantSamples = numSignificantSamples; @@ -123,7 +117,7 @@ public void add(long nowMsSinceEpoch, long value) { } /** - * Return the minimum/maximum/sum of all retained values within {@link #samplePeriodMs} + * Return the minimum/maximum/sum of all retained values within samplePeriodMs * of {@code nowMsSinceEpoch}. */ public long get(long nowMsSinceEpoch) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java index 07ce97df13bd..76bf03fc5ad1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java @@ -57,7 +57,7 @@ PubsubClient newClient( PubsubOptions options) throws IOException; /** - * Return the display name for this factory. Eg "Apiary", "gRPC". + * Return the display name for this factory. Eg "Json", "gRPC". 
*/ String getKind(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java similarity index 91% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java index 08981d01212e..69c5128df8e0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubApiaryClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PubsubOptions; +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.http.HttpRequestInitializer; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.pubsub.Pubsub.Builder; import com.google.api.services.pubsub.model.AcknowledgeRequest; @@ -50,11 +52,20 @@ import javax.annotation.Nullable; /** - * A Pubsub client using Apiary. + * A Pubsub client using JSON transport. 
*/ -public class PubsubApiaryClient extends PubsubClient { +public class PubsubJsonClient extends PubsubClient { + + private static class PubsubJsonClientFactory implements PubsubClientFactory { + private static HttpRequestInitializer chainHttpRequestInitializer( + Credential credential, HttpRequestInitializer httpRequestInitializer) { + if (credential == null) { + return httpRequestInitializer; + } else { + return new ChainingHttpRequestInitializer(credential, httpRequestInitializer); + } + } - private static class PubsubApiaryClientFactory implements PubsubClientFactory { @Override public PubsubClient newClient( @Nullable String timestampLabel, @Nullable String idLabel, PubsubOptions options) @@ -62,7 +73,7 @@ public PubsubClient newClient( Pubsub pubsub = new Builder( Transport.getTransport(), Transport.getJsonFactory(), - new ChainingHttpRequestInitializer( + chainHttpRequestInitializer( options.getGcpCredential(), // Do not log 404. It clutters the output and is possibly even required by the caller. new RetryHttpRequestInitializer(ImmutableList.of(404)))) @@ -70,19 +81,19 @@ public PubsubClient newClient( .setApplicationName(options.getAppName()) .setGoogleClientRequestInitializer(options.getGoogleApiTrace()) .build(); - return new PubsubApiaryClient(timestampLabel, idLabel, pubsub); + return new PubsubJsonClient(timestampLabel, idLabel, pubsub); } @Override public String getKind() { - return "Apiary"; + return "Json"; } } /** - * Factory for creating Pubsub clients using Apiary transport. + * Factory for creating Pubsub clients using Json transport. */ - public static final PubsubClientFactory FACTORY = new PubsubApiaryClientFactory(); + public static final PubsubClientFactory FACTORY = new PubsubJsonClientFactory(); /** * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time @@ -98,12 +109,12 @@ public String getKind() { private final String idLabel; /** - * Underlying Apiary client. + * Underlying JSON transport. 
*/ private Pubsub pubsub; @VisibleForTesting - PubsubApiaryClient( + PubsubJsonClient( @Nullable String timestampLabel, @Nullable String idLabel, Pubsub pubsub) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java index 3b0a1c8c00f2..a19ccc5197e3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java @@ -119,7 +119,6 @@ public void readOneMessage() throws IOException { setupOneMessage(); TestPipeline p = TestPipeline.create(); PubsubReader reader = primSource.createReader(p.getOptions(), null); - PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); // Read one message. assertTrue(reader.start()); assertEquals(DATA, reader.getCurrent()); @@ -216,7 +215,6 @@ public void multipleReaders() throws IOException { setupOneMessage(incoming); TestPipeline p = TestPipeline.create(); PubsubReader reader = primSource.createReader(p.getOptions(), null); - PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); // Consume two messages, only read one. 
assertTrue(reader.start()); assertEquals("data_0", reader.getCurrent()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java similarity index 97% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java rename to sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java index 0f3a7bb506dc..dfdc46ecea68 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubApiaryClientTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubJsonClientTest.java @@ -44,9 +44,9 @@ import java.util.List; /** - * Tests for PubsubApiaryClient. + * Tests for PubsubJsonClient. */ -public class PubsubApiaryClientTest { +public class PubsubJsonClientTest { private Pubsub mockPubsub; private PubsubClient client; @@ -66,7 +66,7 @@ public class PubsubApiaryClientTest { @Before public void setup() throws IOException { mockPubsub = Mockito.mock(Pubsub.class, Mockito.RETURNS_DEEP_STUBS); - client = new PubsubApiaryClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub); + client = new PubsubJsonClient(TIMESTAMP_LABEL, ID_LABEL, mockPubsub); } @After From 269af8d53db5ab5579f1c647f58552ea636bcec8 Mon Sep 17 00:00:00 2001 From: Davor Bonaci Date: Mon, 23 May 2016 14:57:40 -0700 Subject: [PATCH 12/24] Update Flink runner's pom.xml Removing an extra '-' in the comment XML prefix "!<---". This is not a valid prefix and breaks Maven release plugin. 
--- runners/flink/runner/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml index b29a5bf221c0..cd35a4d4d67c 100644 --- a/runners/flink/runner/pom.xml +++ b/runners/flink/runner/pom.xml @@ -63,7 +63,7 @@ ${flink.version} - + org.apache.beam java-sdk-all From 6e97b11cb0ac443d92209d997ace315f493df6cf Mon Sep 17 00:00:00 2001 From: Davor Bonaci Date: Mon, 23 May 2016 15:21:00 -0700 Subject: [PATCH 13/24] Update Flink runner's pom.xml Explicitly disable Javadoc generation phase. This phase runs during the release process, but the current codebase doesn't compile Javadoc. This should be fixed and removed as soon as possible. --- runners/flink/examples/pom.xml | 13 +++++++++++++ runners/flink/runner/pom.xml | 13 +++++++++++++ 2 files changed, 26 insertions(+) diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml index e5bab3e9c03e..6a624b2581f0 100644 --- a/runners/flink/examples/pom.xml +++ b/runners/flink/examples/pom.xml @@ -86,6 +86,19 @@ --> + + + org.apache.maven.plugins + maven-javadoc-plugin + + + attach-javadocs + + + + + org.codehaus.mojo exec-maven-plugin diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml index cd35a4d4d67c..90209767de94 100644 --- a/runners/flink/runner/pom.xml +++ b/runners/flink/runner/pom.xml @@ -159,6 +159,19 @@ --> + + + org.apache.maven.plugins + maven-javadoc-plugin + + + attach-javadocs + + + + + org.apache.maven.plugins From 264ff74beafd658d7165aa45901adc86f4dae8f4 Mon Sep 17 00:00:00 2001 From: Davor Bonaci Date: Mon, 23 May 2016 17:12:28 -0700 Subject: [PATCH 14/24] Update Java 8 examples' pom.xml file Disable Javadoc execution, which runs in the release profile, because it is currently broken and blocks release. 
--- examples/java8/pom.xml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/examples/java8/pom.xml b/examples/java8/pom.xml index e211739a9412..d4c91a550077 100644 --- a/examples/java8/pom.xml +++ b/examples/java8/pom.xml @@ -39,6 +39,19 @@ + + + org.apache.maven.plugins + maven-javadoc-plugin + + + attach-javadocs + + + + + maven-compiler-plugin From ed8b1d5da523fea3e0c13de071e5d18a29a2121f Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Mon, 23 May 2016 18:59:48 -0700 Subject: [PATCH 15/24] TypeDescriptor: remove bogus import Misuse of Javadoc caused a bad import. --- .../main/java/org/apache/beam/sdk/values/TypeDescriptor.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java index 227c32fdee19..eea720aa2015 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptor.java @@ -18,14 +18,11 @@ package org.apache.beam.sdk.values; import com.google.common.collect.Lists; - import com.google.common.reflect.Invokable; import com.google.common.reflect.Parameter; import com.google.common.reflect.TypeParameter; import com.google.common.reflect.TypeToken; -import org.apache.velocity.runtime.parser.Token; - import java.io.Serializable; import java.lang.reflect.Field; import java.lang.reflect.Method; @@ -176,7 +173,7 @@ public static TypeDescriptor of(Type type) { } /** - * Creates a new {@link SimpleTypeDescriptor} using the {@link Token}. + * Creates a new {@link SimpleTypeDescriptor} using the {@link #token}. * Package visible so this isn't abused. 
*/ TypeDescriptor where(TypeParameter typeParam, TypeDescriptor typeDescriptor) { From fae3af03e3ae30ecc90f015833daab4d2e8868ef Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Mon, 2 May 2016 11:39:26 -0700 Subject: [PATCH 16/24] Make Regex Transform --- .../beam/sdk/transforms/RegexTransform.java | 316 ++++++++++++++++++ .../sdk/transforms/RegexTransformTest.java | 155 +++++++++ 2 files changed, 471 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java new file mode 100644 index 000000000000..027967c4e4ae --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java @@ -0,0 +1,316 @@ +package org.apache.beam.sdk.transforms; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * {@code PTransorm}s to use Regular Expressions to process elements in a + * {@link PCollection}. + * + *

+ * {@link RegexTransform#matches()} can be used to see if an entire line matches + * a Regex. {@link RegexTransform#matchesKV()} can be used to see if an entire + * line matches a Regex and output certain groups as a {@link KV}. + *

+ * + *

+ * {@link RegexTransform#find()} can be used to see if a portion of a line + * matches a Regex. {@link RegexTransform#matchesKV()} can be used to see if a + * portion of a line matches a Regex and output certain groups as a {@link KV}. + *

+ * + *

+ * Lines that do not match the Regex will not be output. + *

+ */ +public class RegexTransform { + private RegexTransform() { + // do not instantiate + } + + /** + * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if + * the entire line matches the Regex. Returns the entire line (group 0) as a + * {@link PCollection}. + * + * @param regex + * The regular expression to run + */ + public static Matches matches(String regex) { + return matches(regex, 0); + } + + /** + * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if + * the entire line matches the Regex. Returns the group as a + * {@link PCollection}. + * + * @param regex + * The regular expression to run + * @param group + * The Regex group to return as a PCollection + */ + public static Matches matches(String regex, int group) { + return new Matches(regex, group); + } + + /** + * Returns a {@link RegexTransform.MatchesKV} {@link PTransform} that checks + * if the entire line matches the Regex. Returns the specified groups as the + * key and value as a {@link PCollection}. + * + * @param regex + * The regular expression to run + * @param keyGroup + * The Regex group to use as the key + * @param valueGroup + * The Regex group to use the value + */ + public static MatchesKV matchesKV(String regex, int keyGroup, + int valueGroup) { + return new MatchesKV(regex, keyGroup, valueGroup); + } + + /** + * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a + * portion of the line matches the Regex. Returns the entire line (group 0) as + * a {@link PCollection}. + * + * @param regex + * The regular expression to run + */ + public static Find find(String regex) { + return find(regex, 0); + } + + /** + * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a + * portion of the line matches the Regex. Returns the group as a + * {@link PCollection}. 
+ * + * @param regex + * The regular expression to run + * @param group + * The Regex group to return as a PCollection + */ + public static Find find(String regex, int group) { + return new Find(regex, group); + } + + /** + * Returns a {@link RegexTransform.FindKV} {@link PTransform} that checks if a + * portion of the line matches the Regex. Returns the specified groups as the + * key and value as a {@link PCollection}. + * + * @param regex + * The regular expression to run + * @param keyGroup + * The Regex group to use as the key + * @param valueGroup + * The Regex group to use the value + */ + public static FindKV findKV(String regex, int keyGroup, int valueGroup) { + return new FindKV(regex, keyGroup, valueGroup); + } + + /** + * {@code RegexTransform.Matches} takes a {@code PCollection} + * and returns a {@code PCollection>} representing the key + * and value extracted from the Regex groups of the input {@code PCollection} + * to the number of times that element occurs in the input. + * + *

+ * This transform runs a Regex on the entire input line. If the entire line + * does not match the Regex, the line will not be output. If it does match the + * entire line, the group in the Regex will be used. The output will be the + * Regex group. + * + *

+ * Example of use: + * + *

+   *  {@code
+   * PCollection words = ...;
+   * PCollection values =
+   *     words.apply(RegexTransform.matches("myregex (mygroup)", 1));
+   * }
+   * 
+ */ + public static class Matches + extends PTransform, PCollection> { + Pattern pattern; + int group; + + public Matches(String regex, int group) { + this.pattern = Pattern.compile(regex); + this.group = group; + } + + public PCollection apply(PCollection in) { + return in + .apply(ParDo.named("MatchesRegex").of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + + if (m.matches()) { + c.output(m.group(group)); + } + } + })); + } + } + + /** + * {@code RegexTransform.MatchesKV>} takes a + * {@code PCollection} and returns a + * {@code PCollection>} representing the key and value + * extracted from the Regex groups of the input {@code PCollection} to the + * number of times that element occurs in the input. + * + *

+ * This transform runs a Regex on the entire input line. If the entire line + * does not match the Regex, the line will not be output. If it does match the + * entire line, the groups in the Regex will be used. The key will be the + * key's group and the value will be the value's group. + * + *

+ * Example of use: + * + *

+   *  {@code
+   * PCollection words = ...;
+   * PCollection> keysAndValues =
+   *     words.apply(RegexTransform.matchesKV("myregex (mykeygroup) (myvaluegroup)", 1, 2));
+   * }
+   * 
+ */ + public static class MatchesKV + extends PTransform, PCollection>> { + Pattern pattern; + int keyGroup, valueGroup; + + public MatchesKV(String regex, int keyGroup, int valueGroup) { + this.pattern = Pattern.compile(regex); + this.keyGroup = keyGroup; + this.valueGroup = valueGroup; + } + + public PCollection> apply(PCollection in) { + return in.apply(ParDo.named("MatchesKVRegex") + .of(new DoFn>() { + @Override + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + + if (m.find()) { + c.output(KV.of(m.group(keyGroup), m.group(valueGroup))); + } + } + })); + } + } + + /** + * {@code RegexTransform.Find} takes a {@code PCollection} and + * returns a {@code PCollection>} representing the key and + * value extracted from the Regex groups of the input {@code PCollection} to + * the number of times that element occurs in the input. + * + *

+ * This transform runs a Regex on the entire input line. If a portion of the + * line does not match the Regex, the line will not be output. If it does + * match a portion of the line, the group in the Regex will be used. The + * output will be the Regex group. + * + *

+ * Example of use: + * + *

+   *  {@code
+   * PCollection words = ...;
+   * PCollection values =
+   *     words.apply(RegexTransform.find("myregex (mygroup)", 1));
+   * }
+   * 
+ */ + public static class Find + extends PTransform, PCollection> { + Pattern pattern; + int group; + + public Find(String regex, int group) { + this.pattern = Pattern.compile(regex); + this.group = group; + } + + public PCollection apply(PCollection in) { + return in.apply(ParDo.named("FindRegex").of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + + if (m.find()) { + c.output(m.group(group)); + } + } + })); + } + } + + /** + * {@code RegexTransform.MatchesKV>} takes a + * {@code PCollection} and returns a + * {@code PCollection>} representing the key and value + * extracted from the Regex groups of the input {@code PCollection} to the + * number of times that element occurs in the input. + * + *

+ * This transform runs a Regex on the entire input line. If a portion of the + * line does not match the Regex, the line will not be output. If it does + * match a portion of the line, the groups in the Regex will be used. The key + * will be the key's group and the value will be the value's group. + * + *

+ * Example of use: + * + *

+   *  {@code
+   * PCollection words = ...;
+   * PCollection> keysAndValues =
+   *     words.apply(RegexTransform.findKV("myregex (mykeygroup) (myvaluegroup)", 1, 2));
+   * }
+   * 
+ */ + public static class FindKV + extends PTransform, PCollection>> { + Pattern pattern; + int keyGroup, valueGroup; + + public FindKV(String regex, int keyGroup, int valueGroup) { + this.pattern = Pattern.compile(regex); + this.keyGroup = keyGroup; + this.valueGroup = valueGroup; + } + + public PCollection> apply(PCollection in) { + return in.apply( + ParDo.named("FindKVRegex").of(new DoFn>() { + @Override + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + + if (m.find()) { + c.output(KV.of(m.group(keyGroup), m.group(valueGroup))); + } + } + })); + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java new file mode 100644 index 000000000000..a64626a10e75 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import java.io.Serializable; + +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link RegexTransform}. + */ +@RunWith(JUnit4.class) +public class RegexTransformTest implements Serializable { + @Test + public void testFind() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("aj", "xj", "yj", "zj")) + .apply(RegexTransform.find("[xyz]")); + + PAssert.that(output).containsInAnyOrder("x", "y", "z"); + p.run(); + } + + @Test + public void testFindGroup() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("aj", "xj", "yj", "zj")) + .apply(RegexTransform.find("([xyz])", 1)); + + PAssert.that(output).containsInAnyOrder("x", "y", "z"); + p.run(); + } + + @Test + public void testFindNone() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("a", "b", "c", "d")) + .apply(RegexTransform.find("[xyz]")); + + PAssert.that(output).empty(); + p.run(); + } + + @Test + public void testKVFind() { + TestPipeline p = TestPipeline.create(); + + PCollection> output = p + .apply(Create.of("a b c")) + .apply(RegexTransform.findKV("a (b) (c)", 1, 2)); + + PAssert.that(output).containsInAnyOrder(KV.of("b", "c")); + p.run(); + } + + @Test + public void testKVFindNone() { + TestPipeline p = TestPipeline.create(); + + PCollection> output = p + .apply(Create.of("x y z")) + .apply(RegexTransform.findKV("a (b) (c)", 1, 2)); + + PAssert.that(output).empty(); + p.run(); + } + + @Test + public void testMatches() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("a", "x", "y", "z")) + 
.apply(RegexTransform.matches("[xyz]")); + + PAssert.that(output).containsInAnyOrder("x", "y", "z"); + p.run(); + } + + @Test + public void testMatchesNone() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("a", "b", "c", "d")) + .apply(RegexTransform.matches("[xyz]")); + + PAssert.that(output).empty(); + p.run(); + } + + @Test + public void testMatchesGroup() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("a", "x xxx", "x yyy", "x zzz")) + .apply(RegexTransform.matches("x ([xyz]*)", 1)); + + PAssert.that(output).containsInAnyOrder("xxx", "yyy", "zzz"); + p.run(); + } + + @Test + public void testKVMatches() { + TestPipeline p = TestPipeline.create(); + + PCollection> output = p + .apply(Create.of("a b c")) + .apply(RegexTransform.matchesKV("a (b) (c)", 1, 2)); + + PAssert.that(output).containsInAnyOrder(KV.of("b", "c")); + p.run(); + } + + @Test + public void testKVMatchesNone() { + TestPipeline p = TestPipeline.create(); + + PCollection> output = p + .apply(Create.of("x y z")) + .apply(RegexTransform.matchesKV("a (b) (c)", 1, 2)); + + PAssert.that(output).empty(); + p.run(); + } +} From f098c5e550bfdd50e7694727fd8f4e7156a9070c Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Mon, 2 May 2016 18:08:13 -0700 Subject: [PATCH 17/24] Fixing checkstyle issues. Added missing Apache license. 
--- .../beam/sdk/transforms/RegexTransform.java | 48 ++++++++++--------- .../sdk/transforms/RegexTransformTest.java | 23 +++++---- 2 files changed, 36 insertions(+), 35 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java index 027967c4e4ae..8421f6aec439 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java @@ -1,30 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.sdk.transforms; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + /** * {@code PTransorm}s to use Regular Expressions to process elements in a * {@link PCollection}. * *

- * {@link RegexTransform#matches()} can be used to see if an entire line matches - * a Regex. {@link RegexTransform#matchesKV()} can be used to see if an entire + * {@link RegexTransform#matches(String, int)} can be used to see if an entire line matches + * a Regex. {@link RegexTransform#matchesKV(String, int, int)} can be used to see if an entire * line matches a Regex and output certain groups as a {@link KV}. *

- * *

- * {@link RegexTransform#find()} can be used to see if a portion of a line - * matches a Regex. {@link RegexTransform#matchesKV()} can be used to see if a + * {@link RegexTransform#find(String, int)} can be used to see if a portion of a line + * matches a Regex. {@link RegexTransform#matchesKV(String, int, int)} can be used to see if a * portion of a line matches a Regex and output certain groups as a {@link KV}. *

- * *

* Lines that do not match the Regex will not be output. *

@@ -38,7 +50,6 @@ private RegexTransform() { * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if * the entire line matches the Regex. Returns the entire line (group 0) as a * {@link PCollection}. - * * @param regex * The regular expression to run */ @@ -50,7 +61,6 @@ public static Matches matches(String regex) { * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if * the entire line matches the Regex. Returns the group as a * {@link PCollection}. - * * @param regex * The regular expression to run * @param group @@ -64,7 +74,6 @@ public static Matches matches(String regex, int group) { * Returns a {@link RegexTransform.MatchesKV} {@link PTransform} that checks * if the entire line matches the Regex. Returns the specified groups as the * key and value as a {@link PCollection}. - * * @param regex * The regular expression to run * @param keyGroup @@ -81,19 +90,17 @@ public static MatchesKV matchesKV(String regex, int keyGroup, * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a * portion of the line matches the Regex. Returns the entire line (group 0) as * a {@link PCollection}. - * * @param regex * The regular expression to run */ public static Find find(String regex) { return find(regex, 0); } - + /** * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a * portion of the line matches the Regex. Returns the group as a * {@link PCollection}. - * * @param regex * The regular expression to run * @param group @@ -107,7 +114,6 @@ public static Find find(String regex, int group) { * Returns a {@link RegexTransform.FindKV} {@link PTransform} that checks if a * portion of the line matches the Regex. Returns the specified groups as the * key and value as a {@link PCollection}. - * * @param regex * The regular expression to run * @param keyGroup @@ -133,7 +139,6 @@ public static FindKV findKV(String regex, int keyGroup, int valueGroup) { * *

* Example of use: - * *

    *  {@code
    * PCollection words = ...;
@@ -182,7 +187,6 @@ public void processElement(ProcessContext c) throws Exception {
    *
    * 

* Example of use: - * *

    *  {@code
    * PCollection words = ...;
@@ -231,7 +235,6 @@ public void processElement(ProcessContext c) throws Exception {
    *
    * 

* Example of use: - * *

    *  {@code
    * PCollection words = ...;
@@ -279,7 +282,6 @@ public void processElement(ProcessContext c) throws Exception {
    *
    * 

* Example of use: - * *

    *  {@code
    * PCollection words = ...;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java
index a64626a10e75..d916246ed38e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java
@@ -17,17 +17,17 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import java.io.Serializable;
-
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.Serializable;
+
 /**
  * Tests for {@link RegexTransform}.
  */
@@ -44,7 +44,7 @@ public void testFind() {
     PAssert.that(output).containsInAnyOrder("x", "y", "z");
     p.run();
   }
-  
+
   @Test
   public void testFindGroup() {
     TestPipeline p = TestPipeline.create();
@@ -64,11 +64,11 @@ public void testFindNone() {
     PCollection output = p
         .apply(Create.of("a", "b", "c", "d"))
         .apply(RegexTransform.find("[xyz]"));
-    
+
     PAssert.that(output).empty();
     p.run();
   }
-  
+
   @Test
   public void testKVFind() {
     TestPipeline p = TestPipeline.create();
@@ -88,11 +88,11 @@ public void testKVFindNone() {
     PCollection> output = p
         .apply(Create.of("x y z"))
         .apply(RegexTransform.findKV("a (b) (c)", 1, 2));
-    
+
     PAssert.that(output).empty();
     p.run();
   }
-  
+
   @Test
   public void testMatches() {
     TestPipeline p = TestPipeline.create();
@@ -112,11 +112,11 @@ public void testMatchesNone() {
     PCollection output = p
         .apply(Create.of("a", "b", "c", "d"))
         .apply(RegexTransform.matches("[xyz]"));
-    
+
     PAssert.that(output).empty();
     p.run();
   }
-  
+
   @Test
   public void testMatchesGroup() {
     TestPipeline p = TestPipeline.create();
@@ -128,7 +128,7 @@ public void testMatchesGroup() {
     PAssert.that(output).containsInAnyOrder("xxx", "yyy", "zzz");
     p.run();
   }
-  
+
   @Test
   public void testKVMatches() {
     TestPipeline p = TestPipeline.create();
@@ -148,7 +148,6 @@ public void testKVMatchesNone() {
     PCollection> output = p
         .apply(Create.of("x y z"))
         .apply(RegexTransform.matchesKV("a (b) (c)", 1, 2));
-    
     PAssert.that(output).empty();
     p.run();
   }

From 78f4f531c5898188ae9435fe3cd39fe589395811 Mon Sep 17 00:00:00 2001
From: Jesse Anderson 
Date: Thu, 5 May 2016 08:46:14 -0700
Subject: [PATCH 18/24] Added distributed replacement functions. Add replaceAll
 and replaceFirst. Fixed some JavaDocs.

---
 .../beam/sdk/transforms/RegexTransform.java   | 118 +++++++++++++++++-
 .../sdk/transforms/RegexTransformTest.java    |  48 +++++++
 2 files changed, 162 insertions(+), 4 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java
index 8421f6aec439..9102bbabc380 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java
@@ -125,10 +125,36 @@ public static FindKV findKV(String regex, int keyGroup, int valueGroup) {
     return new FindKV(regex, keyGroup, valueGroup);
   }
 
+  /**
+   * Returns a {@link RegexTransform.ReplaceAll} {@link PTransform} that checks if a
+   * portion of the line matches the Regex and replaces all matches with the replacement
+   * String. Returns the resulting line as a {@link PCollection}.
+   * @param regex
+   *          The regular expression to run
+   * @param replacement
+   *          The string to be substituted for each match
+   */
+  public static ReplaceAll replaceAll(String regex, String replacement) {
+    return new ReplaceAll(regex, replacement);
+  }
+
+  /**
+   * Returns a {@link RegexTransform.ReplaceFirst} {@link PTransform} that checks if a
+   * portion of the line matches the Regex and replaces the first match with the replacement
+   * String. Returns the resulting line as a {@link PCollection}.
+   * @param regex
+   *          The regular expression to run
+   * @param replacement
+   *          The string to be substituted for the first match
+   */
+  public static ReplaceFirst replaceFirst(String regex, String replacement) {
+    return new ReplaceFirst(regex, replacement);
+  }
+
   /**
    * {@code RegexTransform.Matches} takes a {@code PCollection}
-   * and returns a {@code PCollection>} representing the key
-   * and value extracted from the Regex groups of the input {@code PCollection}
+   * and returns a {@code PCollection} representing the value
+   * extracted from the Regex groups of the input {@code PCollection}
    * to the number of times that element occurs in the input.
    *
    * 

@@ -223,8 +249,8 @@ public void processElement(ProcessContext c) throws Exception { /** * {@code RegexTransform.Find} takes a {@code PCollection} and - * returns a {@code PCollection>} representing the key and - * value extracted from the Regex groups of the input {@code PCollection} to + * returns a {@code PCollection} representing the value extracted + * from the Regex groups of the input {@code PCollection} to * the number of times that element occurs in the input. * *

@@ -315,4 +341,88 @@ public void processElement(ProcessContext c) throws Exception { })); } } + + /** + * {@code RegexTransform.ReplaceAll} takes a {@code PCollection} and + * returns a {@code PCollection} with all Strings that matched the + * Regex being replaced with the replacement string. + * + *

+ * This transform runs a Regex on the entire input line. If a portion of the + * line does not match the Regex, the line will be output without changes. If it does + * match a portion of the line, all portions matching the Regex will be replaced + * with the replacement String. + * + *

+ * Example of use: + *

+   *  {@code
+   * PCollection words = ...;
+   * PCollection values =
+   *     words.apply(RegexTransform.replaceAll("myregex", "myreplacement"));
+   * }
+   * 
+ */ + public static class ReplaceAll + extends PTransform, PCollection> { + Pattern pattern; + String replacement; + + public ReplaceAll(String regex, String replacement) { + this.pattern = Pattern.compile(regex); + this.replacement = replacement; + } + + public PCollection apply(PCollection in) { + return in.apply(ParDo.named("ReplaceAllRegex").of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + c.output(m.replaceAll(replacement)); + } + })); + } + } + + /** + * {@code RegexTransform.ReplaceFirst} takes a {@code PCollection} and + * returns a {@code PCollection} with the first Strings that matched the + * Regex being replaced with the replacement string. + * + *

+ * This transform runs a Regex on the entire input line. If a portion of the + * line does not match the Regex, the line will be output without changes. If it does + * match a portion of the line, the first portion matching the Regex will be replaced + * with the replacement String. + * + *

+ * Example of use: + *

+   *  {@code
+   * PCollection words = ...;
+   * PCollection values =
+   *     words.apply(RegexTransform.replaceFirst("myregex", "myreplacement"));
+   * }
+   * 
+ */ + public static class ReplaceFirst + extends PTransform, PCollection> { + Pattern pattern; + String replacement; + + public ReplaceFirst(String regex, String replacement) { + this.pattern = Pattern.compile(regex); + this.replacement = replacement; + } + + public PCollection apply(PCollection in) { + return in.apply(ParDo.named("ReplaceAllRegex").of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + c.output(m.replaceFirst(replacement)); + } + })); + } + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java index d916246ed38e..a9a5efbda268 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java @@ -151,4 +151,52 @@ public void testKVMatchesNone() { PAssert.that(output).empty(); p.run(); } + + @Test + public void testReplaceAll() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("xj", "yj", "zj")) + .apply(RegexTransform.replaceAll("[xyz]", "new")); + + PAssert.that(output).containsInAnyOrder("newj", "newj", "newj"); + p.run(); + } + + @Test + public void testReplaceAllMixed() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("abc", "xj", "yj", "zj", "def")) + .apply(RegexTransform.replaceAll("[xyz]", "new")); + + PAssert.that(output).containsInAnyOrder("abc", "newj", "newj", "newj", "def"); + p.run(); + } + + @Test + public void testReplaceFirst() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("xjx", "yjy", "zjz")) + .apply(RegexTransform.replaceFirst("[xyz]", "new")); + + PAssert.that(output).containsInAnyOrder("newjx", "newjy", "newjz"); + p.run(); + } + + @Test + public void 
testReplaceFirstMixed() { + TestPipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of("abc", "xjx", "yjy", "zjz", "def")) + .apply(RegexTransform.replaceFirst("[xyz]", "new")); + + PAssert.that(output).containsInAnyOrder("abc", "newjx", "newjy", "newjz", "def"); + p.run(); + } } From d812294124192e4acbcfc526472ef8dff6e98433 Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Thu, 5 May 2016 08:55:58 -0700 Subject: [PATCH 19/24] Whitespace fixes for check style. --- .../apache/beam/sdk/transforms/RegexTransform.java | 12 ++++++------ .../beam/sdk/transforms/RegexTransformTest.java | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java index 9102bbabc380..99a2f708ec61 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java @@ -341,16 +341,16 @@ public void processElement(ProcessContext c) throws Exception { })); } } - + /** * {@code RegexTransform.ReplaceAll} takes a {@code PCollection} and - * returns a {@code PCollection} with all Strings that matched the + * returns a {@code PCollection} with all Strings that matched the * Regex being replaced with the replacement string. * *

* This transform runs a Regex on the entire input line. If a portion of the * line does not match the Regex, the line will be output without changes. If it does - * match a portion of the line, all portions matching the Regex will be replaced + * match a portion of the line, all portions matching the Regex will be replaced * with the replacement String. * *

@@ -383,16 +383,16 @@ public void processElement(ProcessContext c) throws Exception { })); } } - + /** * {@code RegexTransform.ReplaceFirst} takes a {@code PCollection} and - * returns a {@code PCollection} with the first Strings that matched the + * returns a {@code PCollection} with the first Strings that matched the * Regex being replaced with the replacement string. * *

* This transform runs a Regex on the entire input line. If a portion of the * line does not match the Regex, the line will be output without changes. If it does - * match a portion of the line, the first portion matching the Regex will be replaced + * match a portion of the line, the first portion matching the Regex will be replaced * with the replacement String. * *

diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java index a9a5efbda268..393a34da8c7d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java @@ -151,7 +151,7 @@ public void testKVMatchesNone() { PAssert.that(output).empty(); p.run(); } - + @Test public void testReplaceAll() { TestPipeline p = TestPipeline.create(); @@ -163,7 +163,7 @@ public void testReplaceAll() { PAssert.that(output).containsInAnyOrder("newj", "newj", "newj"); p.run(); } - + @Test public void testReplaceAllMixed() { TestPipeline p = TestPipeline.create(); @@ -175,7 +175,7 @@ public void testReplaceAllMixed() { PAssert.that(output).containsInAnyOrder("abc", "newj", "newj", "newj", "def"); p.run(); } - + @Test public void testReplaceFirst() { TestPipeline p = TestPipeline.create(); @@ -187,7 +187,7 @@ public void testReplaceFirst() { PAssert.that(output).containsInAnyOrder("newjx", "newjy", "newjz"); p.run(); } - + @Test public void testReplaceFirstMixed() { TestPipeline p = TestPipeline.create(); From 8c43cb27f6047edab9e450f9b84cfaf478aeb470 Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Mon, 16 May 2016 13:58:08 -0700 Subject: [PATCH 20/24] Changed Word Counts to use TypeDescriptors. 
--- .../org/apache/beam/examples/MinimalWordCountJava8.java | 6 +++--- .../org/apache/beam/examples/MinimalWordCountJava8Test.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java index 398d517baa54..d4917418402d 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import java.util.Arrays; @@ -54,12 +54,12 @@ public static void main(String[] args) { p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) - .withOutputType(new TypeDescriptor() {})) + .withOutputType(TypeDescriptors.strings())) .apply(Filter.byPredicate((String word) -> !word.isEmpty())) .apply(Count.perElement()) .apply(MapElements .via((KV wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()) - .withOutputType(new TypeDescriptor() {})) + .withOutputType(TypeDescriptors.strings())) // CHANGE 3 of 3: The Google Cloud Storage path is required for outputting the results to. 
.apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java index ae925592504e..f73250fd386a 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import com.google.common.collect.ImmutableList; @@ -65,12 +65,12 @@ public void testMinimalWordCountJava8() throws Exception { p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) - .withOutputType(new TypeDescriptor() {})) + .withOutputType(TypeDescriptors.strings())) .apply(Filter.byPredicate((String word) -> !word.isEmpty())) .apply(Count.perElement()) .apply(MapElements .via((KV wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()) - .withOutputType(new TypeDescriptor() {})) + .withOutputType(TypeDescriptors.strings())) .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); } From 6834cbefa08f4ea07e932bebba43bd17421a5d1a Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Mon, 16 May 2016 14:09:18 -0700 Subject: [PATCH 21/24] Updated complete examples to use TypeDescriptors. 
--- .../org/apache/beam/examples/complete/game/GameStats.java | 5 +++-- .../org/apache/beam/examples/complete/game/UserScore.java | 5 +++-- .../beam/examples/complete/game/HourlyTeamScoreTest.java | 5 +++-- .../apache/beam/examples/complete/game/UserScoreTest.java | 5 +++-- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index 7814eb196c21..c5579134850a 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -44,7 +44,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.joda.time.DateTimeZone; import org.joda.time.Duration; @@ -255,7 +255,8 @@ public static void main(String[] args) throws Exception { PCollection> userEvents = rawEvents.apply("ExtractUserScore", MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())) - .withOutputType(new TypeDescriptor>() {})); + .withOutputType( + TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))); // Calculate the total score per user over fixed windows, and // cumulative updates for late data. 
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java index 866adefdb886..de049e83ea13 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.avro.reflect.Nullable; import org.slf4j.Logger; @@ -168,7 +168,8 @@ public PCollection> apply( return gameInfo .apply(MapElements .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())) - .withOutputType(new TypeDescriptor>() {})) + .withOutputType( + TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))) .apply(Sum.integersPerKey()); } } diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java index aa11c6c34644..5ff615ad93b5 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.joda.time.Instant; import org.junit.Test; @@ -102,7 +102,8 @@ public void testUserScoresFilter() throws Exception { // run a map to access the fields in the result. 
.apply(MapElements .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())) - .withOutputType(new TypeDescriptor>() {})); + .withOutputType( + TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))); PAssert.that(output).containsInAnyOrder(FILTERED_EVENTS); diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java index 40740a6bc2a3..b38e4ecf324c 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java @@ -31,7 +31,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.junit.Assert; import org.junit.Test; @@ -146,7 +146,8 @@ public void testUserScoresBadInput() throws Exception { .apply(ParDo.of(new ParseEventFn())) .apply( MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())) - .withOutputType(new TypeDescriptor>() {})); + .withOutputType( + TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))); PAssert.that(extract).empty(); From b5ad893fd3a3768bbd341f399eae87509a1c0391 Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Mon, 16 May 2016 15:11:57 -0700 Subject: [PATCH 22/24] Removing Regex transforms from this branch. 
--- .../beam/sdk/transforms/RegexTransform.java | 428 ------------------ .../sdk/transforms/RegexTransformTest.java | 202 --------- 2 files changed, 630 deletions(-) delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java delete mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java deleted file mode 100644 index 99a2f708ec61..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java +++ /dev/null @@ -1,428 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * {@code PTransorm}s to use Regular Expressions to process elements in a - * {@link PCollection}. - * - *

- * {@link RegexTransform#matches(String, int)} can be used to see if an entire line matches - * a Regex. {@link RegexTransform#matchesKV(String, int, int)} can be used to see if an entire - * line matches a Regex and output certain groups as a {@link KV}. - *

- *

- * {@link RegexTransform#find(String, int)} can be used to see if a portion of a line - * matches a Regex. {@link RegexTransform#matchesKV(String, int, int)} can be used to see if a - * portion of a line matches a Regex and output certain groups as a {@link KV}. - *

- *

- * Lines that do not match the Regex will not be output. - *

- */ -public class RegexTransform { - private RegexTransform() { - // do not instantiate - } - - /** - * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if - * the entire line matches the Regex. Returns the entire line (group 0) as a - * {@link PCollection}. - * @param regex - * The regular expression to run - */ - public static Matches matches(String regex) { - return matches(regex, 0); - } - - /** - * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if - * the entire line matches the Regex. Returns the group as a - * {@link PCollection}. - * @param regex - * The regular expression to run - * @param group - * The Regex group to return as a PCollection - */ - public static Matches matches(String regex, int group) { - return new Matches(regex, group); - } - - /** - * Returns a {@link RegexTransform.MatchesKV} {@link PTransform} that checks - * if the entire line matches the Regex. Returns the specified groups as the - * key and value as a {@link PCollection}. - * @param regex - * The regular expression to run - * @param keyGroup - * The Regex group to use as the key - * @param valueGroup - * The Regex group to use the value - */ - public static MatchesKV matchesKV(String regex, int keyGroup, - int valueGroup) { - return new MatchesKV(regex, keyGroup, valueGroup); - } - - /** - * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a - * portion of the line matches the Regex. Returns the entire line (group 0) as - * a {@link PCollection}. - * @param regex - * The regular expression to run - */ - public static Find find(String regex) { - return find(regex, 0); - } - - /** - * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a - * portion of the line matches the Regex. Returns the group as a - * {@link PCollection}. 
- * @param regex - * The regular expression to run - * @param group - * The Regex group to return as a PCollection - */ - public static Find find(String regex, int group) { - return new Find(regex, group); - } - - /** - * Returns a {@link RegexTransform.FindKV} {@link PTransform} that checks if a - * portion of the line matches the Regex. Returns the specified groups as the - * key and value as a {@link PCollection}. - * @param regex - * The regular expression to run - * @param keyGroup - * The Regex group to use as the key - * @param valueGroup - * The Regex group to use the value - */ - public static FindKV findKV(String regex, int keyGroup, int valueGroup) { - return new FindKV(regex, keyGroup, valueGroup); - } - - /** - * Returns a {@link RegexTransform.ReplaceAll} {@link PTransform} that checks if a - * portion of the line matches the Regex and replaces all matches with the replacement - * String. Returns the group as a {@link PCollection}. - * @param regex - * The regular expression to run - * @param replacement - * The string to be substituted for each match - */ - public static ReplaceAll replaceAll(String regex, String replacement) { - return new ReplaceAll(regex, replacement); - } - - /** - * Returns a {@link RegexTransform.ReplaceAll} {@link PTransform} that checks if a - * portion of the line matches the Regex and replaces the first match with the replacement - * String. Returns the group as a {@link PCollection}. - * @param regex - * The regular expression to run - * @param replacement - * The string to be substituted for each match - */ - public static ReplaceFirst replaceFirst(String regex, String replacement) { - return new ReplaceFirst(regex, replacement); - } - - /** - * {@code RegexTransform.Matches} takes a {@code PCollection} - * and returns a {@code PCollection} representing the value - * extracted from the Regex groups of the input {@code PCollection} - * to the number of times that element occurs in the input. - * - *

- * This transform runs a Regex on the entire input line. If the entire line - * does not match the Regex, the line will not be output. If it does match the - * entire line, the group in the Regex will be used. The output will be the - * Regex group. - * - *

- * Example of use: - *

-   *  {@code
-   * PCollection words = ...;
-   * PCollection values =
-   *     words.apply(RegexTransform.matches("myregex (mygroup)", 1));
-   * }
-   * 
- */ - public static class Matches - extends PTransform, PCollection> { - Pattern pattern; - int group; - - public Matches(String regex, int group) { - this.pattern = Pattern.compile(regex); - this.group = group; - } - - public PCollection apply(PCollection in) { - return in - .apply(ParDo.named("MatchesRegex").of(new DoFn() { - @Override - public void processElement(ProcessContext c) throws Exception { - Matcher m = pattern.matcher((String) c.element()); - - if (m.matches()) { - c.output(m.group(group)); - } - } - })); - } - } - - /** - * {@code RegexTransform.MatchesKV>} takes a - * {@code PCollection} and returns a - * {@code PCollection>} representing the key and value - * extracted from the Regex groups of the input {@code PCollection} to the - * number of times that element occurs in the input. - * - *

- * This transform runs a Regex on the entire input line. If the entire line - * does not match the Regex, the line will not be output. If it does match the - * entire line, the groups in the Regex will be used. The key will be the - * key's group and the value will be the value's group. - * - *

- * Example of use: - *

-   *  {@code
-   * PCollection words = ...;
-   * PCollection> keysAndValues =
-   *     words.apply(RegexTransform.matchesKV("myregex (mykeygroup) (myvaluegroup)", 1, 2));
-   * }
-   * 
- */ - public static class MatchesKV - extends PTransform, PCollection>> { - Pattern pattern; - int keyGroup, valueGroup; - - public MatchesKV(String regex, int keyGroup, int valueGroup) { - this.pattern = Pattern.compile(regex); - this.keyGroup = keyGroup; - this.valueGroup = valueGroup; - } - - public PCollection> apply(PCollection in) { - return in.apply(ParDo.named("MatchesKVRegex") - .of(new DoFn>() { - @Override - public void processElement(ProcessContext c) throws Exception { - Matcher m = pattern.matcher((String) c.element()); - - if (m.find()) { - c.output(KV.of(m.group(keyGroup), m.group(valueGroup))); - } - } - })); - } - } - - /** - * {@code RegexTransform.Find} takes a {@code PCollection} and - * returns a {@code PCollection} representing the value extracted - * from the Regex groups of the input {@code PCollection} to - * the number of times that element occurs in the input. - * - *

- * This transform runs a Regex on the entire input line. If a portion of the - * line does not match the Regex, the line will not be output. If it does - * match a portion of the line, the group in the Regex will be used. The - * output will be the Regex group. - * - *

- * Example of use: - *

-   *  {@code
-   * PCollection words = ...;
-   * PCollection values =
-   *     words.apply(RegexTransform.find("myregex (mygroup)", 1));
-   * }
-   * 
- */ - public static class Find - extends PTransform, PCollection> { - Pattern pattern; - int group; - - public Find(String regex, int group) { - this.pattern = Pattern.compile(regex); - this.group = group; - } - - public PCollection apply(PCollection in) { - return in.apply(ParDo.named("FindRegex").of(new DoFn() { - @Override - public void processElement(ProcessContext c) throws Exception { - Matcher m = pattern.matcher((String) c.element()); - - if (m.find()) { - c.output(m.group(group)); - } - } - })); - } - } - - /** - * {@code RegexTransform.MatchesKV>} takes a - * {@code PCollection} and returns a - * {@code PCollection>} representing the key and value - * extracted from the Regex groups of the input {@code PCollection} to the - * number of times that element occurs in the input. - * - *

- * This transform runs a Regex on the entire input line. If a portion of the - * line does not match the Regex, the line will not be output. If it does - * match a portion of the line, the groups in the Regex will be used. The key - * will be the key's group and the value will be the value's group. - * - *

- * Example of use: - *

-   *  {@code
-   * PCollection words = ...;
-   * PCollection> keysAndValues =
-   *     words.apply(RegexTransform.findKV("myregex (mykeygroup) (myvaluegroup)", 1, 2));
-   * }
-   * 
- */ - public static class FindKV - extends PTransform, PCollection>> { - Pattern pattern; - int keyGroup, valueGroup; - - public FindKV(String regex, int keyGroup, int valueGroup) { - this.pattern = Pattern.compile(regex); - this.keyGroup = keyGroup; - this.valueGroup = valueGroup; - } - - public PCollection> apply(PCollection in) { - return in.apply( - ParDo.named("FindKVRegex").of(new DoFn>() { - @Override - public void processElement(ProcessContext c) throws Exception { - Matcher m = pattern.matcher((String) c.element()); - - if (m.find()) { - c.output(KV.of(m.group(keyGroup), m.group(valueGroup))); - } - } - })); - } - } - - /** - * {@code RegexTransform.ReplaceAll} takes a {@code PCollection} and - * returns a {@code PCollection} with all Strings that matched the - * Regex being replaced with the replacement string. - * - *

- * This transform runs a Regex on the entire input line. If a portion of the - * line does not match the Regex, the line will be output without changes. If it does - * match a portion of the line, all portions matching the Regex will be replaced - * with the replacement String. - * - *

- * Example of use: - *

-   *  {@code
-   * PCollection words = ...;
-   * PCollection values =
-   *     words.apply(RegexTransform.replaceAll("myregex", "myreplacement"));
-   * }
-   * 
- */ - public static class ReplaceAll - extends PTransform, PCollection> { - Pattern pattern; - String replacement; - - public ReplaceAll(String regex, String replacement) { - this.pattern = Pattern.compile(regex); - this.replacement = replacement; - } - - public PCollection apply(PCollection in) { - return in.apply(ParDo.named("ReplaceAllRegex").of(new DoFn() { - @Override - public void processElement(ProcessContext c) throws Exception { - Matcher m = pattern.matcher((String) c.element()); - c.output(m.replaceAll(replacement)); - } - })); - } - } - - /** - * {@code RegexTransform.ReplaceFirst} takes a {@code PCollection} and - * returns a {@code PCollection} with the first Strings that matched the - * Regex being replaced with the replacement string. - * - *

- * This transform runs a Regex on the entire input line. If a portion of the - * line does not match the Regex, the line will be output without changes. If it does - * match a portion of the line, the first portion matching the Regex will be replaced - * with the replacement String. - * - *

- * Example of use: - *

-   *  {@code
-   * PCollection words = ...;
-   * PCollection values =
-   *     words.apply(RegexTransform.replaceFirst("myregex", "myreplacement"));
-   * }
-   * 
- */ - public static class ReplaceFirst - extends PTransform, PCollection> { - Pattern pattern; - String replacement; - - public ReplaceFirst(String regex, String replacement) { - this.pattern = Pattern.compile(regex); - this.replacement = replacement; - } - - public PCollection apply(PCollection in) { - return in.apply(ParDo.named("ReplaceAllRegex").of(new DoFn() { - @Override - public void processElement(ProcessContext c) throws Exception { - Matcher m = pattern.matcher((String) c.element()); - c.output(m.replaceFirst(replacement)); - } - })); - } - } -} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java deleted file mode 100644 index 393a34da8c7d..000000000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.transforms; - -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; - -/** - * Tests for {@link RegexTransform}. - */ -@RunWith(JUnit4.class) -public class RegexTransformTest implements Serializable { - @Test - public void testFind() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("aj", "xj", "yj", "zj")) - .apply(RegexTransform.find("[xyz]")); - - PAssert.that(output).containsInAnyOrder("x", "y", "z"); - p.run(); - } - - @Test - public void testFindGroup() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("aj", "xj", "yj", "zj")) - .apply(RegexTransform.find("([xyz])", 1)); - - PAssert.that(output).containsInAnyOrder("x", "y", "z"); - p.run(); - } - - @Test - public void testFindNone() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("a", "b", "c", "d")) - .apply(RegexTransform.find("[xyz]")); - - PAssert.that(output).empty(); - p.run(); - } - - @Test - public void testKVFind() { - TestPipeline p = TestPipeline.create(); - - PCollection> output = p - .apply(Create.of("a b c")) - .apply(RegexTransform.findKV("a (b) (c)", 1, 2)); - - PAssert.that(output).containsInAnyOrder(KV.of("b", "c")); - p.run(); - } - - @Test - public void testKVFindNone() { - TestPipeline p = TestPipeline.create(); - - PCollection> output = p - .apply(Create.of("x y z")) - .apply(RegexTransform.findKV("a (b) (c)", 1, 2)); - - PAssert.that(output).empty(); - p.run(); - } - - @Test - public void testMatches() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("a", "x", "y", "z")) - .apply(RegexTransform.matches("[xyz]")); - - 
PAssert.that(output).containsInAnyOrder("x", "y", "z"); - p.run(); - } - - @Test - public void testMatchesNone() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("a", "b", "c", "d")) - .apply(RegexTransform.matches("[xyz]")); - - PAssert.that(output).empty(); - p.run(); - } - - @Test - public void testMatchesGroup() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("a", "x xxx", "x yyy", "x zzz")) - .apply(RegexTransform.matches("x ([xyz]*)", 1)); - - PAssert.that(output).containsInAnyOrder("xxx", "yyy", "zzz"); - p.run(); - } - - @Test - public void testKVMatches() { - TestPipeline p = TestPipeline.create(); - - PCollection> output = p - .apply(Create.of("a b c")) - .apply(RegexTransform.matchesKV("a (b) (c)", 1, 2)); - - PAssert.that(output).containsInAnyOrder(KV.of("b", "c")); - p.run(); - } - - @Test - public void testKVMatchesNone() { - TestPipeline p = TestPipeline.create(); - - PCollection> output = p - .apply(Create.of("x y z")) - .apply(RegexTransform.matchesKV("a (b) (c)", 1, 2)); - PAssert.that(output).empty(); - p.run(); - } - - @Test - public void testReplaceAll() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("xj", "yj", "zj")) - .apply(RegexTransform.replaceAll("[xyz]", "new")); - - PAssert.that(output).containsInAnyOrder("newj", "newj", "newj"); - p.run(); - } - - @Test - public void testReplaceAllMixed() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("abc", "xj", "yj", "zj", "def")) - .apply(RegexTransform.replaceAll("[xyz]", "new")); - - PAssert.that(output).containsInAnyOrder("abc", "newj", "newj", "newj", "def"); - p.run(); - } - - @Test - public void testReplaceFirst() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("xjx", "yjy", "zjz")) - .apply(RegexTransform.replaceFirst("[xyz]", "new")); - - 
PAssert.that(output).containsInAnyOrder("newjx", "newjy", "newjz"); - p.run(); - } - - @Test - public void testReplaceFirstMixed() { - TestPipeline p = TestPipeline.create(); - - PCollection output = p - .apply(Create.of("abc", "xjx", "yjy", "zjz", "def")) - .apply(RegexTransform.replaceFirst("[xyz]", "new")); - - PAssert.that(output).containsInAnyOrder("abc", "newjx", "newjy", "newjz", "def"); - p.run(); - } -} From 3c942925f485ecac014e6c1b1b669e16813b5f62 Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Mon, 23 May 2016 12:08:12 -0700 Subject: [PATCH 23/24] Trivial change to kick off another build. --- .../java/org/apache/beam/examples/MinimalWordCountJava8.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java index d4917418402d..00fe7302f6b6 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java @@ -52,7 +52,7 @@ public static void main(String[] args) { Pipeline p = Pipeline.create(options); - p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) + p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/* ")) .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) .withOutputType(TypeDescriptors.strings())) .apply(Filter.byPredicate((String word) -> !word.isEmpty())) From d7b86d82c320163560a9a2ba5436ded88f770fed Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Mon, 23 May 2016 12:08:34 -0700 Subject: [PATCH 24/24] Trivial change to kick off another build. 
--- .../java/org/apache/beam/examples/MinimalWordCountJava8.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java index 00fe7302f6b6..d4917418402d 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java @@ -52,7 +52,7 @@ public static void main(String[] args) { Pipeline p = Pipeline.create(options); - p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/* ")) + p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) .withOutputType(TypeDescriptors.strings())) .apply(Filter.byPredicate((String word) -> !word.isEmpty()))