In [None]:
# Print a shell command, then run it.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

# Setup steps, executed in the order listed: upgrade pip, install apache-beam,
# create the data directory, then download the two input files into it.
_setup_commands = (
    'pip install --upgrade pip',
    'pip install --quiet apache-beam',
    'mkdir -p data',
    'wget https://storage.googleapis.com/bdt-beam/orders_v_2022.csv -O data/orders.csv',
    'wget https://storage.googleapis.com/bdt-beam/users_v.csv -O data/users.csv',
)
for _setup_command in _setup_commands:
  run(_setup_command)


In [None]:
import apache_beam as beam
import re

# Glob for the input files and path prefix for output shards.
# NOTE(review): neither name is referenced by the cells visible here — confirm
# whether they are still needed.
inputs_pattern, outputs_prefix = 'data/*', 'outputs/part'

In [None]:
# Field names that TransformUsers pairs positionally with the comma-split
# values of a users.csv line.
user_header = [
    'user_id',
    'name',
    'gender',
    'age',
    'address',
    'date_joined',
]
# Field names that TransformOrders pairs positionally with the
# semicolon-split values of an orders.csv line.
order_header = [
    'order_no',
    'user_id',
    'product_list',
    'date_purchased',
]

class TransformUsers(beam.DoFn):
  """Parse one users.csv text line into a dict keyed by `user_header`."""

  def process(self, element):
    """Yield a single dict for the given line.

    Args:
      element: One text line; it is split on ','.

    Yields:
      A dict pairing `user_header` names with the split values, position by
      position (zip stops at the shorter of the two).
    """
    values = element.split(',')
    row = dict(zip(user_header, values))
    yield row

class TransformOrders(beam.DoFn):
  """Parse one orders.csv text line into a dict keyed by `order_header`."""

  def process(self, element):
    """Yield a single dict for the given line.

    Args:
      element: One text line; it is split on ';'.

    Yields:
      A dict pairing `order_header` names with the split values, position by
      position (zip stops at the shorter of the two).
    """
    values = element.split(';')
    row = dict(zip(order_header, values))
    yield row

# Join users and orders on user_id and print every joined element.
#
# CoGroupByKey only accepts (key, value) pairs, so the row dicts produced by
# TransformUsers / TransformOrders are keyed by their 'user_id' field first.
# Each printed element has the form (user_id, (user_rows, order_rows)).
with beam.Pipeline() as pipeline:
  users = (pipeline
    | 'ReadUsers' >> beam.io.ReadFromText('data/users.csv', skip_header_lines=1)
    | 'FormatUsers' >> beam.ParDo(TransformUsers())
  )
  orders = (pipeline
    | 'ReadOrders' >> beam.io.ReadFromText('data/orders.csv', skip_header_lines=1)
    | 'FormatOrders' >> beam.ParDo(TransformOrders())
  )
  userOrders = (
      (
          users | 'KeyUsers' >> beam.Map(lambda user: (user['user_id'], user)),
          orders | 'KeyOrders' >> beam.Map(lambda order: (order['user_id'], order)),
      )
      | 'join' >> beam.CoGroupByKey()
      | 'print' >> beam.Map(print)
  )


In [None]:
# Field names that TransformUsers pairs positionally with the comma-split
# values of a users.csv line.
user_header = [
    'user_id',
    'name',
    'gender',
    'age',
    'address',
    'date_joined',
]
# Field names that TransformOrders pairs positionally with the
# semicolon-split values of an orders.csv line.
order_header = [
    'order_no',
    'user_id',
    'product_list',
    'date_purchased',
]

class TransformUsers(beam.DoFn):
  """Parse one users.csv text line into a dict keyed by `user_header`."""

  def process(self, element):
    """Yield a single dict for the given line.

    Args:
      element: One text line; it is split on ','.

    Yields:
      A dict pairing `user_header` names with the split values, position by
      position (zip stops at the shorter of the two).
    """
    values = element.split(',')
    row = dict(zip(user_header, values))
    yield row

class TransformOrders(beam.DoFn):
  """Parse one orders.csv text line into a dict keyed by `order_header`."""

  def process(self, element):
    """Yield a single dict for the given line.

    Args:
      element: One text line; it is split on ';'.

    Yields:
      A dict pairing `order_header` names with the split values, position by
      position (zip stops at the shorter of the two).
    """
    values = element.split(';')
    row = dict(zip(order_header, values))
    yield row

class AverageFn(beam.DoFn):
    """Divide the second item of each element by an extra `count` argument."""

    def process(self, element, count):
        """Yield (element[0], element[1] / count).

        Args:
            element: Indexable whose first item is passed through unchanged
                and whose second item is the dividend.
            count: Divisor, supplied by the ParDo call as an additional
                argument.
        """
        label = element[0]
        amount = element[1]
        yield (label, amount / count)

def _gender_per_order(joined):
  """Emit one (gender, 1) pair per order attached to each joined user row.

  Args:
    joined: A CoGroupByKey result of the form
      (user_id, (user_rows, order_rows)), where the rows are the dicts
      produced by TransformUsers and TransformOrders.

  Yields:
    (gender, 1) once for every order row, for each user row under the key.
    Orders without a matching user row, and users without orders, yield
    nothing.
  """
  user_rows, order_rows = joined[1]
  # Materialise first: the grouped values are only guaranteed to be iterable.
  order_total = len(list(order_rows))
  for user in user_rows:
    for _ in range(order_total):
      yield (user['gender'], 1)


# Join users and orders on user_id, count orders per gender and print each
# gender's count divided by the overall count.
#
# Fixes relative to the previous version of this cell:
#   * CoGroupByKey needs (key, value) pairs, so both inputs are keyed by
#     'user_id' before the join.
#   * The 'key' step indexed elem[2], but the join emits 2-tuples
#     (user_id, (user_rows, order_rows)).
#     NOTE(review): 'gender' (index 2 of user_header) is assumed to be the
#     intended key — confirm.
#   * `totals` was never defined in this cell.
#     NOTE(review): it is assumed to be the overall number of counted orders,
#     so each output is a per-gender share — confirm.
with beam.Pipeline() as pipeline:
  users = (pipeline
    | 'ReadUsers' >> beam.io.ReadFromText('data/users.csv', skip_header_lines=1)
    | 'FormatUsers' >> beam.ParDo(TransformUsers())
  )
  orders = (pipeline
    | 'ReadOrders' >> beam.io.ReadFromText('data/orders.csv', skip_header_lines=1)
    | 'FormatOrders' >> beam.ParDo(TransformOrders())
  )
  orderGenders = (
      (
          users | 'KeyUsers' >> beam.Map(lambda user: (user['user_id'], user)),
          orders | 'KeyOrders' >> beam.Map(lambda order: (order['user_id'], order)),
      )
      | 'join' >> beam.CoGroupByKey()
      | 'key' >> beam.FlatMap(_gender_per_order)
  )
  # Single-value PCollection used as the divisor side input below. With no
  # counted orders there are no per-gender elements, so nothing is divided.
  totals = orderGenders | 'total' >> beam.combiners.Count.Globally()
  userOrders = (
      orderGenders
      | 'sum' >> beam.CombinePerKey(sum)
      | 'average' >> beam.ParDo(AverageFn(), beam.pvalue.AsSingleton(totals))
      | 'print' >> beam.Map(print)
  )


In [None]:
# Field names that TransformUsers pairs positionally with the comma-split
# values of a users.csv line.
user_header = [
    'user_id',
    'name',
    'gender',
    'age',
    'address',
    'date_joined',
]
# Field names that TransformOrders pairs positionally with the
# semicolon-split values of an orders.csv line.
order_header = [
    'order_no',
    'user_id',
    'product_list',
    'date_purchased',
]

class TransformUsers(beam.DoFn):
  """Parse one users.csv text line into a dict keyed by `user_header`."""

  def process(self, element):
    """Yield a single dict for the given line.

    Args:
      element: One text line; it is split on ','.

    Yields:
      A dict pairing `user_header` names with the split values, position by
      position (zip stops at the shorter of the two).
    """
    values = element.split(',')
    row = dict(zip(user_header, values))
    yield row

class TransformOrders(beam.DoFn):
  """Parse one orders.csv text line into a dict keyed by `order_header`."""

  def process(self, element):
    """Yield a single dict for the given line.

    Args:
      element: One text line; it is split on ';'.

    Yields:
      A dict pairing `order_header` names with the split values, position by
      position (zip stops at the shorter of the two).
    """
    values = element.split(';')
    row = dict(zip(order_header, values))
    yield row

class AverageFn(beam.DoFn):
    """Divide the second item of each element by an extra `count` argument."""

    def process(self, element, count):
        """Yield (element[0], element[1] / count).

        Args:
            element: Indexable whose first item is passed through unchanged
                and whose second item is the dividend.
            count: Divisor, supplied by the ParDo call as an additional
                argument.
        """
        label = element[0]
        amount = element[1]
        yield (label, amount / count)

class Count(beam.DoFn):
  """Replace the second item of each element with its length."""

  def process(self, element):
    """Yield (element[0], len(element[1])).

    Args:
      element: Indexable whose first item is passed through unchanged and
        whose second item must support len().
    """
    label = element[0]
    size = len(element[1])
    yield (label, size)

def _gender_per_order(joined):
  """Emit one (gender, 1) pair per order attached to each joined user row.

  Args:
    joined: A CoGroupByKey result of the form
      (user_id, (user_rows, order_rows)), where the rows are the dicts
      produced by TransformUsers and TransformOrders.

  Yields:
    (gender, 1) once for every order row, for each user row under the key.
    Orders without a matching user row, and users without orders, yield
    nothing.
  """
  user_rows, order_rows = joined[1]
  # Materialise first: the grouped values are only guaranteed to be iterable.
  order_total = len(list(order_rows))
  for user in user_rows:
    for _ in range(order_total):
      yield (user['gender'], 1)


# Join users and orders on user_id once, then derive two printed outputs:
#   * userOrders: per-gender order count divided by the overall count.
#   * userOrdersTotal: (user_id, number of orders) for every joined key.
#
# Fixes relative to the previous version of this cell:
#   * CoGroupByKey needs (key, value) pairs, so both inputs are keyed by
#     'user_id' before the join.
#   * The 'key' step indexed elem[2], but the join emits 2-tuples
#     (user_id, (user_rows, order_rows)).
#     NOTE(review): 'gender' (index 2 of user_header) is assumed to be the
#     intended key — confirm.
#   * `totals` was never defined in this cell.
#     NOTE(review): it is assumed to be the overall number of counted orders
#     — confirm.
#   * Count was applied to the raw join output, whose second item is the
#     (user_rows, order_rows) pair, so its length was always 2. It now
#     receives (user_id, [order rows]).
#     NOTE(review): orders-per-user is assumed to be the intended total —
#     confirm.
with beam.Pipeline() as pipeline:
  users = (pipeline
    | 'ReadUsers' >> beam.io.ReadFromText('data/users.csv', skip_header_lines=1)
    | 'FormatUsers' >> beam.ParDo(TransformUsers())
  )
  orders = (pipeline
    | 'ReadOrders' >> beam.io.ReadFromText('data/orders.csv', skip_header_lines=1)
    | 'FormatOrders' >> beam.ParDo(TransformOrders())
  )
  # One join feeds both branches below.
  joinedUserOrders = (
      (
          users | 'KeyUsers' >> beam.Map(lambda user: (user['user_id'], user)),
          orders | 'KeyOrders' >> beam.Map(lambda order: (order['user_id'], order)),
      )
      | 'join' >> beam.CoGroupByKey()
  )
  orderGenders = joinedUserOrders | 'key' >> beam.FlatMap(_gender_per_order)
  # Single-value PCollection used as the divisor side input below. With no
  # counted orders there are no per-gender elements, so nothing is divided.
  totals = orderGenders | 'total' >> beam.combiners.Count.Globally()
  userOrders = (
      orderGenders
      | 'sum' >> beam.CombinePerKey(sum)
      | 'average' >> beam.ParDo(AverageFn(), beam.pvalue.AsSingleton(totals))
      | 'print' >> beam.Map(print)
  )
  userOrdersTotal = (
      joinedUserOrders
      # list(...) so that Count can call len() on the order rows.
      | 'orders' >> beam.Map(lambda elem: (elem[0], list(elem[1][1])))
      | 'count' >> beam.ParDo(Count())
      | 'printTotal' >> beam.Map(print)
  )

In [None]:
from apache_beam import window

# Field names that TransformUsers pairs positionally with the comma-split
# values of a users.csv line.
user_header = [
    'user_id',
    'name',
    'gender',
    'age',
    'address',
    'date_joined',
]
# Field names that TransformOrders pairs positionally with the
# semicolon-split values of an orders.csv line.
order_header = [
    'order_no',
    'user_id',
    'product_list',
    'date_purchased',
]

class TransformUsers(beam.DoFn):
  """Parse one users.csv text line into a dict keyed by `user_header`."""

  def process(self, element):
    """Yield a single dict for the given line.

    Args:
      element: One text line; it is split on ','.

    Yields:
      A dict pairing `user_header` names with the split values, position by
      position (zip stops at the shorter of the two).
    """
    values = element.split(',')
    row = dict(zip(user_header, values))
    yield row

class TransformOrders(beam.DoFn):
  """Parse one orders.csv text line into a dict keyed by `order_header`."""

  def process(self, element):
    """Yield a single dict for the given line.

    Args:
      element: One text line; it is split on ';'.

    Yields:
      A dict pairing `order_header` names with the split values, position by
      position (zip stops at the shorter of the two).
    """
    values = element.split(';')
    row = dict(zip(order_header, values))
    yield row

class VegetableOrderDoFn(beam.DoFn):
    """Emit the set of items that occur more than once in `element[0]`."""

    def process(self, element):
        """Yield the repeated items of `element[0]` as a set.

        The previous signature also required a `count` argument that the body
        never used and the ParDo call never supplied, so every element failed
        with a missing-argument error. The parameter is removed.

        Args:
            element: Indexable whose first item is a sequence supporting
                iteration and `.count()`.

        Yields:
            A set of the items appearing at least twice in that sequence
            (empty when nothing repeats).
        """
        items = element[0]
        yield {item for item in items if items.count(item) > 1}


# Gap, in seconds, passed to window.Sessions below (24 hours).
SESSION_GAP_SECONDS = 24 * 60 * 60

# Join users and orders on user_id, then print, per user, the product_list
# values that occur in more than one of that user's orders.
#
# Fixes relative to the previous version of this cell:
#   * CoGroupByKey needs (key, value) pairs, so both inputs are keyed by
#     'user_id' before the join.
#   * VegetableOrderDoFn read element[0], which on the raw join output is the
#     user_id key. It now receives (product_lists, user_id).
#     NOTE(review): product_list is compared as a whole string; if it holds
#     several products, split it first — the delimiter is not visible here.
#
# NOTE(review): no event timestamps are assigned anywhere in this pipeline and
# the window is applied after the join, so the session window presumably does
# not separate purchases by date. Doing so would need timestamps derived from
# 'date_purchased' before grouping — its format is not visible here.
with beam.Pipeline() as pipeline:
  users = (pipeline
    | 'ReadUsers' >> beam.io.ReadFromText('data/users.csv', skip_header_lines=1)
    | 'FormatUsers' >> beam.ParDo(TransformUsers())
  )
  orders = (pipeline
    | 'ReadOrders' >> beam.io.ReadFromText('data/orders.csv', skip_header_lines=1)
    | 'FormatOrders' >> beam.ParDo(TransformOrders())
  )
  userOrders = (
      (
          users | 'KeyUsers' >> beam.Map(lambda user: (user['user_id'], user)),
          orders | 'KeyOrders' >> beam.Map(lambda order: (order['user_id'], order)),
      )
      | 'join' >> beam.CoGroupByKey()
  )
  windowUserOrders = (
    userOrders
    | 'window' >> beam.WindowInto(window.Sessions(SESSION_GAP_SECONDS))
    | 'products' >> beam.Map(
        lambda joined: ([order['product_list'] for order in joined[1][1]], joined[0]))
    | 'unique' >> beam.ParDo(VegetableOrderDoFn())
    | 'print' >> beam.Map(print)
  )