# Interactive Beam

In this notebook, we set up your development environment and work through a simple example using the [DirectRunner](https://beam.apache.org/documentation/runners/direct/). You can explore other runners with the [Beam Capatibility Matrix](https://beam.apache.org/documentation/runners/capability-matrix/).

The expectation is that this notebook will help you explore the tutorial in a more interactive way.

To learn more about Colab, see [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb).

# Setup

First, you need to set up your environment, which includes installing `apache-beam` and downloading a text file from Cloud Storage to your local file system. We are using this file to test your pipeline.

In [None]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

run('pip install --upgrade pip')

# Install apache-beam.
run('pip install --quiet apache-beam')

# Copy the input file into the local file system.
run('mkdir -p data')
run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')

>> pip install --upgrade pip

>> pip install --quiet apache-beam

>> mkdir -p data

>> gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/
Copying gs://dataflow-samples/shakespeare/kinglear.txt...
/ [1 files][153.6 KiB/153.6 KiB]                                                
Operation completed over 1 objects/153.6 KiB.                                    



In [None]:
! wc -l data/kinglear.txt


5525 data/kinglear.txt


In [None]:

! head -3 data/kinglear.txt

	KING LEAR




In [None]:
import apache_beam as beam
import re

inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part'

## How to interactively work with Beam

Here is an example of how to work iteratively with beam in order to understand what is happening in your pipeline.

Firstly, reduce the size of the King Lear file to be manageable

In [None]:

! head -10 data/kinglear.txt > data/small.txt
! wc -l data/small.txt

10 data/small.txt


Create a custom print function (the python3 function `print` is supposed to work but we define our own here). Then it is possible to see what you are doing to the file.

But something is wrong... why is it printing twice, see [SO](https://stackoverflow.com/a/52282001/1185293)

In [None]:
def myprint(x):
  print('{}'.format(x))

with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('data/small.txt')
      | "print" >> beam.Map(myprint)
  )

result = pipeline.run()
result.wait_until_finish()

	KING LEAR


	DRAMATIS PERSONAE


LEAR	king of Britain  (KING LEAR:)

KING OF FRANCE:

	KING LEAR


	DRAMATIS PERSONAE


LEAR	king of Britain  (KING LEAR:)

KING OF FRANCE:



'DONE'

Now, let's break split each line on spaces and get the words out.

In [None]:

with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('data/small.txt')
      | 'get words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
      | "print" >> beam.Map(myprint)
  )

KING
LEAR
DRAMATIS
PERSONAE
LEAR
king
of
Britain
KING
LEAR
KING
OF
FRANCE


Recall that `flatMap`s typically act on something (a function, iterable or variable) and apply a function to that something to produce a list of elements. See [this](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap/) great example of how FlatMap works in Beam, and this answer on [SO](https://stackoverflow.com/a/45682977/1185293) for a simple explanation.

In the case above, we applied an anonymous function (lambda function) to a line. We can define it explicitly if you prefer a more conventional syntax

In [None]:
def my_line_split_func(line):
  return re.findall(r"[a-zA-Z']+", line)

with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('data/small.txt')
      | 'get words' >> beam.FlatMap(my_line_split_func)
      | "print" >> beam.Map(myprint)
  )


KING
LEAR
DRAMATIS
PERSONAE
LEAR
king
of
Britain
KING
LEAR
KING
OF
FRANCE


### Tutorial



In [None]:
! echo -e 'r1c1,r1c2,2020/03/05\nr2c1,r2c2,2020/03/23' > data/play.csv


In [None]:

class Transform(beam.DoFn):

  # Use classes to perform transformations on your PCollections
  # Yield or return the element(s) needed as input for the next transform
  def process(self, element):
    yield element


with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('data/play.csv')
      | 'format line' >> beam.ParDo(Transform())
      | "print" >> beam.Map(myprint)
  )


result.wait_until_finish()

r1c1,r1c2,2020/03/05
r2c1,r2c2,2020/03/23


'DONE'

In [57]:
run('gsutil cp gs://bdt-beam/users_v.csv data/')

>> gsutil cp gs://bdt-beam/users_v.csv data/
Copying gs://bdt-beam/users_v.csv...
- [1 files][140.3 KiB/140.3 KiB]                                                
Operation completed over 1 objects/140.3 KiB.                                    



TASK 1

In [170]:
import apache_beam as beam
import re
from datetime import datetime

# Define the date format transformation function
def transform_date_format(date_str):
    try:
        return datetime.strptime(date_str, '%Y/%m/%d').strftime('%Y-%m-%d')
    except ValueError:
        return date_str

# Define the address transformation function
def transform_address(address_str):

    return address_str.replace('-', ',')

def process_user_data(row):
    # Split by delimiter
    user_data = row.split(',')

    if len(user_data) == 6:
        user_id, name, gender, age, address, date_joined = user_data

        # Transform the date format
        formatted_date = transform_date_format(date_joined)

        # Transform the address format
        formatted_address = transform_address(address)

        # Combine data back into the required format with semicolon delimiter
        return f'{user_id};{name};{gender};{age};{formatted_address};{formatted_date}'
    return None

def filter_valid_rows(row):
    return row is not None

with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Read CSV' >> beam.io.ReadFromText('data/users_v.csv', skip_header_lines=1)
        | 'Process Rows' >> beam.Map(process_user_data)
        | 'Filter Invalid Rows' >> beam.Filter(filter_valid_rows)
        | 'Write to CSV' >> beam.io.WriteToText('marketing_format.csv')
    )


In [162]:
import pandas as pd
df = pd.read_csv('marketing_format.csv-00000-of-00001')
print(df.head())

        1;Anthony Wolf;male;73;New Rachelburgh  VA  49583;2019-03-13
0  2;James Armstrong;male;56;North Jillianfort  UT  86454;2020-11-06
1               3;Cody Shaw;male;75;North Anne  SC  53799;2004-05-29
2   4;Sierra Hamilton;female;76;New Angelafurt  ME  46190;2005-08-26
3        5;Chase Davis;male;31;South Bethmouth  WI  18562;2018-04-30
4         6;Sierra Andrews;female;21;Ryanville  MI  69690;2007-05-25


TASK 2

In [163]:
import apache_beam as beam
from datetime import datetime

def transform_date_format(date_str):
    try:
        return datetime.strptime(date_str, '%Y/%m/%d').strftime('%Y-%m-%d')
    except ValueError:
        return date_str

def transform_address(address_str):
    return address_str.replace('-', ',')

def process_user_data(row):
    user_data = row.split(',')

    if len(user_data) == 6:
        user_id, name, gender, age, address, date_joined = user_data

        # Transform the date format
        formatted_date = transform_date_format(date_joined)

        # Transform the address format
        formatted_address = transform_address(address)

        # Extract state from the address for geographical analysis
        state = formatted_address.split(',')[1] if ',' in formatted_address else 'Unknown'

        return (user_id, name, gender, age, formatted_address, formatted_date, state)
    return None

# Function to get the gender count
def get_gender_count(data):
    gender_counts = {}
    for _, _, gender, _, _, _ in data:
        if gender in gender_counts:
            gender_counts[gender] += 1
        else:
            gender_counts[gender] = 1
    return gender_counts

# Function to get customer count per day
def get_customers_per_day(data):
    day_counts = {}
    for _, _, _, _, _, date in data:
        if date in day_counts:
            day_counts[date] += 1
        else:
            day_counts[date] = 1
    return day_counts

# Function to get customer count per state
def get_customers_per_state(data):
    state_counts = {}
    for _, _, _, _, _, _, state in data:
        if state in state_counts:
            state_counts[state] += 1
        else:
            state_counts[state] = 1
    return state_counts

# Pipeline
with beam.Pipeline() as pipeline:
    user_data = (
        pipeline
        | 'Read CSV' >> beam.io.ReadFromText('data/users_v.csv')
        | 'Process Rows' >> beam.Map(process_user_data)
        | 'Filter Invalid Rows' >> beam.Filter(lambda row: row is not None)
    )

    # Gender Composition
    gender_composition = (
        user_data
        | 'Extract Gender' >> beam.Map(lambda row: (row[2]))
        | 'Count Gender' >> beam.combiners.Count.PerElement()
        | 'Format Gender Output' >> beam.Map(lambda x: f'Gender: {x[0]}, Count: {x[1]}')
    )

    # Customers per Day
    customers_per_day = (
        user_data
        | 'Extract Date' >> beam.Map(lambda row: (row[5]))
        | 'Count Customers per Day' >> beam.combiners.Count.PerElement()
        | 'Format Day Output' >> beam.Map(lambda x: f'Date: {x[0]}, Count: {x[1]}')
    )

    # Customers per State
    customers_per_state = (
        user_data
        | 'Extract State' >> beam.Map(lambda row: (row[6]))
        | 'Count Customers per State' >> beam.combiners.Count.PerElement()
        | 'Format State Output' >> beam.Map(lambda x: f'State: {x[0]}, Count: {x[1]}')
    )

    # Write outputs to text files
    gender_composition | 'Write Gender Composition' >> beam.io.WriteToText('gender_composition', file_name_suffix='.txt', shard_name_template='')
    customers_per_day | 'Write Customers Per Day' >> beam.io.WriteToText('customers_per_day', file_name_suffix='.txt', shard_name_template='')
    customers_per_state | 'Write Customers Per State' >> beam.io.WriteToText('customers_per_state', file_name_suffix='.txt', shard_name_template='')


In [169]:
df = pd.read_csv('customers_per_day.txt')
print(df.head())

  Date: date_joined   Count: 1
0  Date: 2019-03-13   Count: 1
1  Date: 2020-11-06   Count: 1
2  Date: 2004-05-29   Count: 2
3  Date: 2005-08-26   Count: 1
4  Date: 2018-04-30   Count: 1


In [167]:
df = pd.read_csv('customers_per_state.txt')
print(df.head())

  State: Unknown    Count: 1
0      State: VA   Count: 44
1      State: UT   Count: 50
2      State: SC   Count: 50
3      State: ME   Count: 43
4      State: WI   Count: 56


In [168]:
df = pd.read_csv('gender_composition.txt')
print(df.head())

   Gender: gender      Count: 1
0    Gender: male   Count: 1207
1  Gender: female   Count: 1150
