<a href="https://colab.research.google.com/github/YoheiShinozaki/BeamKatasColab/blob/master/Beam_Katas_06_Core_Transforms_CoGroupByKey.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Beam Katas on Colab

<!--
  ~  Licensed to the Apache Software Foundation (ASF) under one
  ~  or more contributor license agreements.  See the NOTICE file
  ~  distributed with this work for additional information
  ~  regarding copyright ownership.  The ASF licenses this file
  ~  to you under the Apache License, Version 2.0 (the
  ~  "License"); you may not use this file except in compliance
  ~  with the License.  You may obtain a copy of the License at
  ~
  ~      http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~  Unless required by applicable law or agreed to in writing, software
  ~  distributed under the License is distributed on an "AS IS" BASIS,
  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~  See the License for the specific language governing permissions and
  ~  limitations under the License.
  -->

<html>
<h2>CoGroupByKey</h2>
<p>
  CoGroupByKey performs a relational join of two or more key/value PCollections that have the same
  key type.
</p>
<p>
  <b>Kata:</b> Implement a
  <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.CoGroupByKey">
    CoGroupByKey</a> transform that join words by its first alphabetical letter, and then produces
  the string representation of the WordsAlphabet model.
</p>
<br>
<div class="hint">
  Refer to
  <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.CoGroupByKey">
    CoGroupByKey</a>to solve this problem.
</div>
<div class="hint">
  Refer to the Beam Programming Guide
  <a href="https://beam.apache.org/documentation/programming-guide/#cogroupbykey">
    "CoGroupByKey"</a> section for more information.
</div>
</html>

In [0]:
!pip install apache-beam -qqq

import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner

## Python Collection

In [0]:
fruits = ['apple', 'banana', 'cherry'] | beam.Map(lambda word: (word[0], word))
countries = ['australia', 'brazil', 'canada'] | beam.Map(lambda word: (word[0], word))

In [0]:
fruits

[(a, apple), (b, banana), (c, cherry)]

In [0]:
countries

[(a, australia), (b, brazil), (c, canada)]

In [0]:
(fruits, countries) | beam.CoGroupByKey()

[(a, ([apple], [australia])),
 (c, ([cherry], [canada])),
 (b, ([banana], [brazil]))]

## Beam Pcollection

In [0]:
p = beam.Pipeline(interactive_runner.InteractiveRunner())

fruits = (p | 'Fruits' >> beam.Create(['apple', 'banana', 'cherry'])
            | 'Fruit to KV' >> beam.Map(lambda word: (word[0], word)))
  
countries = (p | 'Countries' >> beam.Create(['australia', 'brazil', 'canada'])
               | 'Country to KV' >> beam.Map(lambda word: (word[0], word)))
  
(fruits, countries) | beam.CoGroupByKey()

p.run()

Running...

Using 0 cached PCollections
Executing 5 of 5 transforms.

Fruits produced {'banana', 'cherry', 'apple'}

Country to KV produced {('c', 'canada'), ('b', 'brazil'), ('a', 'australia')}

Countries produced {'brazil', 'canada', 'australia'}

Fruit to KV produced {('a', 'apple'), ('b', 'banana'), ('c', 'cherry')}

CoGroupByKey produced {('c', (['cherry'], ['canada'])), ('a', (['apple'], ['australia'])), ('b', (['banana'], ['brazil']))}

<apache_beam.runners.interactive.interactive_runner.PipelineResult at 0x7f1caf1a41d0>

### WordsAlphabet model

In [0]:
class WordsAlphabet:

    def __init__(self, alphabet, fruit, country):
        self.alphabet = alphabet
        self.fruit = fruit
        self.country = country

    def __str__(self):
        return "WordsAlphabet(alphabet:'%s', fruit='%s', country='%s')" % (self.alphabet, self.fruit, self.country)

In [0]:
def apply_transforms(fruits, countries):
    def map_to_alphabet_kv(word):
        return (word[0], word)

    def cogbk_result_to_wordsalphabet(cgbk_result):
        (alphabet, words) = cgbk_result
        return WordsAlphabet(alphabet, words['fruits'][0], words['countries'][0])

    fruits_kv = (fruits | 'Fruit to KV' >> beam.Map(map_to_alphabet_kv))
    countries_kv = (countries | 'Country to KV' >> beam.Map(map_to_alphabet_kv))

    return ({'fruits': fruits_kv, 'countries': countries_kv}
            | 'CoGroupByKey' >> beam.CoGroupByKey()
            | 'Map' >> beam.Map(cogbk_result_to_wordsalphabet))


In [0]:
p = beam.Pipeline(interactive_runner.InteractiveRunner())

fruits = p | 'Fruits' >> beam.Create(['apple', 'banana', 'cherry'])
countries = p | 'Countries' >> beam.Create(['australia', 'brazil', 'canada'])

(apply_transforms(fruits, countries))

p.run()

Running...

Using 0 cached PCollections
Executing 6 of 6 transforms.

Country to KV produced {('c', 'canada'), ('b', 'brazil'), ('a', 'australia')}

Fruit to KV produced {('c', 'cherry'), ('b', 'banana'), ('a', 'apple')}

Countries produced {'canada', 'australia', 'brazil'}

Fruits produced {'apple', 'cherry', 'banana'}

Map produced {<__main__.WordsAlphabet instance at..., <__main__.WordsAlphabet instance at..., <__main__.WordsAlphabet instance at...}

CoGroupByKey produced {('c', {'countries': ['canada'], 'fr..., ('b', {'countries': ['brazil'], 'fr..., ('a', {'countries': ['australia'], ...}

<apache_beam.runners.interactive.interactive_runner.PipelineResult at 0x7f1cade06450>