<a href="https://colab.research.google.com/github/BusangM/data_science_journey/blob/main/wtc_week5_de_MEMO.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data Eng: Week 5 Practical Memo


## Setup

First, set up your environment: install `apache-beam` and download the input files (`orders.csv` and `users.csv`) from Cloud Storage to your local file system. We use these files to test your pipeline.

In [None]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

run('pip install --upgrade pip')

# Install apache-beam.
run('pip install --quiet apache-beam')

# Copy the input files into the local file system.
run('wget https://storage.googleapis.com/bdt-beam-store/orders.csv -O orders.csv')
run('wget https://storage.googleapis.com/bdt-beam-store/users.csv -O users.csv')


In [None]:
import apache_beam as beam
import re

outputs_prefix = 'outputs/part'

## Task 1: Basic transformations

### Task 1.1
> Read the input file (users.csv) into an initial PCollection

### Task 1.2
> Perform a transform to split each row from the input file into separate elements (e.g. user, gender etc.) so that you may process them

In [None]:
user_header = ['user', 'gender', 'age', 'address', 'date']


class Transform(beam.DoFn):

  # Use classes to perform transformations on your PCollections
  # Yield or return the element(s) needed as input for the next transform
  def process(self, element):
    yield dict(zip(user_header, element.split(',')))


with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('users.csv')
      | 'format line' >> beam.ParDo(Transform())
      | "print" >> beam.Map(print)
  )


#### Variations on function

Possible variations include the options below:

In [None]:
def process_one(x):  # note, this is not a class, so we omit the `self` reference
  """
  :return: a dictionary of key-value pairs
  """
  x = x.split(',')
  return {'user': x[0], 'gender': x[1], 'age': x[2], 'address': x[3], 'date': x[4]}


def process_two(x):
  """
  :return: a list of values (the list is ordered, so later steps can refer to
  each field by its index)
  """
  return x.split(',')


print(process_one("Amy Sullivan,Female,20, Westlake-OH-44145,2020/08/31"))
print(process_two("Amy Sullivan,Female,20, Westlake-OH-44145,2020/08/31"))
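
As a further variation, if a field could ever contain a comma inside quotes (an assumption; the sample row above does not), a plain `split(',')` would cut that field in two. A minimal sketch using Python's standard `csv` module, with the hypothetical helper name `process_csv`:

```python
import csv

USER_HEADER = ['user', 'gender', 'age', 'address', 'date']


def process_csv(line):
    """Parse one CSV line into a dict, honouring quoted fields."""
    # csv.reader expects an iterable of lines, so wrap the single line in a list
    fields = next(csv.reader([line]))
    return dict(zip(USER_HEADER, fields))


row = process_csv('Amy Sullivan,Female,20, Westlake-OH-44145,2020/08/31')
print(row)
```

Like the other variations, this is a plain function, so it could be passed to `beam.Map` in place of the `ParDo`.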


### Task 1.3

>  Perform a transform to change the date format as required

#### A Correct Answer

We can either modify the `process` method of the transform class or add another method.

Isolating each responsibility in its own function keeps the code easier to read and test, so we will take that route.

We also use Python's `datetime` class, which is good practice: parsing fails on an invalid date string, alerting you to issues in your data. Simple string replacement would also work, but it is less robust and harder to debug in the event of failure.

Using the datetime approach:
```
datetime.strptime('2020/02/01', '%Y/%m/%d')
```
produces an object
```
datetime.datetime(2020, 2, 1, 0, 0)
```

If the string does not match the format, `strptime` raises a `ValueError` that tells you so.

We then convert it back into a string in the required format (the round trip may seem excessive, but with dirtier data the validation is handy).

```
datetime.strptime('2020/02/01', '%Y/%m/%d').strftime('%Y-%m-%d')
```
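
The validation behaviour described above can be sketched as follows, using the `YYYY/MM/DD` input format of the sample row in this memo. The wrapper `try_format_date` is a hypothetical helper, added only to show the failure case without stopping the notebook:

```python
from datetime import datetime


def format_date(value):
    """Parse a YYYY/MM/DD string and return it as YYYY-MM-DD."""
    return datetime.strptime(value, '%Y/%m/%d').strftime('%Y-%m-%d')


def try_format_date(value):
    """Return the reformatted date, or None if the input is not a valid date."""
    try:
        return format_date(value)
    except ValueError:
        return None


print(format_date('2020/08/31'))       # 2020-08-31
print(try_format_date('2020/13/45'))   # None: there is no month 13
print(try_format_date('Date joined'))  # None: the header is not a date
```

The last call shows why the header row needs special handling before the date conversion runs.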



In [None]:
from datetime import datetime

user_header = ['User', 'Gender', 'Age', 'Address', 'Date joined']


class Transform(beam.DoFn):

  def process(self, element):
    data = dict(zip(user_header, element.split(',')))
    # if the header row is included in the PCollection, yield it unchanged;
    # otherwise the date conversion would fail on it
    if list(data.values()) == user_header:
      yield data
    else:
      data['Date joined'] = self.format_date(data['Date joined'])
      yield data


  def format_date(self, value):
    """
    :param value: a string that we assume is a date in YYYY/MM/DD format
    :return: the validated date as a string in YYYY-MM-DD format
    """
    return datetime.strptime(value, '%Y/%m/%d').strftime('%Y-%m-%d')



with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('users.csv')
      | 'format line' >> beam.ParDo(Transform())
      | "print" >> beam.Map(print)
  )


#### Variations on function

We could have specified the `format_date` function differently, using simple string replacement. Interestingly, you would not have noticed any problem when parsing the header of the file, since `value.replace('/', '-')` simply makes no modifications to a string that contains no `/`.



In [None]:
def format_date(value):
  return value.replace('/', '-')

format_date('2020/02/01')

### Task 1.4

> Perform a transform to change the address format as required



In [None]:
from datetime import datetime

user_header = ['User', 'Gender', 'Age', 'Address', 'Date joined']


class Transform(beam.DoFn):

  def process(self, element):
    data = dict(zip(user_header, element.split(',')))
    # if the header row is included in the PCollection, yield it unchanged;
    # otherwise the date conversion would fail on it
    if list(data.values()) == user_header:
      yield data
    else:
      data['Date joined'] = self.format_date(data['Date joined'])
      data['Address'] = self.format_address(data['Address'])
      yield data


  def format_date(self, value):
    """
    :param value: a string that we assume is a date in YYYY/MM/DD format
    :return: the validated date as a string in YYYY-MM-DD format
    """
    return datetime.strptime(value, '%Y/%m/%d').strftime('%Y-%m-%d')

  def format_address(self, value):
    """
    perform necessary operations on address to get it into the correct format
    """
    # noticed that there is a space before the address, so cleaning that
    # up too
    return value.replace('-', ',').strip(' ')



with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('users.csv')
      | 'format line' >> beam.ParDo(Transform())
      | "print" >> beam.Map(print)
  )
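
The `replace('-', ',')` approach turns every hyphen into a comma, so it would also split a hyphenated city name. A minimal sketch of a stricter variant, assuming every address has the shape `City-ST-ZIP`. The second address below is made up for illustration and is not taken from `users.csv`:

```python
def format_address(value):
    """Turn ' City-ST-12345' into 'City,ST,12345'."""
    # Split from the right so that hyphens inside the city name survive.
    # Unpacking raises ValueError if the address has fewer than three parts.
    city, state, zip_code = value.strip().rsplit('-', 2)
    return ','.join([city, state, zip_code])


print(format_address(' Westlake-OH-44145'))      # Westlake,OH,44145
print(format_address('Winston-Salem-NC-27101'))  # Winston-Salem,NC,27101
```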


## Final Answer

### Long answer

In [None]:
from datetime import datetime

user_header = ['User', 'Gender', 'Age', 'Address', 'Date joined']


class Transform(beam.DoFn):

  def process(self, element):
    data = dict(zip(user_header, element.split(',')))
    # if the header row is included in the PCollection, yield it unchanged;
    # otherwise the date conversion would fail on it
    if list(data.values()) == user_header:
      yield data
    else:
      data['Date joined'] = self.format_date(data['Date joined'])
      data['Address'] = self.format_address(data['Address'])
      yield data


  def format_date(self, value):
    """
    :param value: a string that we assume is a date in YYYY/MM/DD format
    :return: the validated date as a string in YYYY-MM-DD format
    """
    return datetime.strptime(value, '%Y/%m/%d').strftime('%Y-%m-%d')

  def format_address(self, value):
    """
    perform necessary operations on address to get it into the correct format
    """
    # noticed that there is a space before the address, so cleaning that
    # up too
    return value.replace('-', ',').strip(' ')


class ToCsv(beam.DoFn):

    def process(self, element):
      yield ','.join(dict(element).values())


with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('users.csv')
      | 'format line' >> beam.ParDo(Transform())
      | 'to_csv' >> beam.ParDo(ToCsv())
      | "print" >> beam.Map(print) #beam.io.WriteToText(outputs_prefix)
  )
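
One thing to be aware of: after the address is reformatted it contains commas, so joining the values with `','` produces a line with more comma-separated parts than there are columns. Whether the task expects quoting is not stated here, so the following is only a sketch of how Python's `csv` module would write such a row. The helper name `to_csv_line` is hypothetical:

```python
import csv
import io


def to_csv_line(values):
    """Write one row as a CSV line, quoting fields that contain commas."""
    buffer = io.StringIO()
    # lineterminator='' stops the writer from appending a newline to the row
    csv.writer(buffer, lineterminator='').writerow(values)
    return buffer.getvalue()


line = to_csv_line(['Amy Sullivan', 'Female', '20', 'Westlake,OH,44145', '2020-08-31'])
print(line)  # Amy Sullivan,Female,20,"Westlake,OH,44145",2020-08-31
```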

### Short answer

In [None]:

class Transform(beam.DoFn):

  def process(self, element):
    row_list =  element.split(',')
    row_list[4] = row_list[4].replace('/', '-')
    row_list[3] = row_list[3].replace('-', ',')
    yield ','.join(row_list)

with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('users.csv')
      | 'format line' >> beam.ParDo(Transform())
      | "print" >> beam.Map(print) #beam.io.WriteToText(outputs_prefix)
  )

## Task 2: Aggregations

### Task 2.1
> Perform a transform to determine the % split between female and male customers

Here we follow a map-reduce style workload: for each customer we emit a `(gender, 1)` tuple, i.e. `('Male', 1)` or `('Female', 1)`, and then sum the ones per key.

Test your code with
```
with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('users.csv')
      | 'format line' >> beam.ParDo(Transform())
      | 'add_key' >> beam.Map(lambda elem: (elem[1], 1))  # emit (M/F, 1) pairs
      | "print" >> beam.Map(print)
  )
```

which is the pipeline below with the `CombinePerKey` step removed.

We are defining a new `Transform` class here because we want to get rid of the header row.

In [None]:
user_header = ['User', 'Gender', 'Age', 'Address', 'Date joined']

class Transform(beam.DoFn):

  def process(self, element):
    row_list =  element.split(',')
    if len(set(user_header).symmetric_difference(set(row_list))) == 0:
      pass
    else:
      row_list[4] = row_list[4].replace('/', '-')
      row_list[3] = row_list[3].replace('-', ',')
      yield row_list


with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('users.csv')
      | 'format line' >> beam.ParDo(Transform())
      | 'add_key' >> beam.Map(lambda elem: (elem[1], 1))  # emit (M/F, 1) pairs
      | 'sum' >> beam.CombinePerKey(sum)
      | "print" >> beam.Map(print)
  )


#### Variations on function
There are many ways to do it, but doing it in an idiomatic, stream-like Beam fashion is a little trickier (remember, we are working with bounded data here, but it is worthwhile exploring the tool).

If we want to keep the whole computation within Beam, i.e. we don't extract the counts and genders from each PCollection but rather create a directed acyclic graph (DAG) that solves our problem, then see the "Example 5: Combining with side inputs as singletons" functionality.

In object-oriented programming, a singleton is an object that is created once, with subsequent calls in the code accessing that same object. In Beam, a singleton side input is a PCollection that contains exactly one element (such as a global count), passed to a transform as a single value.

Interestingly, as you will recall from the conversation about Spark, these workloads are designed for massively parallel computation. This means that PCollections are split over multiple machines and are not designed for having values collected from them at arbitrary points (see SO).

The following cell would have been sufficient to answer the question. It is not entirely neat, as we are not calculating the percentages directly in the pipeline, but that will be done in the next, more complete answer.

In [None]:

with beam.Pipeline() as pipeline:

  cleaned_csv = (pipeline
      | 'Read lines' >> beam.io.ReadFromText('users.csv')
      | 'format line' >> beam.ParDo(Transform())
  )

  # let's get that global total
  totals = cleaned_csv | 'Count elements' >> beam.combiners.Count.Globally()

  # now count the customers per gender
  counts = ( cleaned_csv
            | 'add_key' >> beam.Map(lambda elem: (elem[1], 1))  # emit (M/F, 1) pairs
            | 'sum' >> beam.CombinePerKey(sum)
           )

#          --- totals ---
#        /                \
# csv ---                    --- flatten
#        \                /
#          --- counts ---
#
# make our two split pipelines converge again with Flatten, and print the values

  ((counts, totals)
    | 'flatten ' >> beam.Flatten()
    | 'Print result' >> beam.Map(print)
  )
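
The flattened output mixes the per-gender counts with the global total, so the percentages still have to be worked out from the printed values. A sketch of that last step in plain Python, using made-up numbers (the real counts depend on `users.csv`) and the hypothetical helper name `to_percentages`:

```python
def to_percentages(counts, total):
    """Convert (key, count) pairs into {key: percentage of total}."""
    return {key: 100 * count / total for key, count in counts}


# Hypothetical values standing in for the pipeline output.
counts = [('Female', 3), ('Male', 2)]
total = 5

percentages = to_percentages(counts, total)
print(percentages)  # {'Female': 60.0, 'Male': 40.0}
```

Inside Beam, the same division could be done by handing the total to a `beam.Map` as a singleton side input (`beam.pvalue.AsSingleton`), which keeps the whole calculation in the DAG.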


To complete it with a single Beam pipeline that gives us the final answer directly, we co-opted the example [Example 8: Combining with a CombineFn](https://beam.apache.org/documentation/transforms/python/aggregation/combinevalues/). We copied and pasted the code exactly and then just had to make sure that the pipeline fed it input in the expected shape.

In their example, they took tuples of the form `('label', [list])` and computed, for each label, the percentage of each item in the list:
```
('spring', ['🥕', '🍅', '🥕', '🍅', '🍆']),
('summer', ['🥕', '🍅', '🌽', '🍅', '🍅']),
('fall', ['🥕', '🥕', '🍅', '🍅']),
('winter', ['🍆', '🍆'])
```



In [None]:

# this custom combiner was taken directly from beam's documentation:
###
# BEGIN: COPY PASTA
###
class AverageFn(beam.CombineFn):
  def create_accumulator(self):
    return {}

  def add_input(self, accumulator, input):
    if input not in accumulator:
      accumulator[input] = 0
    accumulator[input] += 1
    return accumulator

  def merge_accumulators(self, accumulators):
    merged = {}
    for accum in accumulators:
      for item, count in accum.items():
        if item not in merged:
          merged[item] = 0
        merged[item] += count
    return merged

  def extract_output(self, accumulator):
    total = sum(accumulator.values())
    percentages = {item: count / total for item, count in accumulator.items()}
    return percentages
###
# END: COPY PASTA
###

class Transform(beam.DoFn):

  def process(self, element):
    row_list =  element.split(',')
    if len(set(user_header).symmetric_difference(set(row_list))) == 0:
      pass
    else:
      yield ('gender', row_list[1])


with beam.Pipeline() as pipeline:

  csv = (pipeline
      | 'Read lines' >> beam.io.ReadFromText('users.csv')
      | 'format line' >> beam.ParDo(Transform())
      | 'group' >> beam.GroupByKey()
      | 'sum' >> beam.CombineValues(AverageFn())
      | 'print' >> beam.Map(print)
  )
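
To see what the four combiner methods do, it can help to call them by hand. The sketch below uses a plain Python class with the same four method names (it does not subclass `beam.CombineFn`, so it runs without Beam) and pretends that two workers each processed part of the data. The class name `PercentageCombiner` and the input values are made up for illustration:

```python
class PercentageCombiner:
    """Plain-Python stand-in with the same four methods as the combiner above."""

    def create_accumulator(self):
        return {}

    def add_input(self, accumulator, value):
        accumulator[value] = accumulator.get(value, 0) + 1
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = {}
        for accum in accumulators:
            for item, count in accum.items():
                merged[item] = merged.get(item, 0) + count
        return merged

    def extract_output(self, accumulator):
        total = sum(accumulator.values())
        return {item: count / total for item, count in accumulator.items()}


combiner = PercentageCombiner()

# Pretend two workers each saw part of the data (hypothetical values).
partitions = [['Female', 'Male', 'Female'], ['Male', 'Female']]

# Each worker builds its own partial accumulator.
partials = []
for partition in partitions:
    acc = combiner.create_accumulator()
    for value in partition:
        acc = combiner.add_input(acc, value)
    partials.append(acc)

# The partial results are merged, then turned into fractions of the total.
merged = combiner.merge_accumulators(partials)
result = combiner.extract_output(merged)
print(merged)  # {'Female': 3, 'Male': 2}
print(result)  # {'Female': 0.6, 'Male': 0.4}
```

Note that the output is a fraction between 0 and 1; multiply by 100 if a percentage is required.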



### Task 2.2

> Perform a transform that counts the number of customers that joined on each day

This answer will be the same as simply grouping by date and counting the occurrences of the lines (provided customers can't join more than once).

Spoiler alert: they are the same.

In [None]:
# class GetState(beam.DoFn):

#   def process(self, x):
#     yield x[3].split(',')[1]

class Count(beam.DoFn):

  def process(self, element):
    yield (element[0], len(element[1]))

with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('users.csv')
      | 'inline-transform' >> beam.Map(lambda x: x.split(','))
      | 'get-day-cust' >> beam.Map(lambda x: (x[4].replace('/', '-'), x[0]))
      | 'drop-header' >> beam.Filter(lambda x: x[0] != 'Date joined')
      | 'groupby' >> beam.GroupByKey()
      | 'count' >> beam.ParDo(Count())
      | "print" >> beam.Map(print)
  )
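
What the group-then-count steps compute can be mirrored in plain Python. In this sketch only the first data row comes from this memo; the other two rows are made up for illustration:

```python
from collections import defaultdict

lines = [
    'User,Gender,Age,Address,Date joined',
    'Amy Sullivan,Female,20, Westlake-OH-44145,2020/08/31',
    'Hypothetical User,Male,30, Dayton-OH-45402,2020/08/31',  # made-up row
    'Another User,Female,41, Austin-TX-73301,2020/09/01',     # made-up row
]

grouped = defaultdict(list)
for line in lines:
    fields = line.split(',')
    key, user = fields[4].replace('/', '-'), fields[0]
    if key != 'Date joined':       # drop the header row
        grouped[key].append(user)  # what GroupByKey collects per key

# The Count step then takes the length of each group.
counts = {day: len(users) for day, users in grouped.items()}
print(counts)  # {'2020-08-31': 2, '2020-09-01': 1}
```

Within Beam, `beam.combiners.Count.PerKey()` is an alternative to the `GroupByKey` plus `Count` pair that avoids building the full list of users for each day.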

### Task 2.3

> Perform a transform that counts the number of customers for each unique state


In [None]:
class Count(beam.DoFn):

  def process(self, element):
    yield (element[0], len(element[1]))

with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('users.csv')
      | 'inline-transform' >> beam.Map(lambda x: x.split(','))
      | 'drop header' >> beam.Filter(lambda x: 'Date joined' not in x)
      | 'get-state-cust' >> beam.Map(lambda x: (x[3].split('-')[1], x[0]))
      | 'groupby' >> beam.GroupByKey()
      | 'sum' >> beam.ParDo(Count())
      | "print" >> beam.Map(print)
  )
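
The state is extracted with `x[3].split('-')[1]`, which works when the city name has no hyphen. A small sketch of the extraction and the count in plain Python, assuming addresses of the shape `City-ST-ZIP`. The helper name `get_state` and all addresses except the first are hypothetical:

```python
from collections import Counter


def get_state(address):
    """Return the state code from a ' City-ST-ZIP' address."""
    # Splitting from the right keeps a hyphenated city name in one piece.
    return address.strip().rsplit('-', 2)[1]


addresses = [' Westlake-OH-44145', ' Dayton-OH-45402', ' Winston-Salem-NC-27101']

state_counts = Counter(get_state(address) for address in addresses)
print(dict(state_counts))  # {'OH': 2, 'NC': 1}

# For comparison, the left-to-right split picks up part of the city name:
print(' Winston-Salem-NC-27101'.split('-')[1])  # Salem
```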