
# Installation
Run the following command to install Apache Beam.

Note: To run a pipeline in the Google Colab environment, you do not need to install or configure a runner. Each Colab session starts in a new virtual environment, so Apache Beam must be installed again every time a new session is created.

In [0]:
!pip install apache-beam

# Upload the required files

Upload every file that the pipeline needs to read by running the following command. Later cells read these data files and apply transformations to them.

In [0]:
from google.colab import files
uploaded = files.upload()

Saving exclude_ids.txt to exclude_ids.txt


# Side Inputs

In addition to the main input PCollection, you can provide additional inputs to a ParDo transform in the form of side inputs. A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection. When you specify a side input, you create a view of some other data that can be read from within the ParDo transform’s DoFn while processing each element.

Side inputs are useful if your ParDo needs to inject additional data when processing each element in the input PCollection, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline.



In [0]:
import apache_beam as beam

# Read the ids that need to be excluded, one per line, and store them in a list
side_list = []
with open('exclude_ids.txt', 'r') as my_file:
  for line in my_file:
    side_list.append(line.rstrip())

p = beam.Pipeline()

# Extra arguments given to a ParDo transform are passed on to the DoFn's process method.
# They follow the first two arguments of process, which are always self and element.

class FilterUsingLength(beam.DoFn):
  def process(self, element, side_list, lower_bound, upper_bound=float('inf')):

    # Split the comma-separated line once into a list of fields
    element_list = element.split(',')

    # The first field is the id and the second field is the name
    record_id = element_list[0]
    name = element_list[1]

    # Emit the record only if its id is not in the excluded list
    # and the length of its name lies within the given bounds
    if (lower_bound <= len(name) <= upper_bound) and record_id not in side_list:
      return [element_list]

# Use ParDo to keep only names whose length is between 3 and 10
small_names =( 
                p
                # Read text from the file, each element is a line of the file
                | "Read from text file" >> beam.io.ReadFromText('dept_data.txt')

                # Apply FilterUsingLength through ParDo, passing the excluded ids and the length bounds
                | "ParDo with side inputs" >> beam.ParDo(FilterUsingLength(),side_list,3,10) 

                # Also check if the department is Accounts
                | beam.Filter(lambda record: record[3] == 'Accounts')

                # Pair each employee key (id + " " + name) with the value 1
                | beam.Map(lambda record: (record[0]+ " " + record[1], 1))

                # Sum the values for each key
                | beam.CombinePerKey(sum)

                # Finally write the results to a file
                # Results: tuples whose key is id + " " + name and whose value is the number of occurrences of that employee
                | 'Write results' >> beam.io.WriteToText('data/output_new_final')
             )

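# Run the pipeline and block until it has finished writing the output
p.run().wait_until_finish()

# Show the first 20 lines of the output file
!head -n 20 data/output_new_final-00000-of-00001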



('503996WI Edouard', 31)
('957149WC Kyle', 31)
('241316NX Kumiko', 31)
('796656IE Gaston', 31)
('718737IX Ayumi', 30)


# Additional Outputs

While ParDo always produces a main output PCollection (as the return value from apply), you can also have your ParDo produce any number of additional output PCollections. If you choose to have multiple outputs, your ParDo returns all of the output PCollections (including the main output) bundled together.



In [0]:
import apache_beam as beam

# DoFn subclass that routes each name to one of three outputs
class ProcessWords(beam.DoFn):

  # Override the process method to decide which output each name goes to
  def process(self, element, cutoff_length, marker):

    # The name is the second comma-separated field of the line
    name = element.split(',')[1]

    # Names that start with the marker go to the main output
    if name.startswith(marker):
      return [name]

    # Names no longer than the cutoff go to the 'Short_Names' output
    if len(name) <= cutoff_length:
      return [beam.pvalue.TaggedOutput('Short_Names', name)]

    # All remaining names are longer than the cutoff and go to the 'Long_Names' output
    else:
      return [beam.pvalue.TaggedOutput('Long_Names', name)]
    
   
      
p = beam.Pipeline()

results = (
            p
            # Read from the file, each element has one line of data
            | beam.io.ReadFromText('dept_data.txt')

            # Apply the ParDo transform with the additional outputs
            | beam.ParDo(ProcessWords(), cutoff_length=5, marker='A').with_outputs('Short_Names', 'Long_Names', main='Names_A')

          )

# Extract one PCollection per output (two tagged outputs plus the main output) from the results
short_collection = results.Short_Names
long_collection = results.Long_Names
startA_collection = results.Names_A

# write to file  
short_collection | 'Write 1'>> beam.io.WriteToText('short')

# write to file
long_collection | 'Write 2'>> beam.io.WriteToText('long')

# write to file
startA_collection | 'Write 3'>> beam.io.WriteToText('start_a')

# Run the pipeline and block until all three outputs have been written
p.run().wait_until_finish()
print("Short:")
!head -n 5 short-00000-of-00001
print("\nLong:")
!head -n 5 long-00000-of-00001
print("\nStarting with A:")
!head -n 5 start_a-00000-of-00001

Short:
Marco
Itoe
Kyle
Kyle
Beryl

Long:
Rebekah
Edouard
Kumiko
Gaston
Leslie

Starting with A:
Ayumi
Ayumi
Ayumi
Ayumi
Ayumi


# Remove Duplicates (extension to additional outputs)

This example extends the additional outputs pipeline by removing duplicate names from each output collection before it is written to a file.



In [0]:
import apache_beam as beam

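# Composite transform that keeps a single copy of each distinct element
class group_and_remove_duplicates(beam.PTransform):

  def expand(self, input_coll):
    a = (
          input_coll
          # Pair each element with a dummy value so the element can act as a key
          | 'Assign value 1' >> beam.Map(lambda x:(x,1))
          # Group by key, which leaves one entry per distinct element
          | 'Group by key' >> beam.GroupByKey()
          # Keep only the key and drop the grouped values
          | 'Select the first column' >> beam.Map(lambda x: x[0])
    )
    return a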

# DoFn subclass that routes each name to one of three outputs
class ProcessWords(beam.DoFn):

  # Override the process method to decide which output each name goes to
  def process(self, element, cutoff_length, marker):

    # The name is the second comma-separated field of the line
    name = element.split(',')[1]

    # Names that start with the marker go to the main output
    if name.startswith(marker):
      return [name]

    # Names no longer than the cutoff go to the 'Short_Names' output
    if len(name) <= cutoff_length:
      return [beam.pvalue.TaggedOutput('Short_Names', name)]

    # All remaining names are longer than the cutoff and go to the 'Long_Names' output
    else:
      return [beam.pvalue.TaggedOutput('Long_Names', name)]
    
   
      
p = beam.Pipeline()

results = (
            p
            # Read from the file, each element has one line of data
            | beam.io.ReadFromText('dept_data.txt')

            # Apply the ParDo transform with the additional outputs
            | beam.ParDo(ProcessWords(), cutoff_length=5, marker='A').with_outputs('Short_Names', 'Long_Names', main='Names_A')

          )

# Extract one PCollection per output from the results and remove its duplicates
short_collection = results.Short_Names |'Remove duplicates short names' >> group_and_remove_duplicates()                  
long_collection = results.Long_Names |'Remove duplicates long names' >> group_and_remove_duplicates()
startA_collection = results.Names_A |'Remove duplicates names with marker' >> group_and_remove_duplicates()

# write to file  
short_collection | 'Write 1'>> beam.io.WriteToText('short')

# write to file
long_collection | 'Write 2'>> beam.io.WriteToText('long')

# write to file
startA_collection | 'Write 3'>> beam.io.WriteToText('start_a')

# Run the pipeline and block until all three outputs have been written
p.run().wait_until_finish()
print("Short:")
!head -n 10 short-00000-of-00001
print("\nLong:")
!head -n 10 long-00000-of-00001
print("\nStarting with A:")
!head -n 10 start_a-00000-of-00001

Short:
Marco
Itoe
Kyle
Beryl
Olga
Mindy
Vicky
Kirk
Kaori
Oscar

Long:
Rebekah
Edouard
Kumiko
Gaston
Leslie
Richard
Cristobal
Sebastien
Valerie
Hitomi

Starting with A:
Ayumi
