In [None]:
!pip3 install apache_beam

In [None]:
import apache_beam as beam

## **GroupBy**:

*   Takes a collection of elements and produces a collection grouped by properties of those elements.
*   Unlike GroupByKey, the key is dynamically created from the elements themselves.





In [None]:
with beam.Pipeline() as p:
  # GroupBy derives the key from each element itself; here the key is the
  # first character of the string. Every resulting group is printed.
  grouped = (
      p
      | beam.Create([
          'strawberry',
          'raspberry',
          'blueberry',
          'blackberry',
          'banana',
      ])
      | beam.GroupBy(lambda name: name[0])
      | beam.Map(print)
  )

## **Aggregation:**

In [None]:
# Sample rows used by the GroupBy / aggregation cells. Each Row carries an
# account id, a service id and description, a float cost, and three
# timestamps kept as plain strings (usage start, usage end, export time).
CLOUD_LIST = [
    beam.Row(
        account_id="01411F",
        service_id="2062-016F",
        service_description="Support",
        cost=0.122116,
        usage_start_time="2024-08-24 14:00:00.000000 UTC",
        usage_end_time="2024-08-24 15:00:00.000000 UTC",
        export_time="2024-08-25 01:03:47.021696 UTC",
    ),
    beam.Row(
        account_id="010FD9",
        service_id="2062-016F",
        service_description="Support",
        cost=0.002986,
        usage_start_time="2024-08-21 00:00:00.000000 UTC",
        usage_end_time="2024-08-21 01:00:00.000000 UTC",
        export_time="2024-08-21 07:17:49.309164 UTC",
    ),
    beam.Row(
        account_id="01411F",
        service_id="4567-001A",
        service_description="EC2",
        cost=0.001945,
        usage_start_time="2024-08-28 17:00:00.000000 UTC",
        usage_end_time="2024-08-28 18:00:00.000000 UTC",
        export_time="2024-08-29 03:40:11.062982 UTC",
    ),
    beam.Row(
        account_id="0123FE",
        service_id="2062-016F",
        service_description="Support",
        cost=0.496863,
        usage_start_time="2024-08-01 01:00:00.000000 UTC",
        usage_end_time="2024-08-01 02:00:00.000000 UTC",
        export_time="2024-08-01 12:02:19.080977 UTC",
    ),
    beam.Row(
        account_id="010FD9",
        service_id="4567-001A",
        service_description="EC2",
        cost=0.522116,
        usage_start_time="2024-08-11 17:00:00.000000 UTC",
        usage_end_time="2024-08-11 18:00:00.000000 UTC",
        export_time="2024-08-12 02:12:45.602702 UTC",
    ),
]

In [None]:
with beam.Pipeline() as p:
  # Group the sample rows by the value of their `account_id` field and print
  # each resulting group.
  grouped = (
      p
      | beam.Create(CLOUD_LIST)
      | beam.GroupBy('account_id')
      | beam.Map(print)
  )

In [None]:
with beam.Pipeline() as p:
  # Count how many rows of CLOUD_LIST fall under each service_description.
  grouped = (
      p
      | beam.Create(CLOUD_LIST)
      | beam.GroupBy('service_description')
      # element is a (key, rows) pair. The grouped rows are only guaranteed to
      # be iterable -- some runners hand back a lazy iterable that has no
      # len() -- so count them by iterating instead of calling len().
      | beam.Map(lambda element: (element[0], sum(1 for _ in element[1])))
      | beam.Map(print)
  )

In [None]:
with beam.Pipeline() as p:
  # Total the `cost` of the rows in each service_description group by hand.
  grouped = (
      p
      | beam.Create(CLOUD_LIST)
      | beam.GroupBy('service_description')
      # Each grouped element is a (key, rows) pair; MapTuple unpacks it into
      # the two lambda arguments before the costs are summed.
      | beam.MapTuple(lambda key, rows: (key, sum(row.cost for row in rows)))
      | beam.Map(print)
  )

In [None]:
with beam.Pipeline() as p:
  # Let GroupBy do the aggregation itself: `sum` is applied to the `cost`
  # field within each service_description group and the result is exposed
  # under the name `total_cost`.
  total_cost_by_service = beam.GroupBy('service_description').aggregate_field(
      'cost', sum, 'total_cost')
  grouped = (
      p
      | beam.Create(CLOUD_LIST)
      | total_cost_by_service
      | beam.Map(print)
  )

In [None]:
with beam.Pipeline() as p:
  # Same aggregation as a per-service total, but keyed on `account_id`:
  # `sum` is applied to the `cost` field of each account's rows and the
  # result is exposed under the name `total_cost`.
  total_cost_by_account = beam.GroupBy('account_id').aggregate_field(
      'cost', sum, 'total_cost')
  grouped = (
      p
      | beam.Create(CLOUD_LIST)
      | total_cost_by_account
      | beam.Map(print)
  )

## **GroupByKey**:

*   Takes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key.



In [None]:
# Keyed sample data for the GroupByKey cell. Every name appears twice: once
# with a two-item list (presumably [age, job title]) and once with an
# employment-status string.
records = [
    ("vignesh", [27, "engineer"]),
    ("neethu", [27, "developer"]),
    ("farooqui", [26, "data analyst"]),
    ("sai", [29, "web developer"]),
    ("tinkle", [28, "fullstack developer"]),
    ("neethu", "Employed"),
    ("sai", "Unemployed"),
    ("tinkle", "Employed"),
    ("farooqui", "Employed"),
    ("vignesh", "Unemployed"),
]

In [None]:
with beam.Pipeline() as pipeline:
  # GroupByKey gathers every value that shares a key, so each name in
  # `records` comes out once, paired with all of its values.
  # The transform labels describe the name-keyed records they operate on.
  produce_counts = (
      pipeline
      | 'Create records' >> beam.Create(records)
      | 'Group values per name' >> beam.GroupByKey()
      | beam.Map(print)
  )

## **CoGroupByKey**:

*   Aggregates all input elements by their key and allows downstream processing to consume all values associated with the key.
*   GroupByKey performs this operation over a single input collection, and thus over a single type of input values.
*   CoGroupByKey operates over multiple input collections. As a result, the result for each key is a tuple of the values associated with that key in each input collection.



In [None]:
with beam.Pipeline() as pipeline:
  # First keyed input: student name -> a place name (presumably the
  # student's city).
  student_pairs = pipeline | 'Create cities' >> beam.Create([
      ('vignesh', 'bangalore'),
      ('khaula', 'hyderabad'),
      ('neethu', 'malapur'),
      ('sai', 'chennai'),
  ])

  # Second keyed input: student name -> [number, "PASS"/"FAIL"] (presumably
  # the marks and the outcome).
  student_result = pipeline | 'Create results' >> beam.Create([
      ('vignesh', [15, "FAIL"]),
      ('khaula', [99, "PASS"]),
      ('neethu', [100, "PASS"]),
      ('sai', [37, "FAIL"]),
  ])

  # CoGroupByKey joins both inputs on the student name. The dict keys tag
  # which input each list of values came from in the printed output, so they
  # are named after the data they actually hold.
  plants = (
      {'City': student_pairs, 'Result': student_result}
      | 'Merge' >> beam.CoGroupByKey()
      | beam.Map(print)
  )

In [None]:
with beam.Pipeline() as pipeline:
  # First keyed input: student name -> a number, tagged 'Marks' below.
  student_pairs = pipeline | 'Create marks' >> beam.Create([
      ('vignesh', 15),
      ('khaula', 99),
      ('neethu', 100),
      ('sai', 37),
  ])

  # Second keyed input: student name -> "PASS"/"FAIL", tagged 'Result' below.
  student_result = pipeline | 'Create results' >> beam.Create([
      ('vignesh', "FAIL"),
      ('khaula', "PASS"),
      ('neethu', "PASS"),
      ('sai', "FAIL"),
  ])

  # Join both inputs on the student name; the dict keys tag which input each
  # list of values came from in the printed output.
  plants = (
      {'Marks': student_pairs, 'Result': student_result}
      | 'Merge' >> beam.CoGroupByKey()
      | beam.Map(print)
  )

## **GroupIntoBatches**:



*   Groups the values of each key in the input into batches of the desired size.



In [None]:
with beam.Pipeline() as pipeline:
  # Keyed input for GroupIntoBatches: (season, produce) pairs.
  produce_by_season = [
      ('spring', '🍓'),
      ('spring', '🥕'),
      ('spring', '🍆'),
      ('spring', '🍅'),
      ('summer', '🥕'),
      ('summer', '🍅'),
      ('summer', '🌽'),
      ('fall', '🥕'),
      ('fall', '🍅'),
      ('winter', '🍆'),
  ]

  batches_with_keys = (
      pipeline
      | 'Create produce' >> beam.Create(produce_by_season)
      # Batch size is 4; presumably 3 and 2 were alternative sizes to try.
      | 'Group into batches' >> beam.GroupIntoBatches(4)
      | beam.Map(print)
  )