### Composite Transforms

Transforms can have a nested structure, where a complex transform performs multiple simpler transforms (such as more than one ParDo, Combine, GroupByKey, or even other composite transforms). Such nested transforms are called composite transforms.  
Nesting multiple transforms inside a single composite transform can make your code more modular and easier to understand.


In [2]:
import apache_beam as beam

class MyTransform(beam.PTransform):
    """Composite transform bundling the sum, filter and format steps."""

    def expand(self, input_coll):
        """Chain three labelled steps onto ``input_coll`` and return the result.

        The steps are: sum the values per key with ``CombinePerKey(sum)``,
        keep only elements accepted by ``filter_on_count``, then pass each
        survivor through ``format_output``.
        """
        summed = input_coll | 'Group and sum' >> beam.CombinePerKey(sum)
        kept = summed | 'count filter' >> beam.Filter(filter_on_count)
        return kept | 'Regular employee' >> beam.Map(format_output)

    
def SplitRow(element):
    """Break one comma-separated line into the list of its fields."""
    fields = element.split(',')
    return fields


def filter_on_count(element):
    """Return ``element`` when its count is above 30, otherwise ``None``.

    ``element`` is unpacked as a two-item ``(name, count)`` pair; only the
    count takes part in the comparison.
    """
    _, count = element
    return element if count > 30 else None

    
def format_output(element):
    """Turn a ``(name, count)`` pair into a labelled 3-tuple.

    The name is passed through unchanged, the count is converted with
    ``str``, and the fixed label 'Regular employee' is appended.
    """
    label, total = element
    return label, str(total), 'Regular employee'


# Build the pipeline; Beam runs it when the `with` block exits.
with beam.Pipeline() as pipe:
    # Common source for both branches: every line of the input file,
    # split into a list of comma-separated fields by SplitRow.
    input_collection = (
        pipe
        | "Read file" >> beam.io.ReadFromText('data/dept_data.txt')
        | "Split rows" >> beam.Map(SplitRow)
    )

    # Accounts branch: keep rows whose 4th field is 'Accounts', key each by
    # "Accounts, <2nd field>" with a value of 1, apply the composite
    # transform, and write the result under the 'data/Account' prefix.
    accounts_count = (
        input_collection
        | 'Get all Accounts dept persons' >> beam.Filter(
            lambda row: row[3] == 'Accounts'
        )
        | 'Pair each accounts employee with 1' >> beam.Map(
            lambda row: ("Accounts, " + row[1], 1)
        )
        | 'composite accounts' >> MyTransform()
        | 'Write results for account' >> beam.io.WriteToText('data/Account')
    )

    # HR branch: same shape as above, reusing the same composite transform
    # under a different step label, and writing under the 'data/HR' prefix.
    hr_count = (
        input_collection
        | 'Get all HR dept persons' >> beam.Filter(
            lambda row: row[3] == 'HR'
        )
        | 'Pair each hr employee with 1' >> beam.Map(
            lambda row: ("HR, " + row[1], 1)
        )
        | 'composite HR' >> MyTransform()
        | 'Write results for hr' >> beam.io.WriteToText('data/HR')
    )

# visualize output
!{('head -n 20 data/Account-00000-of-00001')}
!{('head -n 20 data/HR-00000-of-00001')}



('Accounts, Marco', '31', 'Regular employee')
('Accounts, Rebekah', '31', 'Regular employee')
('Accounts, Itoe', '31', 'Regular employee')
('Accounts, Edouard', '31', 'Regular employee')
('Accounts, Kyle', '62', 'Regular employee')
('Accounts, Kumiko', '31', 'Regular employee')
('Accounts, Gaston', '31', 'Regular employee')
('HR, Beryl', '62', 'Regular employee')
('HR, Olga', '31', 'Regular employee')
('HR, Leslie', '31', 'Regular employee')
('HR, Mindy', '31', 'Regular employee')
('HR, Vicky', '31', 'Regular employee')
('HR, Richard', '31', 'Regular employee')
('HR, Kirk', '31', 'Regular employee')
('HR, Kaori', '31', 'Regular employee')
('HR, Oscar', '31', 'Regular employee')
