From 48491586166f1082ce4ba725ad274e10b1ccb74e Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Tue, 26 Sep 2017 10:38:38 -0700 Subject: [PATCH 1/8] Created Java snippets file --- .../org/apache/beam/examples/Snippets.java | 93 +++++++++++++++ .../apache/beam/examples/SnippetsTest.java | 111 ++++++++++++++++++ 2 files changed, 204 insertions(+) create mode 100644 examples/java8/src/main/java/org/apache/beam/examples/Snippets.java create mode 100644 examples/java8/src/test/java/org/apache/beam/examples/SnippetsTest.java diff --git a/examples/java8/src/main/java/org/apache/beam/examples/Snippets.java b/examples/java8/src/main/java/org/apache/beam/examples/Snippets.java new file mode 100644 index 000000000000..8d5fb1c40f12 --- /dev/null +++ b/examples/java8/src/main/java/org/apache/beam/examples/Snippets.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Code snippets used in webdocs. + */ +public class Snippets { + + /* Helper function to format results in CoGroupByKeyTuple */ + public static String formatCoGbkResults( + String name, Iterable emails, Iterable phones) { + + List emailsList = new ArrayList<>(); + for (String elem : emails) { + emailsList.add("'" + elem + "'"); + } + Collections.sort(emailsList); + String emailsStr = "[" + String.join(", ", emailsList) + "]"; + + List phonesList = new ArrayList<>(); + for (String elem : phones) { + phonesList.add("'" + elem + "'"); + } + Collections.sort(phonesList); + String phonesStr = "[" + String.join(", ", phonesList) + "]"; + + return name + "; " + emailsStr + "; " + phonesStr; + } + + public static PCollection coGroupByKeyTuple( + Pipeline p, + TupleTag emailsTag, + TupleTag phonesTag, + List> emailList, + List> phoneList) { + + // [START CoGroupByKeyTuple] + PCollection> emailsPColl = p.apply("create emails", Create.of(emailList)); + PCollection> phonesPColl = p.apply("create phones", Create.of(phoneList)); + + PCollection> results = + KeyedPCollectionTuple + .of(emailsTag, emailsPColl) + .and(phonesTag, phonesPColl) + .apply(CoGroupByKey.create()); + + PCollection formattedResults = results.apply(ParDo.of( + new DoFn, String>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV e = c.element(); + String name = e.getKey(); + Iterable emails = e.getValue().getAll(emailsTag); + Iterable phones = e.getValue().getAll(phonesTag); + String formattedResult = formatCoGbkResults(name, emails, phones); + c.output(formattedResult); + } + } + )); + // [END CoGroupByKeyTuple] + return formattedResults; + } +} diff --git a/examples/java8/src/test/java/org/apache/beam/examples/SnippetsTest.java b/examples/java8/src/test/java/org/apache/beam/examples/SnippetsTest.java new file mode 100644 index 000000000000..65872d7d9f65 --- /dev/null +++ b/examples/java8/src/test/java/org/apache/beam/examples/SnippetsTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + + +/** + * Tests for Snippets. + */ +@RunWith(JUnit4.class) +public class SnippetsTest implements Serializable { + + @Rule + public transient TestPipeline p = TestPipeline.create(); + + /* Tests CoGroupByKeyTuple */ + @Test + public void testCoGroupByKeyTuple() throws IOException { + // [START CoGroupByKeyTupleInputs] + final List> emailList = Arrays.asList( + KV.of("amy", "amy@example.com"), + KV.of("carl", "carl@example.com"), + KV.of("julia", "julia@example.com"), + KV.of("carl", "carl@email.com")); + + final List> phoneList = Arrays.asList( + KV.of("amy", "111-222-3333"), + KV.of("james", "222-333-4444"), + KV.of("amy", "333-444-5555"), + KV.of("carl", "444-555-6666")); + // [END CoGroupByKeyTupleInputs] + + // [START CoGroupByKeyTupleOutputs] + final TupleTag emailsTag = new TupleTag(); + final TupleTag phonesTag = new TupleTag(); + + final List> expectedResults = Arrays.asList( + KV.of("amy", CoGbkResult + .of(emailsTag, Arrays.asList("amy@example.com")) + .and(phonesTag, Arrays.asList("111-222-3333", "333-444-5555"))), + KV.of("carl", CoGbkResult + .of(emailsTag, Arrays.asList("carl@email.com", "carl@example.com")) + .and(phonesTag, Arrays.asList("444-555-6666"))), + KV.of("james", CoGbkResult + .of(emailsTag, Arrays.asList()) + .and(phonesTag, Arrays.asList("222-333-4444"))), + KV.of("julia", CoGbkResult + .of(emailsTag, Arrays.asList("julia@example.com")) + .and(phonesTag, Arrays.asList()))); + // [END CoGroupByKeyTupleOutputs] + + PCollection actualFormattedResults = + Snippets.coGroupByKeyTuple(p, emailsTag, phonesTag, emailList, phoneList); + + // [START CoGroupByKeyTupleFormattedOutputs] + final List formattedResults = Arrays.asList( + "amy; ['amy@example.com']; ['111-222-3333', '333-444-5555']", + "carl; ['carl@email.com', 'carl@example.com']; ['444-555-6666']", + "james; []; ['222-333-4444']", + "julia; ['julia@example.com']; []"); + // [END CoGroupByKeyTupleFormattedOutputs] + + // Make sure that both 'expectedResults' and 'actualFormattedResults' match with the + // 'formattedResults'. 'expectedResults' will have to be formatted before comparing + List expectedFormattedResultsList = new ArrayList(expectedResults.size()); + for (KV e : expectedResults) { + String name = e.getKey(); + Iterable emails = e.getValue().getAll(emailsTag); + Iterable phones = e.getValue().getAll(phonesTag); + String formattedResult = Snippets.formatCoGbkResults(name, emails, phones); + expectedFormattedResultsList.add(formattedResult); + } + PCollection expectedFormattedResultsPColl = + p.apply(Create.of(expectedFormattedResultsList)); + PAssert.that(expectedFormattedResultsPColl).containsInAnyOrder(formattedResults); + PAssert.that(actualFormattedResults).containsInAnyOrder(formattedResults); + + p.run().waitUntilFinish(); + } +} From 647d7401101189bd34123f7ad388390473d33eb9 Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Wed, 27 Sep 2017 10:31:22 -0700 Subject: [PATCH 2/8] Fixed typo in comment --- .../java8/src/main/java/org/apache/beam/examples/Snippets.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/Snippets.java b/examples/java8/src/main/java/org/apache/beam/examples/Snippets.java index 8d5fb1c40f12..6066fa49b4e7 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/Snippets.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/Snippets.java @@ -36,7 +36,7 @@ */ public class Snippets { - /* Helper function to format results in CoGroupByKeyTuple */ + /* Helper function to format results in coGroupByKeyTuple */ public static String formatCoGbkResults( String name, Iterable emails, Iterable phones) { From 27c4f9600b15f67a8857d4cad6a79d92e10aa411 Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Thu, 5 Oct 2017 10:21:42 -0700 Subject: [PATCH 3/8] Moved Java snippets to website_snippets directory --- .../org/apache/beam/examples/{ => website_snippets}/Snippets.java | 0 .../apache/beam/examples/{ => website_snippets}/SnippetsTest.java | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename examples/java8/src/main/java/org/apache/beam/examples/{ => website_snippets}/Snippets.java (100%) rename examples/java8/src/test/java/org/apache/beam/examples/{ => website_snippets}/SnippetsTest.java (100%) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/Snippets.java b/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java similarity index 100% rename from examples/java8/src/main/java/org/apache/beam/examples/Snippets.java rename to examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java diff --git a/examples/java8/src/test/java/org/apache/beam/examples/SnippetsTest.java b/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java similarity index 100% rename from examples/java8/src/test/java/org/apache/beam/examples/SnippetsTest.java rename to examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java From 895c31a7c63e4cf23b12ecd6baaf94ccfc1232a7 Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Wed, 11 Oct 2017 14:55:50 -0700 Subject: [PATCH 4/8] Created PCollections at test file and passed as parameters --- .../examples/website_snippets/Snippets.java | 18 +++---- .../website_snippets/SnippetsTest.java | 17 ++++--- .../apache_beam/examples/snippets/snippets.py | 47 +++++++------------ .../examples/snippets/snippets_test.py | 38 ++++++++------- 4 files changed, 55 insertions(+), 65 deletions(-) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java b/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java index 6066fa49b4e7..db3cd90f9167 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java @@ -21,7 +21,6 @@ import java.util.Collections; import java.util.List; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; @@ -61,17 +60,14 @@ public static PCollection coGroupByKeyTuple( Pipeline p, TupleTag emailsTag, TupleTag phonesTag, - List> emailList, - List> phoneList) { + PCollection> emails, + PCollection> phones) { // [START CoGroupByKeyTuple] - PCollection> emailsPColl = p.apply("create emails", Create.of(emailList)); - PCollection> phonesPColl = p.apply("create phones", Create.of(phoneList)); - PCollection> results = KeyedPCollectionTuple - .of(emailsTag, emailsPColl) - .and(phonesTag, phonesPColl) + .of(emailsTag, emails) + .and(phonesTag, phones) .apply(CoGroupByKey.create()); PCollection formattedResults = results.apply(ParDo.of( @@ -80,9 +76,9 @@ public static PCollection coGroupByKeyTuple( public void processElement(ProcessContext c) { KV e = c.element(); String name = e.getKey(); - Iterable emails = e.getValue().getAll(emailsTag); - Iterable phones = e.getValue().getAll(phonesTag); - String formattedResult = formatCoGbkResults(name, emails, phones); + Iterable emailsIter = e.getValue().getAll(emailsTag); + Iterable phonesIter = e.getValue().getAll(phonesTag); + String formattedResult = Snippets.formatCoGbkResults(name, emailsIter, phonesIter); c.output(formattedResult); } } diff --git a/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java b/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java index 65872d7d9f65..4260b7b0cc2b 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java @@ -48,17 +48,20 @@ public class SnippetsTest implements Serializable { @Test public void testCoGroupByKeyTuple() throws IOException { // [START CoGroupByKeyTupleInputs] - final List> emailList = Arrays.asList( + final List> emailsList = Arrays.asList( KV.of("amy", "amy@example.com"), KV.of("carl", "carl@example.com"), KV.of("julia", "julia@example.com"), KV.of("carl", "carl@email.com")); - final List> phoneList = Arrays.asList( + final List> phonesList = Arrays.asList( KV.of("amy", "111-222-3333"), KV.of("james", "222-333-4444"), KV.of("amy", "333-444-5555"), KV.of("carl", "444-555-6666")); + + PCollection> emails = p.apply("CreateEmails", Create.of(emailsList)); + PCollection> phones = p.apply("CreatePhones", Create.of(phonesList)); // [END CoGroupByKeyTupleInputs] // [START CoGroupByKeyTupleOutputs] @@ -81,7 +84,7 @@ public void testCoGroupByKeyTuple() throws IOException { // [END CoGroupByKeyTupleOutputs] PCollection actualFormattedResults = - Snippets.coGroupByKeyTuple(p, emailsTag, phonesTag, emailList, phoneList); + Snippets.coGroupByKeyTuple(p, emailsTag, phonesTag, emails, phones); // [START CoGroupByKeyTupleFormattedOutputs] final List formattedResults = Arrays.asList( @@ -96,9 +99,9 @@ public void testCoGroupByKeyTuple() throws IOException { List expectedFormattedResultsList = new ArrayList(expectedResults.size()); for (KV e : expectedResults) { String name = e.getKey(); - Iterable emails = e.getValue().getAll(emailsTag); - Iterable phones = e.getValue().getAll(phonesTag); - String formattedResult = Snippets.formatCoGbkResults(name, emails, phones); + Iterable emailsIter = e.getValue().getAll(emailsTag); + Iterable phonesIter = e.getValue().getAll(phonesTag); + String formattedResult = Snippets.formatCoGbkResults(name, emailsIter, phonesIter); expectedFormattedResultsList.add(formattedResult); } PCollection expectedFormattedResultsPColl = @@ -106,6 +109,6 @@ public void testCoGroupByKeyTuple() throws IOException { PAssert.that(expectedFormattedResultsPColl).containsInAnyOrder(formattedResults); PAssert.that(actualFormattedResults).containsInAnyOrder(formattedResults); - p.run().waitUntilFinish(); + p.run(); } } diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index a7751a77d19f..093fc4fa0ffe 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -1149,39 +1149,24 @@ def count_ones(word_ones): | beam.io.WriteToText(output_path)) -def model_co_group_by_key_tuple(email_list, phone_list, output_path): +def model_co_group_by_key_tuple(emails, phones, output_path): """Applying a CoGroupByKey Transform to a tuple.""" import apache_beam as beam - with TestPipeline() as p: # Use TestPipeline for testing. - # [START model_group_by_key_cogroupbykey_tuple] - # Each data set is represented by key-value pairs in separate PCollections. - # Both data sets share a common key type (in this example str). - # The email_list contains values such as: ('joe', 'joe@example.com') with - # multiple possible values for each key. - # The phone_list contains values such as: ('mary': '111-222-3333') with - # multiple possible values for each key. - emails_pcoll = p | 'create emails' >> beam.Create(email_list) - phones_pcoll = p | 'create phones' >> beam.Create(phone_list) - - # The result PCollection contains one key-value element for each key in the - # input PCollections. The key of the pair will be the key from the input and - # the value will be a dictionary with two entries: 'emails' - an iterable of - # all values for the current key in the emails PCollection and 'phones': an - # iterable of all values for the current key in the phones PCollection. - # For instance, if 'emails' contained ('joe', 'joe@example.com') and - # ('joe', 'joe@gmail.com'), then 'result' will contain the element: - # ('joe', {'emails': ['joe@example.com', 'joe@gmail.com'], 'phones': ...}) - results = ({'emails': emails_pcoll, 'phones': phones_pcoll} - | beam.CoGroupByKey()) - - def join_info(name_info): - (name, info) = name_info - return '%s; %s; %s' %\ - (name, sorted(info['emails']), sorted(info['phones'])) - - contact_lines = results | beam.Map(join_info) - # [END model_group_by_key_cogroupbykey_tuple] - contact_lines | beam.io.WriteToText(output_path) + # [START model_group_by_key_cogroupbykey_tuple] + # The result PCollection contains one key-value element for each key in the + # input PCollections. The key of the pair will be the key from the input and + # the value will be a dictionary with two entries: 'emails' - an iterable of + # all values for the current key in the emails PCollection and 'phones': an + # iterable of all values for the current key in the phones PCollection. + results = ({'emails': emails, 'phones': phones} + | beam.CoGroupByKey()) + + formatted_results = results | beam.Map( + lambda (name, info):\ + '%s; %s; %s' %\ + (name, sorted(info['emails']), sorted(info['phones']))) + # [END model_group_by_key_cogroupbykey_tuple] + formatted_results | beam.io.WriteToText(output_path) def model_join_using_side_inputs( diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 8f88ab931793..6acc50ceb466 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -694,22 +694,28 @@ def test_model_group_by_key(self): self.assertEqual([str(s) for s in expected], self.get_output(result_path)) def test_model_co_group_by_key_tuple(self): - # [START model_group_by_key_cogroupbykey_tuple_inputs] - email_list = [ - ('amy', 'amy@example.com'), - ('carl', 'carl@example.com'), - ('julia', 'julia@example.com'), - ('carl', 'carl@email.com'), - ] - phone_list = [ - ('amy', '111-222-3333'), - ('james', '222-333-4444'), - ('amy', '333-444-5555'), - ('carl', '444-555-6666'), - ] - # [END model_group_by_key_cogroupbykey_tuple_inputs] - result_path = self.create_temp_file() - snippets.model_co_group_by_key_tuple(email_list, phone_list, result_path) + with TestPipeline() as p: + # [START model_group_by_key_cogroupbykey_tuple_inputs] + emails_list = [ + ('amy', 'amy@example.com'), + ('carl', 'carl@example.com'), + ('julia', 'julia@example.com'), + ('carl', 'carl@email.com'), + ] + phones_list = [ + ('amy', '111-222-3333'), + ('james', '222-333-4444'), + ('amy', '333-444-5555'), + ('carl', '444-555-6666'), + ] + + emails = p | 'CreateEmails' >> beam.Create(emails_list) + phones = p | 'CreatePhones' >> beam.Create(phones_list) + # [END model_group_by_key_cogroupbykey_tuple_inputs] + + result_path = self.create_temp_file() + snippets.model_co_group_by_key_tuple(p, emails, phones, result_path) + # [START model_group_by_key_cogroupbykey_tuple_outputs] results = [ ('amy', { From ef55198c872e094c0af1a7f6cf6fc5b3ad073468 Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Thu, 12 Oct 2017 09:48:23 -0700 Subject: [PATCH 5/8] Removed 'p' from CoGroupByKey snippet --- .../org/apache/beam/examples/website_snippets/Snippets.java | 2 -- .../org/apache/beam/examples/website_snippets/SnippetsTest.java | 2 +- sdks/python/apache_beam/examples/snippets/snippets_test.py | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java b/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java index db3cd90f9167..fb1250585ed8 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; @@ -57,7 +56,6 @@ public static String formatCoGbkResults( } public static PCollection coGroupByKeyTuple( - Pipeline p, TupleTag emailsTag, TupleTag phonesTag, PCollection> emails, diff --git a/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java b/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java index 4260b7b0cc2b..3ca6c9a0f390 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java @@ -84,7 +84,7 @@ public void testCoGroupByKeyTuple() throws IOException { // [END CoGroupByKeyTupleOutputs] PCollection actualFormattedResults = - Snippets.coGroupByKeyTuple(p, emailsTag, phonesTag, emails, phones); + Snippets.coGroupByKeyTuple(emailsTag, phonesTag, emails, phones); // [START CoGroupByKeyTupleFormattedOutputs] final List formattedResults = Arrays.asList( diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 6acc50ceb466..505858a43271 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -714,7 +714,7 @@ def test_model_co_group_by_key_tuple(self): # [END model_group_by_key_cogroupbykey_tuple_inputs] result_path = self.create_temp_file() - snippets.model_co_group_by_key_tuple(p, emails, phones, result_path) + snippets.model_co_group_by_key_tuple(emails, phones, result_path) # [START model_group_by_key_cogroupbykey_tuple_outputs] results = [ From 0d979a22cca8e8119ede64765328da7f916ce59c Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Fri, 13 Oct 2017 13:14:51 -0700 Subject: [PATCH 6/8] Removed tuple parameter unpacking --- .../org/apache/beam/examples/website_snippets/Snippets.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java b/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java index fb1250585ed8..f17171e9ecae 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java @@ -68,7 +68,7 @@ public static PCollection coGroupByKeyTuple( .and(phonesTag, phones) .apply(CoGroupByKey.create()); - PCollection formattedResults = results.apply(ParDo.of( + PCollection contactLines = results.apply(ParDo.of( new DoFn, String>() { @ProcessElement public void processElement(ProcessContext c) { @@ -82,6 +82,6 @@ public void processElement(ProcessContext c) { } )); // [END CoGroupByKeyTuple] - return formattedResults; + return contactLines; } } From 2295a03038b3d850936d4267d0d9945ad7d3eb8f Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Fri, 13 Oct 2017 13:15:09 -0700 Subject: [PATCH 7/8] Changed results name to match Python snippet --- .../python/apache_beam/examples/snippets/snippets.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 093fc4fa0ffe..f656c3f6dc4e 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -1161,12 +1161,14 @@ def model_co_group_by_key_tuple(emails, phones, output_path): results = ({'emails': emails, 'phones': phones} | beam.CoGroupByKey()) - formatted_results = results | beam.Map( - lambda (name, info):\ - '%s; %s; %s' %\ - (name, sorted(info['emails']), sorted(info['phones']))) + def join_info(name_info): + (name, info) = name_info + return '%s; %s; %s' %\ + (name, sorted(info['emails']), sorted(info['phones']))) + + contact_lines = results | beam.Map(join_info) # [END model_group_by_key_cogroupbykey_tuple] - formatted_results | beam.io.WriteToText(output_path) + contact_lines | beam.io.WriteToText(output_path) def model_join_using_side_inputs( From 3fb4210579a7d20f9cfe49b5dbad0cf8ffac4208 Mon Sep 17 00:00:00 2001 From: David Cavazos Date: Fri, 13 Oct 2017 14:13:01 -0700 Subject: [PATCH 8/8] Removed extra parenthesis --- sdks/python/apache_beam/examples/snippets/snippets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index f656c3f6dc4e..6cc96efe79d9 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -1164,7 +1164,7 @@ def model_co_group_by_key_tuple(emails, phones, output_path): def join_info(name_info): (name, info) = name_info return '%s; %s; %s' %\ - (name, sorted(info['emails']), sorted(info['phones']))) + (name, sorted(info['emails']), sorted(info['phones'])) contact_lines = results | beam.Map(join_info) # [END model_group_by_key_cogroupbykey_tuple]