#Composite Transform

-	Transforms can have a nested structure, where a complex transform performs multiple simpler transforms (such as more than one ParDo, Combine, GroupByKey, or even other composite transforms). These transforms are called composite transforms.
-	Nesting multiple transforms inside a single composite transform can make your code more modular and easier to understand.

#Creating a composite transform:
-	To create your own composite transform, create a subclass of the PTransform class and override the expand method to specify the actual processing logic.
- The following code sample shows how to declare a PTransform that accepts a PCollection of strings as input and outputs a PCollection of integers:

  ```
  import apache_beam as beam

  class ComputeWordLengths(beam.PTransform):
    def expand(self, pcoll):
      # Map each input string to its length.
      return pcoll | beam.Map(len)
  ```


-	The expand method is where you add the processing logic for the PTransform. Your override of expand must accept the appropriate type of input PCollection as a parameter, and specify the output PCollection as the return value.
-	You can include as many transforms as you want. These transforms can include core transforms, composite transforms, or the transforms included in the Beam SDK libraries.
-	Your composite transform’s parameters and return value must match the initial input type and final return type for the entire transform, even if the transform’s intermediate data changes type multiple times.



In [2]:
import apache_beam as beam

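#Using ordinary transforms
  - Map
  - Filter
  - CombinePerKey

The pipeline below applies these transforms separately in each of its three branches (Accounts, HR, Finance), so the same steps are written out three times and there is more code to read and maintain.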

In [10]:
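def splitRow(element):
  # Split one CSV line into a list of fields.
  return element.split(',')

def filter_on_count(element):
  # Predicate for beam.Filter: keep (name, count) pairs with count above 30.
  name, count = element
  return count > 30

def format_output(element):
  # Turn (name, count) into the tuple that is written to the output file.
  name, count = element
  return (name.encode('ascii'),str(count),'Experienced employee')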

In [15]:
with beam.Pipeline() as p:
  input = (
      p
      | "Read Data">> beam.io.ReadFromText("/content/sample_data/dept_data.txt")
      | "Split">> beam.Map(splitRow)
  )
  account_persons = (
      input
      | "Get only Accnt Dept Person" >> beam.Filter(lambda x : x[3]=='Accounts')
      | "Pair each accnt person" >> beam.Map(lambda x : (x[1],1))
      | "Combine and Sum" >> beam.CombinePerKey(sum)
      | "count filter accounts" >>beam.Filter(filter_on_count)
      | 'Regular accounts employee' >> beam.Map(format_output)
      | 'Write results for account' >> beam.io.WriteToText('/content/sample_data/Account_People.txt')
  )
  hr_persons = (
      input
      | "Get only HR Dept Person" >> beam.Filter(lambda x : x[3]=='HR')
      | "Pair each hr person" >> beam.Map(lambda x : (x[1],1))
      | "Combine and Sum hr" >> beam.CombinePerKey(sum)
      | "count filter hr" >>beam.Filter(filter_on_count)
      | 'Regular hr employee' >> beam.Map(format_output)
      | 'Write results for hr' >> beam.io.WriteToText('/content/sample_data/HR_People.txt')
  )
  finance_persons = (
      input
      | "Get only Finance Dept Person" >> beam.Filter(lambda x : x[3]=='Finance')
      | "Pair each finance person" >> beam.Map(lambda x : (x[1],1))
      | "Combine and Sum fin" >> beam.CombinePerKey(sum)
      | "count filter finops" >>beam.Filter(filter_on_count)
      | 'Regular finance employee' >> beam.Map(format_output)
      | 'Write results for finance' >> beam.io.WriteToText('/content/sample_data/Finance_People.txt')
      #| beam.Map(print)
  )



/content/sample_data/Finance_People.txt-00000-of-00001


#Composite Transforms

- In the pipeline above, CombinePerKey, Filter, and Map are applied in the same order in all three branches.

- Instead of calling them separately in each branch, we bundle these three steps into one composite transform and apply that in the pipeline.

In [16]:
class MyTransform(beam.PTransform):

  def expand(self, input_coll):
    return (
        input_coll
        | 'Group and sum' >> beam.CombinePerKey(sum)
        | 'Count filter' >> beam.Filter(filter_on_count)
        | 'Format output' >> beam.Map(format_output)
    )

In [19]:
with beam.Pipeline() as p:
  input = (
      p
      | "Read Data">> beam.io.ReadFromText("/content/sample_data/dept_data.txt")
      | "Split">> beam.Map(splitRow)
  )
  account_persons = (
      input
      | "Get only Accnt Dept Person" >> beam.Filter(lambda x : x[3]=='Accounts')
      | "Pair each accnt person" >> beam.Map(lambda x : (x[1],1))
      | 'composite accounts' >> MyTransform()
      | 'Write results for account' >> beam.io.WriteToText('/content/sample_data/Account_People.txt')
  )
  hr_persons = (
      input
      | "Get only HR Dept Person" >> beam.Filter(lambda x : x[3]=='HR')
      | "Pair each hr person" >> beam.Map(lambda x : (x[1],1))
      | 'composite hr' >> MyTransform()
      | 'Write results for hr' >> beam.io.WriteToText('/content/sample_data/HR_People.txt')
  )
  finance_persons = (
      input
      | "Get only Finance Dept Person" >> beam.Filter(lambda x : x[3]=='Finance')
      | "Pair each finance person" >> beam.Map(lambda x : (x[1],1))
      | 'composite finance' >> MyTransform()
      | 'Write results for finance' >> beam.io.WriteToText('/content/sample_data/Finance_People.txt')
      #| beam.Map(print)
  )



In [20]:
!{('head -n 10 /content/sample_data/Account_People.txt-00000-of-00001')}

(b'Marco', '31', 'Experienced employee')
(b'Rebekah', '31', 'Experienced employee')
(b'Itoe', '31', 'Experienced employee')
(b'Edouard', '31', 'Experienced employee')
(b'Kyle', '62', 'Experienced employee')
(b'Kumiko', '31', 'Experienced employee')
(b'Gaston', '31', 'Experienced employee')


In [21]:
!{('head -n 10 /content/sample_data/Finance_People.txt-00000-of-00001')}

(b'Kumiko', '31', 'Experienced employee')
(b'Wendy', '31', 'Experienced employee')
(b'Cristobal', '31', 'Experienced employee')
(b'Erika', '31', 'Experienced employee')
(b'Sebastien', '31', 'Experienced employee')
(b'Valerie', '31', 'Experienced employee')
(b'Dolly', '31', 'Experienced employee')
(b'Emily', '31', 'Experienced employee')
(b'Kaori', '31', 'Experienced employee')
(b'Hitomi', '31', 'Experienced employee')


In [22]:
!{('head -n 10 /content/sample_data/HR_People.txt-00000-of-00001')}

(b'Beryl', '62', 'Experienced employee')
(b'Olga', '31', 'Experienced employee')
(b'Leslie', '31', 'Experienced employee')
(b'Mindy', '31', 'Experienced employee')
(b'Vicky', '31', 'Experienced employee')
(b'Richard', '31', 'Experienced employee')
(b'Kirk', '31', 'Experienced employee')
(b'Kaori', '31', 'Experienced employee')
(b'Oscar', '31', 'Experienced employee')


#Side Input

-	A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection.
-	In addition to the main input PCollection, you can provide additional inputs to a ParDo transform in the form of side inputs.
-	Side inputs are useful if your ParDo needs to inject additional data when processing each element in the input PCollection, but the additional data needs to be determined at runtime (and not hard-coded).
-	Side inputs should be small, much smaller than the main PCollection, because each worker has to keep the side input in memory.
-	Such values might be determined by the input data, or depend on a different branch of your pipeline.


In [23]:
# First, read the IDs of the students we want to exclude
input_list=list()
with open('/content/sample_data/students_exclude.txt','r') as f:
  for line in f:
    input_list.append(line.rstrip())
print(input_list)

['1', '3', '7', '9']


In [26]:
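# Now create the pipeline and read the Students_age file
class SplitRow(beam.DoFn):
  def process(self,element,input_list):
    # input_list is the extra argument passed to beam.ParDo below
    customer = element.split(',')
    # Emit the row only if its ID (first field) is not in the exclusion list
    if customer[0] not in input_list:
      return [customer]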


with beam.Pipeline() as p:
  input = (
      p
      | "Read Data">> beam.io.ReadFromText("/content/sample_data/Students_age.txt")
      | "Split">> beam.ParDo(SplitRow(),input_list)
      | "Write results" >> beam.io.WriteToText('/content/sample_data/student_age_output.txt')
  )

In [27]:
!{('head -n 10 /content/sample_data/student_age_output.txt-00000-of-00001')}

['2', 'farooqui', 'hyd', '26']
['4', 'neethu', 'mla', '27', '']
['5', 'joey', 'ny', '57']
['6', 'ross', 'la', '60']
['8', 'lois', 'us', '50']
['10', 'sai', 'chn', '29']


#Side Outputs/ Additional Outputs
  
- While ParDo always produces a main output PCollection, you can also have your ParDo produce any number of additional output PCollections.
- If you choose to have multiple outputs, your ParDo returns all of the output PCollections (including the main output) bundled together.


In [29]:
# Create the side list by reading students_exclude.txt
side_list = list()
with open ('/content/sample_data/students_exclude.txt','r') as exclude_file:
  for cust_id in exclude_file:
    side_list.append(cust_id.rstrip())
print(side_list)

['1', '3', '7', '9']


In [30]:
class SplitRow(beam.DoFn):
  def process(self,element,side_list):
    customer = element.split(',')
    if customer[0] not in side_list:
      return [customer]

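class ProcessCustomers(beam.DoFn):
  def process(self,element,country,start_char):
    # Main output: students from the given country; all others are tagged 'Other_student'.
    if element[2]==country:
      yield element
    else:
      yield beam.pvalue.TaggedOutput('Other_student',element)
    # Independently of the above, tag students whose name starts with start_char.
    if element[1].startswith(start_char):
      yield beam.pvalue.TaggedOutput('Names_r',element)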

In [33]:
with beam.Pipeline() as p:
  results = (
      p
      | "Read Data">> beam.io.ReadFromText("/content/sample_data/Students_age.txt")
      | "Side input" >> beam.ParDo(SplitRow(),side_list)
      | "Side Output" >> beam.ParDo(ProcessCustomers(),'chn','r').with_outputs('Names_r','Other_student',main='Chennai_Cust')
  )

  # The tagged outputs are attributes of the object returned by with_outputs(),
  # not of the Pipeline: p.Chennai_Cust raises
  # AttributeError: 'Pipeline' object has no attribute 'Chennai_Cust'.
  chennai_customers = results.Chennai_Cust
  other_cities_customers = results.Other_student
  customer_withname_r = results.Names_r

  # The writes are applied inside the with block so they are part of the pipeline before it runs.
  chennai_customers | 'Write Chennai Students PCollection' >> beam.io.WriteToText("chennai")
  other_cities_customers  | 'Write Students PCollection that lives in other cities' >> beam.io.WriteToText("students_other_cities")
  customer_withname_r  | 'Write Students names with r PCollection' >> beam.io.WriteToText("customers_names_r")