# Structure of Beam Code

* Create Pipeline Object
* Input Data
* Transform data
* Output Data
* Display Data

# First Beam Pipeline

In [1]:
import apache_beam as beam

In [2]:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

In [3]:
# Create the pipeline object, passing an InteractiveRunner instance as its runner.
p1 = beam.Pipeline(InteractiveRunner())

In [4]:
# Inspect the class of the pipeline object that was just created.
type(p1)

apache_beam.pipeline.Pipeline

In [5]:
from apache_beam import Create, Map

In [6]:
# Two steps attached to p1: emit the integers 0..9, then cube each one.
numbers = p1 | "Create Element" >> Create(range(10))
findcube = numbers | "Find Cube" >> Map(lambda value: value ** 3)

In [7]:
# Show the object bound to findcube: a PCollection handle, not the computed values.
findcube

<PCollection[[6]: Find Cube.None] at 0x7f847ea23790>

In [8]:
# Execute the pipeline; the returned result object is shown as the cell output.
p1.run()



<apache_beam.runners.interactive.interactive_runner.PipelineResult at 0x7f847e8da250>

In [9]:
# Printing the PCollection shows its label only, even after the pipeline has run.
print(findcube)

PCollection[[6]: Find Cube.None]


In [10]:
import apache_beam.runners.interactive.interactive_beam as ib

In [11]:
# Optional: draw the graph of pipeline p1 (left disabled in this notebook).
#ib.show_graph(p1)

In [12]:
# Ask Interactive Beam to display the findcube PCollection in the notebook.
ib.show(findcube)

# Combine All Code

In [13]:
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam import Create, Map
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

# The whole interactive example in one self-contained cell:
# build the pipeline, run it, then display the resulting PCollection.
p1 = beam.Pipeline(InteractiveRunner())
findcube = (
    p1
    | "Create Element" >> Create(range(10))
    | "Find Cube" >> Map(lambda x: x ** 3)
)

p1.run()
# `ib` is imported in this cell so it no longer depends on an earlier cell having run.
ib.show(findcube)



In [14]:
import apache_beam as beam
from apache_beam import Create, Map

# No runner is passed to Pipeline() here, so results are not shown with ib.show;
# instead a final Map(print) step prints every cube while the pipeline runs.
p1 = beam.Pipeline()
findcube = (
    p1
    | "Create Element" >> Create(range(10))
    | "Find Cube" >> Map(lambda x: x ** 3)
    | "Print" >> Map(print)
)

p1.run()



0
1
8
27
64
125
216
343
512
729


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8497f32190>

# Create

In [15]:
import apache_beam as beam
from apache_beam import Create, Map

# Create accepts an in-memory list: cube 1, 5 and 8 and print each result.
p1 = beam.Pipeline()
numbers = p1 | "Create Element" >> Create([1, 5, 8])
cubes = numbers | "Find Cube" >> Map(lambda value: value ** 3)
findcube = cubes | "Print" >> Map(print)

p1.run()



1
125
512


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f847c4921d0>

In [17]:
import apache_beam as beam
from apache_beam import Create, Map

# Create also accepts a list of strings; each one is upper-cased and printed.
p1 = beam.Pipeline()
findcube = (
    p1
    | "Create Element" >> Create(["hello", "world"])
    # Label corrected: this step upper-cases text, it does not cube anything.
    | "To Upper Case" >> Map(lambda text: text.upper())
    | "Print" >> Map(print)
)

p1.run()



HELLO
WORLD


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f847c169210>

In [20]:
import apache_beam as beam
from apache_beam import Create, Map

# Create is given a dict; the next step reads index 1 of each element,
# which the printed output shows to be the dict values ("bar", "world").
p1 = beam.Pipeline()
findcube = (
    p1
    | "Create Element" >> Create({"foo": "bar", "hello": "world"})
    # Label corrected: this step picks the value out of each pair.
    | "Extract Value" >> Map(lambda pair: pair[1])
    | "Print" >> Map(print)
)

p1.run()



bar
world


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8497e8b650>

# Flatten

In [23]:
import apache_beam as beam

p1 = beam.Pipeline()

# Two independent in-memory sources attached to the same pipeline.
even = {2, 4, 6, 8}
odd = {1, 3, 5, 7, 9}

even_p1 = p1 | "Even No" >> beam.Create(even)
odd_p1 = p1 | "odd No" >> beam.Create(odd)

# Flatten takes the tuple of PCollections and produces one merged PCollection,
# whose elements are then printed.
merged = (even_p1, odd_p1) | beam.Flatten()
flat_out = merged | beam.Map(print)

p1.run()



1
3
5
7
9
8
2
4
6


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f84655e8490>

# Map/FlatMap

In [24]:
import apache_beam as beam
from apache_beam import Create, Map

def cube(element):
    """Return ``element`` raised to the third power."""
    return element ** 3


# Map accepts a named function as well as a lambda. The helper is called `cube`
# so that it is not overwritten by the `findcube` PCollection assigned below.
p1 = beam.Pipeline()
findcube = (
    p1
    | "Create Element" >> Create([1, 5, 8])
    | "Find Cube" >> Map(cube)
    | "Print" >> Map(print)
)

p1.run()



1
125
512


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f846563c310>

In [26]:
import apache_beam as beam
from apache_beam import Create, Map, FlatMap

# FlatMap emits every item returned by str.split as its own element,
# so each word is printed on a separate line.
p1 = beam.Pipeline()
sentences = p1 | "Create Element" >> Create(["foo bar", "hello world"])
words = sentences | "String spliting" >> FlatMap(str.split)
findcube = words | "Print" >> Map(print)

p1.run()



foo
bar
hello
world


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8464efca10>

# Filter

In [29]:
import apache_beam as beam
from apache_beam import Create, Filter, Map

# Filter keeps only the elements for which the predicate returns True.
# Map is imported here because the final print step uses it.
p1 = beam.Pipeline()
filter_even = (
    p1
    | "Create Element" >> Create(range(20))
    | "Filter Even" >> Filter(lambda x: x % 2 == 0)
    | "Print" >> Map(print)
)

p1.run()



0
2
4
6
8
10
12
14
16
18


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8465765390>

In [32]:
import apache_beam as beam
from apache_beam import Create, Filter, Map

# Filter works on any element type: keep only the strings that start with "W".
# Map is imported here because the final print step uses it.
p1 = beam.Pipeline()
filter_even = (
    p1
    | "Create Element" >> Create(["Hello", "World", "Hey"])
    # Label corrected: the predicate tests a string prefix, not evenness.
    | "Filter Starts With W" >> Filter(lambda x: x.startswith("W"))
    | "Print" >> Map(print)
)

p1.run()



World


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8465269190>

# ParDo

In [34]:
import apache_beam as beam
from apache_beam import Create, Map, ParDo, Filter

# Keep the sentences starting with "f", then hand str.split to ParDo;
# the printed output shows one word per element.
p1 = beam.Pipeline()
sentences = p1 | "Create Element" >> Create(["foo bar", "hello world"])
f_sentences = sentences | "Filter Startwith f" >> Filter(lambda text: text.startswith("f"))
words = f_sentences | "String spliting" >> ParDo(str.split)
findcube = words | "Print" >> Map(print)

p1.run()



foo
bar


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8465118190>

# Keys, Values, ToString, KvSwap

In [40]:
import apache_beam as beam
from apache_beam import Create, Map

p1 = beam.Pipeline()
pairs = p1 | "Create Element" >> Create({"foo": "bar", "hello": "world"})

# Other transforms to try in place of KvSwap on the line below:
#   "All Keys" >> beam.Keys()
#   "All Values" >> beam.Values()
#   "String Display" >> beam.ToString.Element()  #Kvs()
swapped = pairs | "kv swap" >> beam.KvSwap()
findcube = swapped | "Print" >> Map(print)

p1.run()



bar
world


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f8465a74910>

# Partition

In [41]:
%pip install sympy

/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
Note: you may need to restart the kernel to use updated packages.


In [42]:
import sympy

In [44]:
# Quick check of sympy's primality test before it is used in a pipeline.
sympy.isprime(7)

True

In [54]:
import apache_beam as beam
from apache_beam import Create, Map

p1 = beam.Pipeline()


def is_prime_function(element, no_par):
    """Pick the partition index for ``element``.

    Returns 1 when sympy reports ``element`` as prime and 0 otherwise.
    ``no_par`` is accepted as the second argument but is not used.
    """
    if sympy.isprime(element):
        return 1
    return 0


candidates = p1 | "Create Element" >> Create(range(1, 20))
# Split into 2 partitions using the index returned by is_prime_function.
is_prime = candidates | "Partition" >> beam.Partition(is_prime_function, 2)

# Partition 1 receives the primes; uncomment to print them.
#is_prime[1] | "Print Prime Number" >> Map(print)

# Partition 0 receives everything that is not prime.
is_prime[0] | "Print Non Prime Number" >> Map(print)

p1.run()




1
4
6
8
9
10
12
14
15
16
18


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f845baed690>

# Regex

In [56]:
import apache_beam as beam
from apache_beam import Create, Map, ParDo, Filter

p1 = beam.Pipeline()
tokens = p1 | "Create Element" >> Create(["1", "23", "hello", "world"])
# With this pattern only the lowercase words are printed; "(\d+)" is an
# alternative pattern for the digit strings.
alpha_only = tokens | "Regex" >> beam.Regex.matches("[a-z]+")
findcube = alpha_only | "Print" >> Map(print)

p1.run()



hello
world


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f845b8de050>

# Aggregation

In [57]:
import apache_beam as beam
from apache_beam import Create, Map

# Count.Globally reduces the whole PCollection to one element: its size.
p1 = beam.Pipeline()
items = p1 | "Create Element" >> Create(["1", "23", "hello", "world"])
total = items | "Count" >> beam.combiners.Count.Globally()
findcube = total | "Print" >> Map(print)

p1.run()



4


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f845b90fd10>

In [59]:
import apache_beam as beam
from apache_beam import Create, Map

# Each element is a (key, list of numbers) pair; CombineValues(sum) replaces
# the list with its sum and keeps the key.
p1 = beam.Pipeline()
findcube = (
    p1
    # Equivalent input written as a list of pairs:
    #| "Create Element" >> Create([("one+one", [1,1]) , ("one+three", [1,3])])
    | "Create Element" >> Create({"one+one": [1, 1], "one+three": [1, 3]})
    # Label corrected: this step sums the values, it does not count them.
    | "Sum Values" >> beam.CombineValues(sum)
    | "Print" >> Map(print)
)

p1.run()



('one+one', 2)
('one+three', 4)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f845b984bd0>

In [60]:
import apache_beam as beam
from apache_beam import Create, Map

# Distinct drops repeated elements so that every value is printed once.
p1 = beam.Pipeline()
findcube = (
    p1
    | "Create Element" >> Create([1, 2, 3, 4, 5, 3, 2, 1])
    # Label corrected: this step de-duplicates, it does not count.
    | "Distinct" >> beam.Distinct()
    | "Print" >> Map(print)
)

p1.run()



1
2
3
4
5


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f845b98afd0>

In [62]:
import apache_beam as beam
from apache_beam import Create, Map

# Sample.FixedSizeGlobally(5) emits a single list holding 5 of the input elements.
# NOTE(review): the selection is presumably random, so the printed list is
# expected to differ between runs.
p1 = beam.Pipeline()
findcube = (
    p1
    | "Create Element" >> Create(range(20))
    # Label corrected: this step samples elements, it does not count them.
    | "Sample 5" >> beam.combiners.Sample.FixedSizeGlobally(5)
    | "Print" >> Map(print)
)

p1.run()



[19, 8, 5, 15, 7]


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f84653806d0>

In [66]:
import apache_beam as beam
from apache_beam import Create, Map

# CombineGlobally folds the whole PCollection with the given function.
p1 = beam.Pipeline()
digits = p1 | "Create Element" >> Create(range(10))
# To total the values instead of taking the maximum, use:
#   "Sum" >> beam.CombineGlobally(sum)
largest = digits | "Max" >> beam.CombineGlobally(max)
out = largest | "Print" >> Map(print)

p1.run()



9


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f845b86c5d0>

In [68]:
import apache_beam as beam
from apache_beam import Create, Map

# Mean.PerKey averages the values that share a key.
p1 = beam.Pipeline()
keyed = p1 | "Create Element" >> Create([("hello", 1), ("hello", 4), ("bye", 7)])
averages = keyed | "Mean of perkey" >> beam.combiners.Mean.PerKey()
out = averages | "Print" >> Map(print)

p1.run()



('hello', 2.5)
('bye', 7.0)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f846529c210>

In [70]:
import apache_beam as beam
from apache_beam import Create, Map

# Top.Smallest(2) emits one list containing the two smallest elements.
p1 = beam.Pipeline()
digits = p1 | "Create Element" >> Create(range(10))
# For the two largest elements instead, use:
#   "Top 2" >> beam.combiners.Top.Largest(2)
smallest_two = digits | "Small 2" >> beam.combiners.Top.Smallest(2)
out = smallest_two | "Print" >> Map(print)

p1.run()



[0, 1]


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f845b84f6d0>

In [71]:
import apache_beam as beam
from apache_beam import Create, Map

# Top.LargestPerKey(1) keeps, for each key, a list with its single largest value.
p1 = beam.Pipeline()
keyed = p1 | "Create Element" >> Create([("hello", 1), ("hello", 4), ("bye", 7)])
best_per_key = keyed | "Top 1 Per key" >> beam.combiners.Top.LargestPerKey(1)
out = best_per_key | "Print" >> Map(print)

p1.run()



('hello', [4])
('bye', [7])


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f845b349390>

In [72]:
import apache_beam as beam
from apache_beam import Create, Map

# CombinePerKey(sum) adds up the values that share a key.
p1 = beam.Pipeline()
keyed = p1 | "Create Element" >> Create([("hello", 1), ("hello", 4), ("bye", 7)])
totals = keyed | "Sum perkey basis" >> beam.CombinePerKey(sum)
out = totals | "Print" >> Map(print)

p1.run()



('hello', 5)
('bye', 7)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f845b2440d0>