<a href="https://colab.research.google.com/github/HemanthhVV/Apache-Beam/blob/main/Beam_NoteBook_II.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install apache-beam

In [None]:
import apache_beam as beam

## ***Filter***



In [None]:
def is_perennial(plant):
    """Return True when the plant record's 'duration' value is 'perennial'.

    Args:
        plant: Mapping that has a 'duration' key.
    """
    plant_duration = plant['duration']
    return plant_duration == 'perennial'

# Sample records: every plant is a dict with 'icon', 'name' and 'duration'.
garden_plants = [
    {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
    {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
    {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
    {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
    {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
]

# Keep only the records accepted by is_perennial, then print each survivor.
with beam.Pipeline() as pipeline:
    perennials = (
        pipeline
        | 'Gardening plants' >> beam.Create(garden_plants)
        | 'Filter perennials' >> beam.Filter(is_perennial)
        | beam.Map(print)
    )




{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}


In [None]:
# Same filter as before, but the predicate is written inline as a lambda.
garden_plants = [
    {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
    {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
    {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
    {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
    {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
]

with beam.Pipeline() as pipeline:
    perennials = (
        pipeline
        | 'Gardening plants' >> beam.Create(garden_plants)
        | 'Filter perennials' >> beam.Filter(
            lambda plant: plant['duration'] == 'perennial')
        | beam.Map(print)
    )

{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}


### Note that `Filter` takes the function (named or anonymous) as its first argument; any further arguments are passed on to that function, as the examples in this section show

In [None]:
# Filtering with more than one argument
def FilteringWithMultiArgs(plant, duration):
    """Return True when the plant record's 'duration' value equals `duration`.

    Args:
        plant: Mapping that has a 'duration' key.
        duration: Value the record's 'duration' entry is compared against.
    """
    plant_duration = plant['duration']
    return plant_duration == duration

garden_plants = [
    {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
    {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
    {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
    {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
    {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
]

# 'annual' is handed to FilteringWithMultiArgs as its second argument,
# so only records whose duration is 'annual' are printed.
with beam.Pipeline() as pipeline:
    perennials = (
        pipeline
        | 'Gardening plants' >> beam.Create(garden_plants)
        | 'Filter perennials' >> beam.Filter(FilteringWithMultiArgs, 'annual')
        | beam.Map(print)
    )

{'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}


In [None]:
# Passing the comparison value as a singleton side input
garden_plants = [
    {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
    {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
    {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
    {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
    {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
]

with beam.Pipeline() as pipeline:
    # One-element PCollection holding the duration to keep.
    perennial = pipeline | 'Perennial' >> beam.Create(['perennial'])

    # AsSingleton supplies that one element as the lambda's `duration` argument.
    wanted_duration = beam.pvalue.AsSingleton(perennial)

    perennials = (
        pipeline
        | 'Gardening plants' >> beam.Create(garden_plants)
        | 'Filter perennials' >> beam.Filter(
            lambda plant, duration: plant['duration'] == duration,
            duration=wanted_duration,
        )
        | beam.Map(print)
    )


{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}


In [None]:
# Filtering against several allowed values supplied as an iterable side input
garden_plants = [
    {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
    {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
    {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
    {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
    {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
]

with beam.Pipeline() as pipeline:
    # PCollection of the durations that should pass the filter.
    iter_perennial = pipeline | 'Perennial' >> beam.Create(['perennial', 'annual'])

    # AsIter exposes the side PCollection to the lambda as `duration`,
    # which the lambda then tests membership against.
    allowed_durations = beam.pvalue.AsIter(iter_perennial)

    perennials = (
        pipeline
        | 'Gardening plants' >> beam.Create(garden_plants)
        | 'Filter perennials' >> beam.Filter(
            lambda plant, duration: plant['duration'] in duration,
            duration=allowed_durations,
        )
        | beam.Map(print)
    )


{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}


In [None]:

# (duration, keep?) pairs; they reach the filter as a dict side input.
duration_flags = [
    ('annual', False),
    ('biennial', False),
    ('perennial', True),
]

garden_plants = [
    {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
    {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
    {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
    {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
    {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
]

with beam.Pipeline() as pipeline:
    keep_duration = pipeline | 'Duration filters' >> beam.Create(duration_flags)

    # AsDict turns the pairs into a mapping; the lambda looks up each
    # record's duration in it and keeps the record when the flag is truthy.
    keep_lookup = beam.pvalue.AsDict(keep_duration)

    perennials = (
        pipeline
        | 'Gardening plants' >> beam.Create(garden_plants)
        | 'Filter perennials' >> beam.Filter(
            lambda plant, duration: duration[plant['duration']],
            duration=keep_lookup,
        )
        | beam.Map(print)
    )


{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}


### Task: find the words that start with the letter 'a' and convert them to uppercase

In [None]:
class Finder(beam.DoFn):
    """DoFn that splits each input string on a delimiter and emits the pieces.

    Args:
        delimeter: Separator handed to ``str.split``; defaults to one space.
            The spelling is kept as-is because callers may pass it by keyword.
    """

    def __init__(self, delimeter=' '):
        # Initialise the DoFn base class before adding our own state.
        super().__init__()
        self.delimeter = delimeter

    def process(self, text):
        """Yield every piece of `text` produced by splitting on the delimiter."""
        yield from text.split(self.delimeter)

class Output(beam.PTransform):
    """Composite transform that prints each input element with a prefix.

    Args:
        prefix: Text placed in front of ``str(element)`` when printing.
        label: Optional step label forwarded to ``beam.PTransform``.
    """

    class _Transform(beam.DoFn):
        """DoFn that prints ``prefix + str(element)`` and emits nothing."""

        def __init__(self, prefix):
            # The original wrote `super().__init__` without parentheses,
            # which only looked the method up and never ran it.
            super().__init__()
            self.prefix = prefix

        def process(self, element):
            print(self.prefix + str(element))

    def __init__(self, prefix='', label=None):
        super().__init__(label)
        self.prefix = prefix

    def expand(self, input):
        # Return the applied ParDo so the composite hands back its result
        # instead of implicitly returning None.
        return input | beam.ParDo(self._Transform(self.prefix))

In [None]:
# Split the passage into words with Finder, keep the ones that start with
# "a", and print each with a prefix through the Output transform.
passage = "To be, or not to be: that is the question:Whether 'tis nobler in the mind to suffer The slings and arrows of outrageous fortune,Or to take arms against a sea of troubles,And by opposing end them. To die: to sleep"

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create([passage])
        | beam.ParDo(Finder())
        | beam.Filter(lambda word: word.startswith("a"))
        | Output(prefix='PCollection filtered value: ')
    )


PCollection filtered value: and
PCollection filtered value: arrows
PCollection filtered value: arms
PCollection filtered value: against
PCollection filtered value: a


### Aggregating

#### Count

In [None]:
# Count every element of the collection and print the single total.
plant_icons = ['🍓', '🥕', '🥕', '🥕', '🍆', '🍆', '🍅', '🍅', '🍅', '🌽']

with beam.Pipeline() as pipeline:
    total_elements = (
        pipeline
        | 'Create plants' >> beam.Create(plant_icons)
        | 'Count all elements' >> beam.combiners.Count.Globally()
        | beam.Map(print)
    )

10


In [None]:
# Count how often each distinct element occurs; prints (element, count) pairs.
plant_icons = ['🍓', '🥕', '🥕', '🥕', '🍆', '🍆', '🍅', '🍅', '🍅', '🌽']

with beam.Pipeline() as pipeline:
    total_elements = (
        pipeline
        | 'Create plants' >> beam.Create(plant_icons)
        | 'Count all elements' >> beam.combiners.Count.PerElement()
        | beam.Map(print)
    )

('🍓', 1)
('🥕', 3)
('🍆', 2)
('🍅', 3)
('🌽', 1)


In [None]:
# (season, plant) pairs; the season is the key that gets counted.
plants_by_season = [
    ('spring', '🍓'),
    ('spring', '🥕'),
    ('summer', '🥕'),
    ('fall', '🥕'),
    ('spring', '🍆'),
    ('winter', '🍆'),
    ('spring', '🍅'),
    ('summer', '🍅'),
    ('fall', '🍅'),
    ('summer', '🌽'),
]

with beam.Pipeline() as pipeline:
    total_elements_per_keys = (
        pipeline
        | 'Create plants' >> beam.Create(plants_by_season)
        | 'Count elements per key' >> beam.combiners.Count.PerKey()
        | beam.Map(print)
    )

('spring', 4)
('summer', 3)
('fall', 2)
('winter', 1)


In [None]:
# Count how many values appear under each integer key.
keyed_values = [(1, 36), (2, 91), (3, 33), (3, 11), (4, 67)]

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create(keyed_values)
        | beam.combiners.Count.PerKey()
        | beam.Map(print)
    )

(1, 1)
(2, 1)
(3, 2)
(4, 1)


### Task: count how many times each word occurs in a paragraph

In [None]:
# Word count in Beam, reusing the Finder DoFn defined earlier in this notebook:
# split the passage into words, then count the occurrences of each word.
passage = "To be, or not to be: that is the question:Whether 'tis nobler in the mind to suffer The slings and arrows of outrageous fortune,Or to take arms against a sea of troubles,And by opposing end them. To die: to sleep"

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create([passage])
        | beam.ParDo(Finder())
        | beam.combiners.Count.PerElement()
        | beam.Map(print)
    )

('To', 2)
('be,', 1)
('or', 1)
('not', 1)
('to', 4)
('be:', 1)
('that', 1)
('is', 1)
('the', 2)
('question:Whether', 1)
("'tis", 1)
('nobler', 1)
('in', 1)
('mind', 1)
('suffer', 1)
('The', 1)
('slings', 1)
('and', 1)
('arrows', 1)
('of', 2)
('outrageous', 1)
('fortune,Or', 1)
('take', 1)
('arms', 1)
('against', 1)
('a', 1)
('sea', 1)
('troubles,And', 1)
('by', 1)
('opposing', 1)
('end', 1)
('them.', 1)
('die:', 1)
('sleep', 1)


#### Sum

In [None]:
# Sum every element of the collection and print the single total.
# The step labels and variable name now say "numbers"/"sum": the originals
# ('Create plants', 'Count all elements', total_elements) were copied from the
# counting example and mislabelled this pipeline's steps.
with beam.Pipeline() as p:
  total = (
      p | 'Create numbers' >> beam.Create([1,2,3,4,5])
      | 'Sum all elements' >> beam.CombineGlobally(sum)
      | beam.Map(print))

15


In [None]:
# (produce, quantity) pairs; quantities are summed separately for each key.
produce_counts = [
    ('🥕', 3),
    ('🥕', 2),
    ('🍆', 1),
    ('🍅', 4),
    ('🍅', 5),
    ('🍅', 3),
]

with beam.Pipeline() as pipeline:
    totals_per_key = (
        pipeline
        | 'Create produce' >> beam.Create(produce_counts)
        | 'Sum values per key' >> beam.CombinePerKey(sum)
        | beam.Map(print)
    )

('🥕', 5)
('🍆', 1)
('🍅', 12)


#### Task

In [None]:
# Sum the values that share an integer key.
keyed_values = [(1, 36), (2, 91), (3, 33), (3, 11), (4, 67)]

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create(keyed_values)
        | beam.CombinePerKey(sum)
        | beam.Map(print)
    )

(1, 36)
(2, 91)
(3, 44)
(4, 67)


#### Mean

In [None]:
# Arithmetic mean over the whole collection; prints one value.
numbers = [1, 2, 3, 4, 5]

with beam.Pipeline() as pipeline:
    total_elements = (
        pipeline
        | 'Create plants' >> beam.Create(numbers)
        | beam.combiners.Mean.Globally()
        | beam.Map(print)
    )

3.0


In [None]:
# Mean of the values that share an integer key.
keyed_values = [(1, 36), (2, 91), (3, 33), (3, 11), (4, 67)]

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create(keyed_values)
        | beam.combiners.Mean.PerKey()
        | beam.Map(print)
    )

(1, 36.0)
(2, 91.0)
(3, 22.0)
(4, 67.0)


#### MIN

In [None]:
# Smallest element of the collection. The lambda falls back to [-1] when it
# is handed an empty (falsy) collection, so min() always has an argument.
numbers = range(1, 12)

with beam.Pipeline() as pipeline:
    min_element = (
        pipeline
        | 'Create numbers' >> beam.Create(numbers)
        | 'Get min value' >> beam.CombineGlobally(
            lambda elements: min(elements or [-1]))
        | beam.Map(print)
    )

1


In [None]:
# Top.Smallest(5) yields the five smallest elements as a single list.
numbers = range(1, 12)

with beam.Pipeline() as pipeline:
    min_element = (
        pipeline
        | 'Create numbers' >> beam.Create(numbers)
        | 'Get min value' >> beam.combiners.Top.Smallest(5)
        | beam.Map(print)
    )

[1, 2, 3, 4, 5]


In [None]:
# Smallest value among those that share an integer key.
keyed_values = [(1, 36), (2, 91), (3, 33), (3, 11), (4, 67)]

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create(keyed_values)
        | beam.CombinePerKey(min)
        | beam.Map(print)
    )

(1, 36)
(2, 91)
(3, 11)
(4, 67)


### MAX

In [None]:
# Largest element of the collection.
# Fixes relative to the original cell:
#   * the empty-input fallback is now the list [None]; the original
#     `elements or None` handed None itself to max(), which raises TypeError;
#   * the variable and step label say "max" instead of "min".
with beam.Pipeline() as p:
  max_element = (
      p | 'Create numbers' >> beam.Create(range(1,12))
      | 'Get max value' >> beam.CombineGlobally(lambda elements: max(elements or [None]))
      | beam.Map(print))

11


In [None]:
# Top.Largest(5) yields the five largest elements as a single list.
# The variable and step label were renamed: the original 'Get min value' /
# min_element were copied from the MIN example and mislabelled this step.
with beam.Pipeline() as p:
  largest_elements = (
      p | 'Create numbers' >> beam.Create(range(1,12))
      | 'Get largest values' >> beam.combiners.Top.Largest(5) #Usage of Top Func
      | beam.Map(print))

[11, 10, 9, 8, 7]


### WithKeys

In [None]:
# Pair every word with a key computed from it: here, its first character.
fruit_names = ['apple', 'banana', 'cherry', 'durian', 'guava', 'melon']

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create(fruit_names)
        | beam.WithKeys(lambda word: word[0:1])
        | beam.Map(print)
    )

### ***Challenge***

***Common Transforms motivating challenge***

Use the file 'gs://apache-beam-samples/nyc_taxi/misc/sample1000.csv' as input.


You are provided with a PCollection created from the array of taxi order prices in a CSV file. Your task is to find how many orders are below '**$15**' and how many are equal to or above '**$15**'. Return the result as a map structure (key-value): use "above" or "below" as the key, and the total dollar value (sum) of the orders as the value. Although there are many ways to do this, try using another transformation presented in this module.

In [None]:
class MinCount(beam.PTransform):
    """Composite transform: count the input elements that are below 15."""

    def expand(self, input):
        below_threshold = input | beam.Filter(lambda cost:cost<15)
        return below_threshold | beam.combiners.Count.Globally()


class MaxCount(beam.PTransform):
    """Composite transform: count the input elements that are 15 or more."""

    def expand(self, input):
        at_or_above_threshold = input | beam.Filter(lambda cost:cost>=15)
        return at_or_above_threshold | beam.combiners.Count.Globally()

class ExtractTaxiRideCostFn(beam.DoFn):
    """DoFn that pulls the ride cost out of one comma-separated text line."""

    def process(self, element):
        """Split `element` on commas and parse the field at index 16.

        Returns whatever tryParseTaxiRideCost returns for that field list.
        """
        fields = element.split(',')
        cost_values = tryParseTaxiRideCost(fields, 16)
        return cost_values


def tryParseTaxiRideCost(line, index):
    """Yield the float at ``line[index]``, or 0.0 when it cannot be read.

    Args:
        line: Sequence of field values (for example, a split CSV row).
        index: Position of the cost field within `line`.

    Yields:
        Exactly one float: the parsed field, or 0.0 when `line` is too short
        or the field is not a valid number.
    """
    cost = float(0)
    if len(line) > index:
        try:
            cost = float(line[index])
        except (TypeError, ValueError):
            # Best-effort parsing: an unparsable field counts as 0.0.
            cost = float(0)
    # The yield sits outside the try block on purpose. The original wrapped it
    # in a bare `except:`, which also caught the GeneratorExit raised by
    # close() and then yielded again, causing a RuntimeError.
    yield cost

In [None]:
# Read the taxi CSV, extract one cost per line, then print three keyed counts:
# all records, records below 15, and records at or above 15.
with beam.Pipeline() as p:
    # Named `costs` rather than `input`: a top-level `input` would shadow the
    # builtin input() for every later cell.
    costs = (p | 'Log lines' >> beam.io.ReadFromText('sample1000.csv') \
    | beam.ParDo(ExtractTaxiRideCostFn()))
    # Each WithKeys lambda stays on its own line, as in the original cell.
    (costs | beam.combiners.Count.Globally() | beam.WithKeys(lambda cost : "Total Records in Data") | Output(label = "Total"))
    (costs | MinCount() | beam.WithKeys(lambda cost : "Less than 15") |Output(label = "Min"))
    (costs | MaxCount() | beam.WithKeys(lambda cost : "Greater than 15") | Output(label = "max"))

('Greater than 15', 394)
('Total Records in Data', 1000)
('Less than 15', 606)
