Skip to content

Commit

Permalink
Add LogElements as a Beam PTransform (#23879)
Browse files: browse the repository at this point in the history
  • Loading branch information
nivaldoh committed Nov 1, 2022
1 parent 0fbe0a7 commit 3371a1c
Show file tree
Hide file tree
Showing 35 changed files with 120 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@

import apache_beam as beam

from log_elements import LogElements

with beam.Pipeline() as p:

(p | beam.Create(range(1, 11))
| beam.combiners.Count.Globally()
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@

import apache_beam as beam

from log_elements import LogElements

with beam.Pipeline() as p:

(p | beam.Create(range(1, 11))
| beam.combiners.Top.Largest(2)
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@

import apache_beam as beam

from log_elements import LogElements

with beam.Pipeline() as p:

(p | beam.Create(range(1, 11))
| beam.combiners.Mean.Globally()
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@

import apache_beam as beam

from log_elements import LogElements

with beam.Pipeline() as p:

(p | beam.Create(range(1, 11))
| beam.combiners.Top.Smallest(1)
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@

import apache_beam as beam

from log_elements import LogElements

with beam.Pipeline() as p:

(p | beam.Create(range(1, 11))
| beam.CombineGlobally(sum)
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@

import apache_beam as beam

from log_elements import LogElements

with beam.Pipeline() as p:

(p | beam.Create(range(1, 11))
| beam.Filter(lambda num: num % 2 == 0)
| LogElements())
| beam.LogElements())
4 changes: 1 addition & 3 deletions learning/katas/python/Common Transforms/Filter/ParDo/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@

import apache_beam as beam

from log_elements import LogElements


class FilterOutEvenNumber(beam.DoFn):

Expand All @@ -41,4 +39,4 @@ def process(self, element):
with beam.Pipeline() as p:
(p | beam.Create(range(1, 11))
| beam.ParDo(FilterOutEvenNumber())
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@

import apache_beam as beam

from log_elements import LogElements

with beam.Pipeline() as p:

(p | beam.Create(['apple', 'banana', 'cherry', 'durian', 'guava', 'melon'])
| beam.WithKeys(lambda word: word[0:1])
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@

import apache_beam as beam

from log_elements import LogElements

with beam.Pipeline() as p:

numbers = p | beam.Create([1, 2, 3, 4, 5])

mult5_results = numbers | beam.Map(lambda num: num * 5)
mult10_results = numbers | beam.Map(lambda num: num * 10)

mult5_results | 'Log multiply 5' >> LogElements(prefix='Multiplied by 5: ')
mult10_results | 'Log multiply 10' >> LogElements(prefix='Multiplied by 10: ')
mult5_results | 'Log multiply 5' >> beam.LogElements(prefix='Multiplied by 5: ')
mult10_results | 'Log multiply 10' >> beam.LogElements(prefix='Multiplied by 10: ')
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@

import apache_beam as beam

from log_elements import LogElements


class WordsAlphabet:

Expand Down Expand Up @@ -67,4 +65,4 @@ def cogbk_result_to_wordsalphabet(cgbk_result):
countries = p | 'Countries' >> beam.Create(['australia', 'brazil', 'canada'])

(apply_transforms(fruits, countries)
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@

import apache_beam as beam

from log_elements import LogElements

PLAYER_1 = 'Player 1'
PLAYER_2 = 'Player 2'
PLAYER_3 = 'Player 3'
Expand All @@ -41,4 +39,4 @@
(p | beam.Create([(PLAYER_1, 15), (PLAYER_2, 10), (PLAYER_1, 100),
(PLAYER_3, 25), (PLAYER_2, 75)])
| beam.CombinePerKey(sum)
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@

import apache_beam as beam

from log_elements import LogElements


class AverageFn(beam.CombineFn):

Expand All @@ -53,4 +51,4 @@ def extract_output(self, accumulator):

(p | beam.Create([10, 20, 50, 70, 90])
| beam.CombineGlobally(AverageFn())
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@

import apache_beam as beam

from log_elements import LogElements


def sum(numbers):
total = 0
Expand All @@ -45,4 +43,4 @@ def sum(numbers):

(p | beam.Create([1, 2, 3, 4, 5])
| beam.CombineGlobally(sum)
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@

import apache_beam as beam

from log_elements import LogElements


class ExtractAndMultiplyNumbers(beam.PTransform):

Expand All @@ -46,4 +44,4 @@ def expand(self, pcoll):

(p | beam.Create(['1,2,3,4,5', '6,7,8,9,10'])
| ExtractAndMultiplyNumbers()
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@

import apache_beam as beam

from log_elements import LogElements

with beam.Pipeline() as p:

wordsStartingWithA = \
Expand All @@ -40,4 +38,4 @@

((wordsStartingWithA, wordsStartingWithB)
| beam.Flatten()
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@

import apache_beam as beam

from log_elements import LogElements

with beam.Pipeline() as p:

(p | beam.Create(['apple', 'ball', 'car', 'bear', 'cheetah', 'ant'])
| beam.Map(lambda word: (word[0], word))
| beam.GroupByKey()
| LogElements())
| beam.LogElements())
4 changes: 1 addition & 3 deletions learning/katas/python/Core Transforms/Map/FlatMap/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@

import apache_beam as beam

from log_elements import LogElements

with beam.Pipeline() as p:

(p | beam.Create(['Apache Beam', 'Unified Batch and Streaming'])
| beam.FlatMap(lambda sentence: sentence.split())
| LogElements())
| beam.LogElements())
4 changes: 1 addition & 3 deletions learning/katas/python/Core Transforms/Map/Map/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@

import apache_beam as beam

from log_elements import LogElements

with beam.Pipeline() as p:

(p | beam.Create([10, 20, 30, 40, 50])
| beam.Map(lambda num: num * 5)
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@

import apache_beam as beam

from log_elements import LogElements


class BreakIntoWordsDoFn(beam.DoFn):

Expand All @@ -43,5 +41,5 @@ def process(self, element):

(p | beam.Create(['Hello Beam', 'It is awesome'])
| beam.ParDo(BreakIntoWordsDoFn())
| LogElements())
| beam.LogElements())

4 changes: 1 addition & 3 deletions learning/katas/python/Core Transforms/Map/ParDo/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@

import apache_beam as beam

from log_elements import LogElements


class MultiplyByTenDoFn(beam.DoFn):

Expand All @@ -41,5 +39,5 @@ def process(self, element):

(p | beam.Create([1, 2, 3, 4, 5])
| beam.ParDo(MultiplyByTenDoFn())
| LogElements())
| beam.LogElements())

Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@

import apache_beam as beam

from log_elements import LogElements


def partition_fn(number, num_partitions):
if number > 100:
Expand All @@ -45,5 +43,5 @@ def partition_fn(number, num_partitions):
(p | beam.Create([1, 2, 3, 4, 5, 100, 110, 150, 250])
| beam.Partition(partition_fn, 2))

results[0] | 'Log numbers > 100' >> LogElements(prefix='Number > 100: ')
results[1] | 'Log numbers <= 100' >> LogElements(prefix='Number <= 100: ')
results[0] | 'Log numbers > 100' >> beam.LogElements(prefix='Number > 100: ')
results[1] | 'Log numbers <= 100' >> beam.LogElements(prefix='Number <= 100: ')
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@

import apache_beam as beam

from log_elements import LogElements


class Person:
def __init__(self, name, city, country=''):
Expand Down Expand Up @@ -64,4 +62,4 @@ def process(self, element, cities_to_countries):

(p | beam.Create(persons)
| beam.ParDo(EnrichCountryDoFn(), beam.pvalue.AsDict(cities_to_countries))
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import apache_beam as beam
from apache_beam import pvalue

from log_elements import LogElements

num_below_100_tag = 'num_below_100'
num_above_100_tag = 'num_above_100'

Expand All @@ -54,5 +52,5 @@ def process(self, element):
| beam.ParDo(ProcessNumbersDoFn())
.with_outputs(num_above_100_tag, main=num_below_100_tag))

results[num_below_100_tag] | 'Log numbers <= 100' >> LogElements(prefix='Number <= 100: ')
results[num_above_100_tag] | 'Log numbers > 100' >> LogElements(prefix='Number > 100: ')
results[num_below_100_tag] | 'Log numbers <= 100' >> beam.LogElements(prefix='Number <= 100: ')
results[num_above_100_tag] | 'Log numbers > 100' >> beam.LogElements(prefix='Number > 100: ')
4 changes: 1 addition & 3 deletions learning/katas/python/Examples/Word Count/Word Count/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@

import apache_beam as beam

from log_elements import LogElements

lines = [
"apple orange grape banana apple banana",
"banana orange banana papaya"
Expand All @@ -44,4 +42,4 @@
| beam.FlatMap(lambda sentence: sentence.split())
| beam.combiners.Count.PerElement()
| beam.MapTuple(lambda k, v: k + ":" + str(v))
| LogElements())
| beam.LogElements())
4 changes: 1 addition & 3 deletions learning/katas/python/IO/TextIO/ReadFromText/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@

import apache_beam as beam

from log_elements import LogElements

with beam.Pipeline() as p:

file_path = 'countries.txt'

(p | beam.io.ReadFromText(file_path) | beam.Map(lambda country: country.upper())
| LogElements())
| beam.LogElements())
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@

import apache_beam as beam

from log_elements import LogElements

with beam.Pipeline() as p:

(p | beam.Create(['Hello Beam'])
| LogElements())
| beam.LogElements())

Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import apache_beam as beam
from apache_beam.transforms import window

from log_elements import LogElements


class Event:
def __init__(self, id, event, timestamp):
Expand All @@ -60,5 +58,5 @@ def process(self, element, **kwargs):
Event('5', 'book-order', datetime.datetime(2020, 3, 8, 0, 0, 0, 0, tzinfo=pytz.UTC)),
])
| beam.ParDo(AddTimestampDoFn())
| LogElements(with_timestamp=True))
| beam.LogElements(with_timestamp=True))

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from apache_beam.transforms.trigger import AfterCount
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.utils.timestamp import Duration
from log_elements import LogElements
from apache_beam.transforms.util import LogElements


class CountEventsWithEarlyTrigger(beam.PTransform):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.trigger import AfterWatermark
from apache_beam.utils.timestamp import Duration
from log_elements import LogElements
from apache_beam.transforms.util import LogElements


class CountEvents(beam.PTransform):
Expand Down

0 comments on commit 3371a1c

Please sign in to comment.