In [None]:
# Install Beam with the interactive extras into the *kernel's* environment.
# `%pip` targets the running kernel (a bare `!pip` may hit a different Python),
# and the quotes stop the shell from glob-expanding the `[interactive]` extra.
# TODO: pin an exact version (e.g. "apache-beam[interactive]==X.Y.Z") for reproducibility.
%pip install --quiet "apache-beam[interactive]"

In [1]:
import apache_beam as beam

**Combine Per Key**

In [3]:
# Emit the raw (key, value) pairs WITHOUT aggregating them, so the per-record
# values can be inspected before a combiner is applied in a later step.
#
# The `with` block runs the pipeline and waits for it to finish on exit, so the
# cell does not depend on the runner being blocking and no RunnerResult repr is
# echoed as cell output. The result stays available as `p1.result`.
with beam.Pipeline() as p1:
    delayed_time = (
        p1
        | 'Import Data' >> beam.io.ReadFromText('flights_sample.csv')
        # Each text line becomes a list of string fields.
        | 'Split by ,' >> beam.Map(lambda record: record.split(','))
        # Keep only records whose field at index 8 is a positive integer.
        | 'Filter Delays' >> beam.Filter(lambda record: int(record[8]) > 0)
        # Key = field at index 4, value = field at index 8 as an int.
        | 'Create Key-Value pair' >> beam.Map(lambda record: (record[4], int(record[8])))
        # Intentionally no CombinePerKey here: one output line per input record.
        | 'Print Results' >> beam.Map(print)
    )

('LAX', 2)
('LAX', 39)
('HNL', 15)
('DFW', 95)
('OGG', 138)
('LAX', 19)
('JFK', 1)
('JFK', 88)
('LAX', 18)
('JFK', 120)
('LAX', 16)
('JFK', 11)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fd6802ddc60>

Sum of delayed time

In [4]:
# Sum the values (field at index 8) per key (field at index 4).
#
# The `with` block runs the pipeline and waits for it to finish on exit, so the
# cell does not depend on the runner being blocking and no RunnerResult repr is
# echoed as cell output. The result stays available as `p2.result`.
with beam.Pipeline() as p2:
    delayed_time = (
        p2
        | 'Import Data' >> beam.io.ReadFromText('flights_sample.csv')
        # Each text line becomes a list of string fields.
        | 'Split by ,' >> beam.Map(lambda record: record.split(','))
        # Keep only records whose field at index 8 is a positive integer.
        | 'Filter Delays' >> beam.Filter(lambda record: int(record[8]) > 0)
        | 'Create Key-Value pair' >> beam.Map(lambda record: (record[4], int(record[8])))
        # Built-in `sum` is applied to the values grouped under each key.
        | 'Combine Per Key' >> beam.CombinePerKey(sum)
        | 'Print Results' >> beam.Map(print)
    )

('LAX', 94)
('HNL', 15)
('DFW', 95)
('OGG', 138)
('JFK', 220)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fd68030e410>

Max delayed time

In [5]:
# Find the largest value (field at index 8) per key (field at index 4).
#
# The `with` block runs the pipeline and waits for it to finish on exit, so the
# cell does not depend on the runner being blocking and no RunnerResult repr is
# echoed as cell output. The result stays available as `p3.result`.
with beam.Pipeline() as p3:
    max_delayed_time = (
        p3
        | 'Import Data' >> beam.io.ReadFromText('flights_sample.csv')
        # Each text line becomes a list of string fields.
        | 'Split by ,' >> beam.Map(lambda record: record.split(','))
        # Keep only records whose field at index 8 is a positive integer.
        | 'Filter Delays' >> beam.Filter(lambda record: int(record[8]) > 0)
        | 'Create Key-Value pair' >> beam.Map(lambda record: (record[4], int(record[8])))
        # Built-in `max` is applied to the values grouped under each key.
        | 'Combine Per Key' >> beam.CombinePerKey(max)
        | 'Print Results' >> beam.Map(print)
    )

('LAX', 39)
('HNL', 15)
('DFW', 95)
('OGG', 138)
('JFK', 120)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fd680e7ff10>

**combiners.Count.PerKey**

In [6]:
# Count how many filtered records fall under each key (field at index 4).
#
# The `with` block runs the pipeline and waits for it to finish on exit, so the
# cell does not depend on the runner being blocking and no RunnerResult repr is
# echoed as cell output. The result stays available as `p4.result`.
with beam.Pipeline() as p4:
    count_delayed_time = (
        p4
        | 'Import Data' >> beam.io.ReadFromText('flights_sample.csv')
        # Each text line becomes a list of string fields.
        | 'Split by ,' >> beam.Map(lambda record: record.split(','))
        # Keep only records whose field at index 8 is a positive integer.
        | 'Filter Delays' >> beam.Filter(lambda record: int(record[8]) > 0)
        | 'Create Key-Value pair' >> beam.Map(lambda record: (record[4], int(record[8])))
        # Count.PerKey counts elements per key; the values themselves are not summed.
        | 'Count Per Key' >> beam.combiners.Count.PerKey()
        | 'Print Results' >> beam.Map(print)
    )

('LAX', 5)
('HNL', 1)
('DFW', 1)
('OGG', 1)
('JFK', 4)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fd6802dcd00>

**CoGroupByKey**

In [7]:
# Compute both the per-key total and the per-key count from one shared keyed
# PCollection, then join the two results by key with CoGroupByKey.
#
# The `with` block runs the pipeline and waits for it to finish on exit, so the
# cell does not depend on the runner being blocking and no RunnerResult repr is
# echoed as cell output. The result stays available as `p5.result`.
with beam.Pipeline() as p5:
    # Shared input: (field at index 4, int(field at index 8)) for positive values only.
    flight_delay_pc_kv = (
        p5
        | 'Import Data' >> beam.io.ReadFromText('flights_sample.csv')
        | 'Split by ,' >> beam.Map(lambda record: record.split(','))
        | 'Filter Delays' >> beam.Filter(lambda record: int(record[8]) > 0)
        | 'Create Key-Value pair' >> beam.Map(lambda record: (record[4], int(record[8])))
    )

    # Branch 1: sum of the values per key.
    total_delayed_time = (
        flight_delay_pc_kv
        | 'Combine Per Key' >> beam.CombinePerKey(sum)
    )

    # Branch 2: number of records per key.
    count_delayed_time = (
        flight_delay_pc_kv
        | 'Count Per Key' >> beam.combiners.Count.PerKey()
    )

    # Join the branches; each output is (key, {'delayed_num': [...], 'delayed_time': [...]}).
    # Explicit labels keep these steps consistent with the rest of the pipeline.
    delay_table = (
        {'delayed_num': count_delayed_time, 'delayed_time': total_delayed_time}
        | 'Join Count and Total' >> beam.CoGroupByKey()
        | 'Print Results' >> beam.Map(print)
    )

('LAX', {'delayed_num': [5], 'delayed_time': [94]})
('HNL', {'delayed_num': [1], 'delayed_time': [15]})
('DFW', {'delayed_num': [1], 'delayed_time': [95]})
('OGG', {'delayed_num': [1], 'delayed_time': [138]})
('JFK', {'delayed_num': [4], 'delayed_time': [220]})


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fd67f2adfc0>

**ParDo** (Parallel Do: apply a user-defined function to each element in parallel)

In [9]:
class Filter(beam.DoFn):
  """DoFn that forwards only records whose field at index 8 is a positive integer."""

  def process(self, record):
    """Yield `record` unchanged if int(record[8]) > 0; otherwise emit nothing."""
    # Generator form makes the zero-or-one output per element explicit, instead
    # of returning a one-item list in one branch and an implicit None in the other.
    if int(record[8]) > 0:
      yield record


# Same count/total join as the Filter-based version, but the filtering step is
# expressed as a ParDo over the custom DoFn above.
#
# The `with` block runs the pipeline and waits for it to finish on exit, so the
# cell does not depend on the runner being blocking and no RunnerResult repr is
# echoed as cell output. The result stays available as `p6.result`.
with beam.Pipeline() as p6:
    # Shared input: (field at index 4, int(field at index 8)) for positive values only.
    flight_delay_pc_kv = (
        p6
        | 'Import Data' >> beam.io.ReadFromText('flights_sample.csv')
        | 'Split by ,' >> beam.Map(lambda record: record.split(','))
        | 'Filter Delays' >> beam.ParDo(Filter())
        | 'Create Key-Value pair' >> beam.Map(lambda record: (record[4], int(record[8])))
    )

    # Branch 1: sum of the values per key.
    total_delayed_time = (
        flight_delay_pc_kv
        | 'Combine Per Key' >> beam.CombinePerKey(sum)
    )

    # Branch 2: number of records per key.
    count_delayed_time = (
        flight_delay_pc_kv
        | 'Count Per Key' >> beam.combiners.Count.PerKey()
    )

    # Join the branches; each output is (key, {'delayed_num': [...], 'delayed_time': [...]}).
    # Explicit labels keep these steps consistent with the rest of the pipeline.
    delay_table = (
        {'delayed_num': count_delayed_time, 'delayed_time': total_delayed_time}
        | 'Join Count and Total' >> beam.CoGroupByKey()
        | 'Print Results' >> beam.Map(print)
    )

('LAX', {'delayed_num': [5], 'delayed_time': [94]})
('HNL', {'delayed_num': [1], 'delayed_time': [15]})
('DFW', {'delayed_num': [1], 'delayed_time': [95]})
('OGG', {'delayed_num': [1], 'delayed_time': [138]})
('JFK', {'delayed_num': [4], 'delayed_time': [220]})


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fd68030df90>