In [1]:
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.interactive.interactive_beam import *
from apache_beam import coders
from apache_beam import combiners
import csv
import typing
from apache_beam.utils.timestamp import Duration


In [2]:
# Build the pipeline on Beam's InteractiveRunner; later cells materialize
# PCollections from it with collect().
pipeline = beam.Pipeline(runner=InteractiveRunner())


In [3]:
def read_csv_file(filename, skip_header=False):
  """Lazily yield the rows of a CSV file as lists of strings.

  Args:
    filename: Path of the CSV file to read.
    skip_header: When True, the first row is discarded as a header.
      Defaults to False, i.e. every row is yielded.

  Yields:
    One list of column strings per CSV record, as produced by `csv.reader`.
  """
  # newline='' hands newline handling to the csv module, as its documentation
  # requires; otherwise line breaks embedded in quoted fields get rewritten by
  # universal-newline translation before the parser sees them.
  with open(filename, 'r', newline='') as file:
    reader = csv.reader(file)
    if skip_header:
      next(reader, None)  # the default keeps an empty file from raising
    yield from reader

def parse_url_string(line):
  """Parse a query-string-style record into a `beam.Row`.

  Args:
    line: An indexable row of strings; only its first item is parsed. That
      item is expected to look like ``key1=value1&key2=value2``.

  Returns:
    A `beam.Row` with one field per key. The `created` field is converted
    to `int`; every other field stays a string.

  Raises:
    IndexError: If `line` is empty or a pair contains no '='.
    KeyError: If there is no `created` key.
    ValueError: If the `created` value is not an integer literal.
  """
  fields = {}
  for pair in line[0].split('&'):
    # Split once, on the first '=' only, so a value that itself contains '='
    # (e.g. base64 padding) is kept whole instead of being truncated.
    parts = pair.split('=', 1)
    fields[parts[0]] = parts[1]
  fields['created'] = int(fields['created'])
  return beam.Row(**fields)

# Source PCollection: one timestamped beam.Row per record read from 'api.log'.
api_pcoll = (
    pipeline
    | 'Emit filename' >> beam.Create(['api.log'])
    | 'Read CSV' >> beam.FlatMap(read_csv_file)
    | 'Parse line' >> beam.Map(parse_url_string)
    # Use each record's own `created` value as its event timestamp.
    | 'Add timestamp' >> beam.Map(
        lambda row: beam.window.TimestampedValue(row, row.created))
)


In [4]:
# Windowing step reused by the aggregation cells below: fixed windows of
# size 60 (seconds, per Beam's FixedWindows API).
windowing = beam.WindowInto(beam.window.FixedWindows(size=60))


## Count of events per window

In [5]:
# Count how many elements fall into each window and render it as text.
result = (
    api_pcoll
    | 'Windowing' >> windowing
    # without_defaults(): no default value is emitted for empty windows.
    | 'Sum' >> beam.combiners.Count.Globally().without_defaults()
    | 'Format' >> beam.Map(lambda count: 'Count: {}'.format(count))
)

collect(result)


Unnamed: 0,0
0,Count: 10000
1,Count: 10000
2,Count: 10000


## Count of events per combined key (hostname + uid) per window

In [6]:
def combine_key(kv):
  """Build the grouping key string from a record's `hostname` and `uid`."""
  hostname, uid = kv.hostname, kv.uid
  return 'hostname={} uid={}'.format(hostname, uid)

class FormatRecordDoFn(beam.DoFn):
  """Prefix each incoming pair with the bounds of the window it belongs to."""

  def __init__(self):
    super().__init__()

  def process(self, element, window=beam.DoFn.WindowParam):
    """Flatten the window bounds and the element into one 4-tuple.

    Args:
      element: An indexable pair; in this notebook, the (key, count) output
        of the upstream per-key count.
      window: The window containing `element`, supplied by Beam through
        `beam.DoFn.WindowParam`.

    Returns:
      A one-item list holding
      (window.start, window.end, element[0], element[1]).
    """
    record = (window.start, window.end, element[0], element[1])
    return [record]

# Count events per combined hostname/uid key inside each window.
result = (
    api_pcoll
    # Key every row by its combined hostname/uid string.
    | 'Combine key' >> beam.Map(lambda row: (combine_key(row), row))
    | 'Windowing' >> windowing
    | 'Sum' >> beam.combiners.Count.PerKey()
    # Prefix each (key, count) pair with its window's start and end.
    | 'Format' >> beam.ParDo(FormatRecordDoFn())
)

collect(result)


Unnamed: 0,0,1,2,3
0,Timestamp(0),Timestamp(60),hostname=hostname3 uid=uid45,15
1,Timestamp(60),Timestamp(120),hostname=hostname3 uid=uid45,10
2,Timestamp(120),Timestamp(180),hostname=hostname3 uid=uid45,12
3,Timestamp(0),Timestamp(60),hostname=hostname7 uid=uid81,14
4,Timestamp(60),Timestamp(120),hostname=hostname7 uid=uid81,10
...,...,...,...,...
2668,Timestamp(60),Timestamp(120),hostname=hostname2 uid=uid59,7
2669,Timestamp(120),Timestamp(180),hostname=hostname2 uid=uid59,11
2670,Timestamp(0),Timestamp(60),hostname=hostname2 uid=uid37,5
2671,Timestamp(60),Timestamp(120),hostname=hostname2 uid=uid37,13
