# Install Apache Beam using Pip

In [None]:
%%bash

# Report the interpreter and the pip bound to it. Using `python -m pip`
# guarantees both commands refer to the same Python installation.
python --version
python -m pip --version

# The [gcp] extra installs the core apache-beam package plus the GCP
# connectors, so a separate plain `apache_beam` install is redundant.
# The requirement is quoted so the shell never treats the square brackets
# as a glob pattern.
# TODO(review): pin a version (e.g. 'apache-beam[gcp]==X.Y.Z') for reproducible runs.
python -m pip install 'apache-beam[gcp]'

# Create a simple Pipeline From an Array

In [None]:
import apache_beam as beam

# Smallest possible pipeline: turn an in-memory list into a PCollection,
# title-case every element, and print each result.
with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Create' >> beam.Create(['noir', 'bree', 'gigi', 'gretyl'])
        | 'Transform' >> beam.Map(str.title)
        | 'Print' >> beam.Map(print)
    )

# Using .apply() Instead of the Pipe Character

In [None]:
import apache_beam as beam

# Same pipeline as the previous cell, but the transforms after Create are
# attached with PCollection.apply() instead of the `|` operator. apply()
# returns the resulting PCollection, so the steps are chained through variables.
with beam.Pipeline() as pipeline:
    dogs = pipeline | 'Create' >> beam.Create(['noir', 'bree', 'gigi', 'gretyl'])
    title_case_dogs = dogs.apply(beam.Map(str.title))
    title_case_dogs.apply(beam.Map(print))

# Create a File for Testing

Nothing special here: this cell just creates a small text file for the following examples to read.


In [None]:
%%bash

# Start from a clean slate. -f keeps rm quiet when the file does not exist
# yet (e.g. the first time the notebook is run).
rm -f dogs.txt

# Write one dog name per line to dogs.txt.
for dog in Noir Bree Gigi Gretyl Duchess Rusty
do
  echo "$dog" >> dogs.txt
done

# Show the file so its contents can be checked.
cat dogs.txt

# Use beam.io to Read From a File

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

# ReadFromText emits one string per line of the input file; each line is
# upper-cased and then printed.
input_path = 'dogs.txt'
with beam.Pipeline() as pipeline:
    lines = pipeline | 'Read' >> ReadFromText(input_path)
    shouted = lines | 'Transform' >> beam.Map(str.upper)
    shouted | 'Print' >> beam.Map(print)

# Beam.Map

This example just reads from a file and transforms it using a Python function. 

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText


def makeUppercase(element):
    """Return one line of text converted to upper case.

    Args:
        element: a single line read by ReadFromText.
    """
    return element.upper()


filename = 'dogs.txt'
with beam.Pipeline() as p:
    (
        p | 'Read' >> ReadFromText(filename)
          # beam.Map calls makeUppercase once per element and emits exactly one
          # output per input. An inline lambda is equivalent:
          #     beam.Map(lambda item: item.upper())
          | 'Transform' >> beam.Map(makeUppercase)
          | 'Print' >> beam.Map(print)
    )

# Use beam.io to Write a File

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

def makeUppercase(element):
    return element.upper()

filename = 'dogs.txt'
with beam.Pipeline() as p:
    (
        p | 'Read' >> ReadFromText(filename)
          | 'Transform' >> beam.Map(makeUppercase)
          | 'Write' >> WriteToText('uppercase-dogs.out')
    )


# Use ls to see if the file was created and
# cat to view the contents of the file. 
!ls
! cat uppercase-dogs.out-00000-of-00001

# Using ParDo() Instead of Map()

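Note that the DoFn used with ParDo() produces its output with a `yield` statement, whereas a function called with Map() uses `return`. ParDo() may emit zero, one, or many outputs for each input element, so `process()` is written as a generator. Map() always emits exactly one output per input.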

In [None]:
class MakeUpperCase(beam.DoFn):
    def process(self, element):
      transformed = element.upper()
      yield transformed
        

filename = 'dogs.txt'
with beam.Pipeline() as p:
    (
        p | 'Read' >> ReadFromText(filename)
          | 'Transform' >> beam.ParDo(MakeUpperCase())
          | 'Write' >> WriteToText('pardo-uppercase-dogs.out')
    )

!ls
!cat pardo-uppercase-dogs.out-00000-of-00001

# Create another Test File

Just creating another test file. This one has Species and Name, so we can experiment with grouping and aggregations. 

In [None]:
%%bash

# This just creates a file so we can process it.
# Start from a clean slate. -f keeps rm quiet when the file does not exist yet.
rm -f pets.txt

# Write one "species,name" record per line to pets.txt.
for pet in dog,Noir dog,Bree dog,Gigi dog,Gretyl dog,Duchess dog,Rusty cat,Cleo cat,Sparkles cat,Phelix turtle,Cuff turtle,Link
do
  echo "$pet" >> pets.txt
done


cat pets.txt

# Parse and Filter

After reading the file, each row is converted into a Tuple with Species and Name. 

Then, we can get rid of the cats. 

In [None]:
class ParsePets(beam.DoFn):
    def process(self,element):
        species, name = element.split(',')
        yield (species, name)

class NoCats(beam.DoFn):
    def process(self,element):
        if element[0] != 'cat':
          yield element


def print_results(element):
  print(element)
  return element

filename = 'pets.txt'
with beam.Pipeline() as p:
    (
        p | 'Read' >> ReadFromText(filename)
          | 'Parse' >> beam.ParDo(ParsePets())
          | 'Filter' >> beam.ParDo(NoCats())
          | 'Make the Name UCase' >> beam.Map(lambda pet : (pet[0], pet[1].upper()))
          | 'Print' >> beam.Map(print_results)
          | 'Write' >> WriteToText('results.out')
    )

! ls

! cat results.out-00000-of-00001

# Map and FlatMap versus ParDo

The example below is functionally equivalent to the previous example. However, it uses Map() and FlatMap() rather than ParDo().

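Use FlatMap() when a single input element can produce zero, one, or several output elements. Here, the filtering function emits nothing for a cat and one element for any other pet, whereas Map() always produces exactly one output per input. Also note the use of yield, not return, in the filtering function. FlatMap() iterates over whatever the function returns, so a generator that yields nothing simply drops the element.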

In [None]:
def parseThePets(element):
    species, name = element.split(',')
    return (species, name)

def filterOutTheCats(element):
  if element[0] != 'cat':
    species, name = (element[0], element[1])
    yield (species, name)


def print_results(element):
  print(element)
  return element

filename = 'pets.txt'
with beam.Pipeline() as p:
    (
        p | 'Read' >> ReadFromText(filename)
          | 'Parse' >> beam.Map(parseThePets)
          | 'Filter' >> beam.FlatMap(filterOutTheCats)
          | 'Transform' >> beam.Map(lambda pet : (pet[0], pet[1].upper()))
          | 'Write' >> WriteToText('results.out')
    )

! ls
! cat results.out-00000-of-00001

# Group By Key

In [None]:
class ParsePets(beam.DoFn):
    def process(self,element):
        species, name = element.split(',')
        yield (species, name)

class NoCats(beam.DoFn):
    def process(self,element):
        if element[0] != 'cat':
          yield element

def print_results(element):
  print(element)
  return element

filename = 'pets.txt'
with beam.Pipeline() as p:
    (
        p | 'Read' >> ReadFromText(filename)
          | 'Parse' >> beam.ParDo(ParsePets())
          #| 'Filter' >> beam.ParDo(NoCats())
          | 'Make the Name UCase' >> beam.Map(lambda pet : (pet[0], pet[1].upper()))
          # The important line is here:
          | 'Group by Species' >> beam.GroupByKey() 
          #| 'Print' >> beam.Map(print_results)
          | 'Write' >> WriteToText('results.out')
    )


!ls
!cat results.out-00000-of-00001

# Count the Number of Pets by Species

In [None]:
class ParsePets(beam.DoFn):
    def process(self,element):
        species, name = element.split(',')
        yield (species, name)


def print_results(element):
  print(element)
  return element

filename = 'pets.txt'
with beam.Pipeline() as p:
    (
        p | 'Read' >> ReadFromText(filename)
          | 'Parse' >> beam.ParDo(ParsePets())
          | 'PairWIthOne' >> beam.Map(lambda x: (x[0], 1))
          | 'GroupAndSum' >> beam.CombinePerKey(sum)
          | 'Print' >> beam.Map(print_results)
          | 'Write' >> WriteToText('results.out')
    )

! ls
! cat results.out-00000-of-00001