In [3]:
# %pip install -q apache-beam  # uncomment on a fresh runtime to install Apache Beam

In [4]:
import apache_beam as beam

In [10]:

# Read the students file, keep only the records whose sixth field is "FAIL",
# and write the surviving records (one Python-list repr per line) to text
# files under result/.
with beam.Pipeline() as pipeline:
  students = (
      pipeline
      # skip_header_lines is a count of lines, so pass the integer 1 instead
      # of relying on True being coerced to 1.
      | " Read from text" >> beam.io.ReadFromText(
          "/content/sample_data/students.txt", skip_header_lines=1)
      | "Spliting the record" >> beam.Map(lambda line: line.split(','))
      # Guard the index: a blank or truncated line splits into fewer than six
      # fields, and indexing [5] on it would raise IndexError and abort the
      # whole pipeline. Such records are simply not reported as FAIL.
      | "filtering the data with FAIL" >> beam.Filter(
          lambda fields: len(fields) > 5 and fields[5] == "FAIL")
      | "Wite to csv" >> beam.io.WriteToText("result/fail_students"))




In [11]:
ls

[0m[01;34mresult[0m/  [01;34msample_data[0m/


In [15]:
# Show the pipeline output. The explicit `!` shell escape does not depend on
# IPython automagic, and the relative path matches the "result/fail_students"
# prefix the pipeline wrote to.
!cat result/fail_students-00000-of-00001

['1', 'vignesh', 'chn', '27', '15', 'FAIL']
['2', 'joey', 'us', '51', '20', 'FAIL']
['6', 'sree', 'koc', '25', '27', 'FAIL']
['9', 'tinkle', 'ker', '27', '9', 'FAIL']


**Map**


* Applies a simple 1 to 1 mapping function over each element in the collection.



In [30]:
def strip_header_and_newline(text):
  """Return ``text`` with leading/trailing '#', space and newline characters removed."""
  unwanted_chars = '# \n'
  return text.strip(unwanted_chars)

# Map example: apply strip_header_and_newline to every element, then print it.
with beam.Pipeline() as pipeline:
  # Source collection: five strings, each with a '# ' prefix and a trailing newline.
  plants = pipeline | 'Gardening plants' >> beam.Create([
      '# 🍓Strawberry\n',
      '# 🥕Carrot\n',
      '# 🍆Eggplant\n',
      '# 🍅Tomato\n',
      '# 🥔Potato\n',
  ])
  # One output element per input element.
  plants = plants | 'Strip header' >> beam.Map(strip_header_and_newline)
  # Print each cleaned element.
  plants = plants | beam.Map(print)



🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato


**MapTuple**


* For key-value pairs: if your PCollection consists of (key, value) pairs, you can use MapTuple to unpack them into different function arguments.



In [26]:
# MapTuple example: each (icon, plant) pair is unpacked into two lambda arguments.
with beam.Pipeline() as pipeline:
  # Source collection of (icon, plant name) tuples.
  plants = pipeline | 'Gardening plants' >> beam.Create([
      ('🍓', 'Strawberry'),
      ('🥕', 'Carrot'),
      ('🍆', 'Eggplant'),
      ('🍅', 'Tomato'),
      ('🥔', 'Potato'),
  ])
  # Concatenate the icon and the name into a single string per pair.
  plants = plants | 'Format' >> beam.MapTuple(
      lambda icon, plant: '{}{}'.format(icon, plant))
  # Print each formatted string.
  plants = plants | beam.Map(print)

🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato


**FlatMap**


* Applies a simple 1 to many mapping function over each element in the collection. The many elements are flattened into the resulting collection.



In [31]:
def split_words(text):
  """Split ``text`` on commas and return the list of resulting pieces."""
  separator = ','
  return text.split(separator)

# FlatMap example: each input string is split into several output elements.
with beam.Pipeline() as pipeline:
  # Source collection: two comma-separated strings.
  plants = pipeline | 'Gardening plants' >> beam.Create([
      '🍓Strawberry,🥕Carrot,🍆Eggplant',
      '🍅Tomato,🥔Potato',
  ])
  # The list returned by split_words is flattened into individual elements.
  plants = plants | 'Split words' >> beam.FlatMap(split_words)
  # Print each resulting word.
  plants = plants | beam.Map(print)

🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato


**FlatMapTuple for key-value pairs**

* If your PCollection consists of (key, value) pairs, you can use FlatMapTuple to unpack them into different function arguments.



In [33]:
def format_plant(icon, plant):
  """Yield ``icon`` and ``plant`` joined into one string; yield nothing if ``icon`` is falsy."""
  if not icon:
    return
  yield '{}{}'.format(icon, plant)

# FlatMapTuple example: each (icon, plant) pair is unpacked into the two
# arguments of format_plant, which yields zero or one string per pair.
with beam.Pipeline() as pipeline:
  # Source collection; the last pair has no icon, so format_plant yields nothing for it.
  plants = pipeline | 'Gardening plants' >> beam.Create([
      ('🍓', 'Strawberry'),
      ('🥕', 'Carrot'),
      ('🍆', 'Eggplant'),
      ('🍅', 'Tomato'),
      ('🥔', 'Potato'),
      (None, 'Invalid'),
  ])
  # Flatten whatever format_plant yields into the output collection.
  plants = plants | 'Format' >> beam.FlatMapTuple(format_plant)
  # Print each formatted string.
  plants = plants | beam.Map(print)



🍓Strawberry
🥕Carrot
🍆Eggplant
🍅Tomato
🥔Potato



**Filter**
* Given a predicate, filters out all elements that don’t satisfy that predicate. It may also be used to filter based on an inequality with a given value, using the comparison ordering of the elements.

In [37]:
def is_perennial(plant):
  """Return True when ``plant['duration']`` equals 'perennial'."""
  duration = plant['duration']
  return duration == 'perennial'

# Filter example: keep only the plant dicts for which is_perennial returns True.
with beam.Pipeline() as pipeline:
  perennials = (
      pipeline
      # Source collection: one dict per plant with 'icon', 'name' and 'duration' keys.
      | 'Gardening plants' >> beam.Create([
          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
      ])
      # Elements for which the predicate is falsy are dropped.
      | 'Filter perennials' >> beam.Filter(is_perennial)
      # Print each surviving dict.
      | beam.Map(print)
  )


{'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'}
{'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'}
{'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'}


In [40]:

#Filtering with multiple arguments

def has_duration(plant, duration):
  """Return True when ``plant['duration']`` equals the requested ``duration``."""
  plant_duration = plant['duration']
  return plant_duration == duration

# Filter with an extra argument: values passed to beam.Filter after the
# predicate are forwarded to it, so has_duration receives (plant, 'annual').
with beam.Pipeline() as pipeline:
  # NOTE: the variable name is kept from the previous example; it holds the
  # plants matching whichever duration is passed to the filter below.
  perennials = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {
              'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'
          },
          {
              'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'
          },
          {
              'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'
          },
          {
              'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'
          },
          {
              'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'
          },
      ])
      # The step label describes what the step does for any duration; the
      # previous label said "perennials" while the filter selected annuals.
      # Change 'annual' to 'perennial' or 'biennial' to try other durations.
      | 'Filter by duration' >> beam.Filter(has_duration, 'annual')
      | beam.Map(print))


{'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'}
