In [26]:
!pip install apache-beam



**Aggregation**

In [27]:
import apache_beam as beam
from apache_beam import Create,Map,Regex

p1=beam.Pipeline()

agg_var=(p1
            | "Create Element" >> Create(["1","2","2","Hello","welcome"]))


agg_var|"Global Count" >> beam.combiners.Count.Globally()|"Print" >> Map(print)
agg_var|"per key count" >> beam.combiners.Count.PerElement()|"Print1" >> Map(print)
p1.run()

('1', 1)
('2', 2)
('Hello', 1)
('welcome', 1)
5


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7a6818cde290>

**Combine values function**

In [28]:
import apache_beam as beam
from apache_beam import Create,Map,Regex

p1=beam.Pipeline()

agg_var=(p1
            | "Create Element" >> Create([("one+one",[1,1]),("one+three",[1,3])]))

#can also work with dictionary
agg_var|"combine values" >> beam.CombineValues(sum )|"Print" >> Map(print) #instead of sum u can give used define funtion ---> same as combine.Fn
p1.run()

('one+one', 2)
('one+three', 4)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7a6818b66590>

**distinct**

In [29]:
import apache_beam as beam
from apache_beam import Create,Map,Regex

p1=beam.Pipeline()

agg_var=(p1
            | "Create Element" >> Create([1,2,3,4,5,4,3,2,1]))


agg_var|"get distinct" >> beam.Distinct()|"Print" >> Map(print)
p1.run()

1
2
3
4
5


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7a68138bb850>

**Sample Size**

In [30]:
import apache_beam as beam
from apache_beam import Create,Map,Regex

p1=beam.Pipeline()

agg_var=(p1
            | "Create Element" >> Create(range(20)))


agg_var|"get distinct" >> beam.combiners.Sample.FixedSizeGlobally(5)|"Print" >> Map(print)
p1.run()

[1, 3, 13, 8, 10]


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7a6813b82350>

**Combine Globally**

In [31]:
import apache_beam as beam
from apache_beam import Create,Map,Regex

p1=beam.Pipeline()

agg_var=(p1
            | "Create Element" >> Create(range(20)))

#can also work with dictionary
agg_var|"get distinct" >> beam.CombineGlobally(sum)|"Print" >> Map(print) # Can pass many function like min max and also user defined functions
p1.run()

190


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7a681382c650>

**combine per key**

In [35]:
import apache_beam as beam

# Define a custom CombineFn to concatenate strings
class ConcatStringsFn(beam.CombineFn):
    def create_accumulator(self):
        return ""  # Start with an empty string

    def add_input(self, accumulator, element):
        return accumulator + " " + element if accumulator else element  # Concatenate with space

    def merge_accumulators(self, accumulators):
        return " ".join(filter(None, accumulators))  # Merge multiple accumulators

    def extract_output(self, accumulator):
        return accumulator  # Return final concatenated string

# Apache Beam Pipeline
with beam.Pipeline() as p:
    data = p | "Create PCollection" >> beam.Create([
        ("A", "apple"),
        ("A", "banana"),
        ("B", "cherry"),
        ("B", "date"),
        ("B", "fig"),
    ])

    # Apply CombinePerKey with our custom function
    result = data | "Concatenate Strings" >> beam.CombinePerKey(ConcatStringsFn())

    # Print results
    result | "Print Output" >> beam.Map(print)


('A', 'apple banana')
('B', 'cherry date fig')


**combine on basis of tuple like group by**

In [32]:
import apache_beam as beam
from apache_beam import Create,Map,Regex

p1=beam.Pipeline()

agg_var=(p1
            | "Create Element" >> Create([("hello",1),("hello",4),("hi",1)]))

#can also work with dictionary
agg_var|"like group by" >> beam.combiners.Mean.PerKey()|"Print" >> Map(print) # Can pass many function like min max and also user defined functions
p1.run()

('hello', 2.5)
('hi', 1.0)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7a681382ec10>

In [33]:
import apache_beam as beam

class MaxValueFn(beam.CombineFn):
    def init_accumulator(self):
        return float('-inf')  # Initialize accumulator with smallest value

    def update_accumulator(self, max_value, element):
        return max(max_value, element)  # Keep the highest value ----> here we add or update value is accumulator element wise for eg if mean--> sum,count=sum_count#(Accumulator)
                                                                  #Return type is also same as accumulator== return sum+element , count +1

    def merge_accumulators(self, accumulators):
        return max(accumulators)  # Merge by selecting the max value --Here we can have merging logic for eg for mean= sum/count

    def get_final_result(self, max_value):
        return max_value  # Output the max value ---> retrun accumulator

# Rename methods (they should still follow the same logic)
MaxValueFn.create_accumulator = MaxValueFn.init_accumulator
MaxValueFn.add_input = MaxValueFn.update_accumulator
MaxValueFn.extract_output = MaxValueFn.get_final_result

# Running the pipeline
with beam.Pipeline() as p:
    numbers = p | beam.Create([5, 15, 25, 10, 30])

    max_result = numbers | beam.CombineGlobally(MaxValueFn())

    max_result | beam.Map(print)  # Expected output: 30


30


**CogroupKey**

In [34]:
#this logic can be used for joining

emails_list = [
    ('amy', 'amy@example.com'),
    ('carl', 'carl@example.com'),
    ('julia', 'julia@example.com'),
    ('carl', 'carl@email.com'),
]
phones_list = [
    ('amy', '111-222-3333'),
    ('james', '222-333-4444'),
    ('amy', '333-444-5555'),
    ('carl', '444-555-6666'),
]

emails = p | 'CreateEmails' >> beam.Create(emails_list)
phones = p | 'CreatePhones' >> beam.Create(phones_list)

results = ({'emails': emails, 'phones': phones} | beam.CoGroupByKey())|Map(print)
p.run()

30
('amy', {'emails': ['amy@example.com'], 'phones': ['111-222-3333', '333-444-5555']})
('carl', {'emails': ['carl@example.com', 'carl@email.com'], 'phones': ['444-555-6666']})
('julia', {'emails': ['julia@example.com'], 'phones': []})
('james', {'emails': [], 'phones': ['222-333-4444']})


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7a681380a810>

**Some other combine functions**

#Top Largest

In [44]:
import apache_beam as beam
from apache_beam import Create,Map,Regex

p1=beam.Pipeline()

number=p1|beam.Create(range(10))

number|beam.combiners.Top.Largest(2)|beam.Map(print)
p1.run()



[9, 8]


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7a6812f58410>

# Top Smallest

In [45]:
import apache_beam as beam
from apache_beam import Create,Map,Regex

p1=beam.Pipeline()

number=p1|beam.Create(range(10))

number|beam.combiners.Top.Smallest(2)|beam.Map(print)
p1.run()



[0, 1]


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7a6813a058d0>

#Group By Batch

In [46]:

def groupintobatches(test=None):
  # [START groupintobatches]
  import apache_beam as beam

  with beam.Pipeline() as pipeline:
    batches_with_keys = (
        pipeline
        | 'Create produce' >> beam.Create([
            ('spring', '🍓'),
            ('spring', '🥕'),
            ('spring', '🍆'),
            ('spring', '🍅'),
            ('summer', '🥕'),
            ('summer', '🍅'),
            ('summer', '🌽'),
            ('fall', '🥕'),
            ('fall', '🍅'),
            ('winter', '🍆'),
        ])
        | 'Group into batches' >> beam.GroupIntoBatches(3)
        | beam.Map(print))

  pipeline.run()

groupintobatches()



('spring', ['🍓', '🥕', '🍆'])
('summer', ['🥕', '🍅', '🌽'])
('spring', ['🍅'])
('fall', ['🥕', '🍅'])
('winter', ['🍆'])


#Largest per key

In [52]:
import apache_beam as beam
from apache_beam import Create

p1=beam.Pipeline()

numbers=p1|beam.Create([("Hello",1),("Hello",3),("hi",1)])

numbers|beam.combiners.Top.LargestPerKey(2)|beam.Map(print)
p1.run()

('Hello', [3, 1])
('hi', [1])


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7a6812e12f50>

#Combine per key

In [53]:
import apache_beam as beam
from apache_beam import Create

p1=beam.Pipeline()

numbers=p1|beam.Create([("Hello",1),("Hello",3),("hi",1)])

numbers|beam.CombinePerKey(sum)|beam.Map(print)
p1.run()

('Hello', 4)
('hi', 1)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7a681292f610>