# Interactive Beam

In this notebook, we set up your development environment and work through a simple example using the [DirectRunner](https://beam.apache.org/documentation/runners/direct/). You can explore other runners with the [Beam Capability Matrix](https://beam.apache.org/documentation/runners/capability-matrix/).

This notebook is meant to help you explore the tutorial more interactively.

To learn more about Colab, see [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb).

# Setup

First, set up your environment: install `apache-beam` and download a text file from Cloud Storage to your local file system. We use this file to test the pipeline.

In [1]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

run('pip install --upgrade pip')

# Install apache-beam.
run('pip install --quiet apache-beam')

# Copy the input file into the local file system.
run('mkdir -p data')
run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')

>> pip install --upgrade pip
Collecting pip
  Using cached pip-24.2-py3-none-any.whl.metadata (3.6 kB)
Using cached pip-24.2-py3-none-any.whl (1.8 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 24.1.2
    Uninstalling pip-24.1.2:
      Successfully uninstalled pip-24.1.2
Successfully installed pip-24.2

>> pip install --quiet apache-beam
  Preparing metadata (setup.py) ... done
  Preparing metadata (setup.py) ... done
  Preparing metadata (setup.py) ... done
  Preparing metadata (setup.py) ... done
  Preparing metadata (setup.py) ... done

In [2]:
! wc -l data/kinglear.txt


5525 data/kinglear.txt


In [3]:

! head -3 data/kinglear.txt

	KING LEAR




In [4]:
import apache_beam as beam
import re

inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part'

## How to interactively work with Beam

Here is an example of how to work iteratively with Beam to understand what is happening in your pipeline.

First, reduce the King Lear file to a manageable size.

In [5]:

! head -10 data/kinglear.txt > data/small.txt
! wc -l data/small.txt

10 data/small.txt


Create a custom print function so you can see what each step does to the file. (Python 3's built-in `print` should also work, but we define our own here.)

But something is wrong: the output below is printed twice. The `with` block already runs the pipeline when it exits, so the explicit `pipeline.run()` call after it runs the pipeline a second time; see [SO](https://stackoverflow.com/a/52282001/1185293).

In [6]:
def myprint(x):
  print('{}'.format(x))

with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('data/small.txt')
      | "print" >> beam.Map(myprint)
  )

result = pipeline.run()
result.wait_until_finish()



	KING LEAR


	DRAMATIS PERSONAE


LEAR	king of Britain  (KING LEAR:)

KING OF FRANCE:

	KING LEAR


	DRAMATIS PERSONAE


LEAR	king of Britain  (KING LEAR:)

KING OF FRANCE:



'DONE'
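The doubled output above can be reproduced without Beam. The sketch below uses `ToyPipeline`, a hypothetical stand-in class (it is not part of Apache Beam), whose `__exit__` runs the recorded steps in the same way a `with beam.Pipeline()` block runs the pipeline when it exits. Calling `run()` again afterwards executes every step a second time.

```python
class ToyPipeline:
    """Hypothetical stand-in for a pipeline object; not an Apache Beam class."""

    def __init__(self):
        self.steps = []
        self.run_count = 0

    def add(self, step):
        # Record a step; nothing executes until run() is called.
        self.steps.append(step)
        return self

    def run(self):
        self.run_count += 1
        for step in self.steps:
            step()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Run automatically when the block exits without an error.
        if exc_type is None:
            self.run()
        return False


printed = []
with ToyPipeline() as toy:
    toy.add(lambda: printed.append('KING LEAR'))

# Explicit second run, mirroring `pipeline.run()` in the cell above.
toy.run()

print(printed)
print(toy.run_count)
```

Dropping the explicit `run()` call, or building the pipeline without a `with` block and calling `run()` once, leaves a single execution.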

Now, let's break each line into words using a regular expression.

In [None]:

with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('data/small.txt')
      | 'get words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
      | "print" >> beam.Map(myprint)
  )

Recall that `FlatMap` applies a function to each element, expects that function to return an iterable, and flattens the results so that every item becomes its own element in the output. See [this](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap/) great example of how FlatMap works in Beam, and this answer on [SO](https://stackoverflow.com/a/45682977/1185293) for a simple explanation.

In the case above, we applied an anonymous function (a lambda) to each line. You can define the function explicitly if you prefer a more conventional syntax.

In [7]:
def my_line_split_func(line):
  return re.findall(r"[a-zA-Z']+", line)

with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('data/small.txt')
      | 'get words' >> beam.FlatMap(my_line_split_func)
      | "print" >> beam.Map(myprint)
  )


KING
LEAR
DRAMATIS
PERSONAE
LEAR
king
of
Britain
KING
LEAR
KING
OF
FRANCE
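
To see what `FlatMap` adds over `Map`, here is a minimal plain-Python sketch of the two behaviours using the same regular expression. The `lines` list and the `words` helper are illustrative and stand in for the pipeline's input and for `my_line_split_func`; no Beam is involved.

```python
import re

# Two sample lines in the style of data/small.txt.
lines = ['\tKING LEAR', 'LEAR\tking of Britain  (KING LEAR:)']


def words(line):
    # Same pattern as the pipeline: runs of letters and apostrophes.
    return re.findall(r"[a-zA-Z']+", line)


# Map-like: one output per input, so each output is a whole list of words.
mapped = [words(line) for line in lines]

# FlatMap-like: the per-line lists are flattened into individual words.
flat_mapped = [word for line in lines for word in words(line)]

print(mapped)
print(flat_mapped)
```

With the map-style version, the downstream print step would receive one list per line; with the flattened version it receives one word at a time, which matches the output above.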


### Tutorial



In [8]:
! echo -e 'r1c1,r1c2,2020/03/05\nr2c1,r2c2,2020/03/23' > data/play.csv


In [9]:

class Transform(beam.DoFn):

  # Use classes to perform transformations on your PCollections
  # Yield or return the element(s) needed as input for the next transform
  def process(self, element):
    yield element


with beam.Pipeline() as pipeline:
  (pipeline
      | 'Read lines' >> beam.io.ReadFromText('data/play.csv')
      | 'format line' >> beam.ParDo(Transform())
      | "print" >> beam.Map(myprint)
  )


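# `result` was created in an earlier cell (In [6]); the `with` block above
# has already run this pipeline.
result.wait_until_finish()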

r1c1,r1c2,2020/03/05
r2c1,r2c2,2020/03/23


'DONE'
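
The `Transform` class above passes each element through unchanged. As a rough sketch of what a non-trivial `process` method might look like for `data/play.csv`, the class below splits each line and normalizes the date. It is written as a plain class so it runs without Beam; in the notebook it would subclass `beam.DoFn`. The class name `ParseLine` and the keys `col1`, `col2`, and `date` are assumptions made for illustration.

```python
from datetime import datetime


class ParseLine:
    """Plain-Python sketch; in the notebook this would subclass beam.DoFn."""

    def process(self, element):
        # Each line of data/play.csv has three comma-separated fields.
        col1, col2, date = element.split(',')
        # Yield (rather than return) so one input can produce zero or more outputs.
        yield {
            'col1': col1,
            'col2': col2,
            'date': datetime.strptime(date, '%Y/%m/%d').date().isoformat(),
        }


rows = ['r1c1,r1c2,2020/03/05', 'r2c1,r2c2,2020/03/23']
parsed = [out for row in rows for out in ParseLine().process(row)]
print(parsed)
```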

In [11]:
!pip install apache-beam



In [26]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from datetime import datetime
import os

def format_row(row):
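    # NOTE: users_v.csv (shown in the output below) is comma-delimited, so
    # splitting on ';' yields a single element and every row is skipped here.
    elements = row.split(';')
    if len(elements) != 6:
        return None  # Skip malformed rows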

    user_id, name, gender, age, address, date = elements

    # Format name
    name_parts = name.strip().split()
    formatted_name = f"{name_parts[0]} {' '.join(name_parts[1:])}"

    # Format address
    address_parts = address.strip().split(',')
    if len(address_parts) == 3:
        city, state, zip_code = address_parts
        formatted_address = f"{city.strip()},{state.strip()},{zip_code.strip()}"
    else:
        formatted_address = address.strip()

    # Format date
    try:
        formatted_date = datetime.strptime(date.strip(), '%Y-%m-%d').strftime('%Y-%m-%d')
    except ValueError:
        formatted_date = date.strip()

    return f"{user_id};{formatted_name};{gender};{age};{formatted_address};{formatted_date}"

class PrintFn(beam.DoFn):
    def __init__(self, n):
        self.n = n
        self.counter = 0

    def process(self, element):
        if self.counter < self.n:
            print(element)
            self.counter += 1
        yield element

def run_pipeline():
    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as p:
        formatted_data = (p
         | 'Read CSV' >> beam.io.ReadFromText('users_v.csv', skip_header_lines=1)
         | 'Format Rows' >> beam.Map(format_row)
         | 'Filter None' >> beam.Filter(lambda x: x is not None)
        )

        # Print the first 5 formatted rows
        formatted_data | 'Print Sample' >> beam.ParDo(PrintFn(5))

        # Write to file
        formatted_data | 'Write CSV' >> beam.io.WriteToText('marketing_format.csv',
                                              header='user_id;name;gender;age;address;date_joined',
                                              shard_name_template='')

if __name__ == '__main__':
    run_pipeline()

    # After the pipeline runs, print the first few lines of the output file
    if os.path.exists('marketing_format.csv'):
        with open('marketing_format.csv', 'r') as f:
            print("\nContents of marketing_format.csv:")
            for i, line in enumerate(f):
                if i < 6:  # Print header + first 5 lines
                    print(line.strip())
                else:
                    break
    else:
        print("Output file not found. Please check for errors in the pipeline execution.")

    # Print the contents of the input file for verification
    if os.path.exists('users_v.csv'):
        print("\nContents of users_v.csv (first 5 lines):")
        with open('users_v.csv', 'r') as f:
            for i, line in enumerate(f):
                if i < 5:  # Print first 5 lines
                    print(line.strip())
                else:
                    break
    else:
        print("Input file 'users_v.csv' not found. Please check if the file is in the correct location.")

INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
INFO:apache_beam.io.filebasedsink:Renamed 1 shards in 0.00 seconds.



Contents of marketing_format.csv:
user_id;name;gender;age;address;date_joined

Contents of users_v.csv (first 5 lines):
user_id,name,gender,age,address,date_joined
1,Anthony Wolf,male,73,New Rachelburgh-VA-49583,2019/03/13
2,James Armstrong,male,56,North Jillianfort-UT-86454,2020/11/06
3,Cody Shaw,male,75,North Anne-SC-53799,2004/05/29
4,Sierra Hamilton,female,76,New Angelafurt-ME-46190,2005/08/26
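
The output file above contains only the header row. The input sample suggests why: `users_v.csv` separates fields with commas, writes addresses as `city-state-zip`, and writes dates as `YYYY/MM/DD`, while `format_row` splits on `;`, so every row fails the length check and is filtered out. Below is a minimal sketch of a version that parses the format shown. It assumes the intended output is the semicolon-delimited layout that `format_row` already builds, and that no field contains a comma (otherwise the `csv` module would be a safer parser).

```python
from datetime import datetime


def format_row(row):
    # Input fields are comma-separated (assumes no commas inside a field).
    elements = [part.strip() for part in row.split(',')]
    if len(elements) != 6:
        return None  # Skip malformed rows
    user_id, name, gender, age, address, date = elements

    # Addresses look like 'New Rachelburgh-VA-49583'. Split from the right
    # so that a hyphen inside the city name would be preserved.
    address_parts = address.rsplit('-', 2)
    if len(address_parts) == 3:
        formatted_address = ','.join(part.strip() for part in address_parts)
    else:
        formatted_address = address

    # Dates arrive as YYYY/MM/DD; rewrite them as YYYY-MM-DD.
    try:
        formatted_date = datetime.strptime(date, '%Y/%m/%d').strftime('%Y-%m-%d')
    except ValueError:
        formatted_date = date

    return f"{user_id};{name};{gender};{age};{formatted_address};{formatted_date}"


sample = '1,Anthony Wolf,male,73,New Rachelburgh-VA-49583,2019/03/13'
formatted = format_row(sample)
print(formatted)
```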


In [24]:
import csv
from collections import Counter

def count_genders(file_path):
    gender_counts = Counter()

    with open(file_path, 'r') as csvfile:
        csv_reader = csv.reader(csvfile)
        next(csv_reader)  # Skip the header row
        for row in csv_reader:
            if len(row) > 2:  # Ensure the row has at least 3 columns
                gender = row[2].strip().lower()
                gender_counts[gender] += 1

    return gender_counts

if __name__ == '__main__':
    file_path = 'users_v.csv'

    print("First few lines of users_v.csv:")
    with open(file_path, 'r') as f:
        for i, line in enumerate(f):
            if i < 5:  # Print first 5 lines
                print(line.strip())
            else:
                break

    print("\nCounting genders...")
    gender_counts = count_genders(file_path)

    print(f"Number of female customers: {gender_counts['female']}")
    print(f"Number of male customers: {gender_counts['male']}")

First few lines of users_v.csv:
user_id,name,gender,age,address,date_joined
1,Anthony Wolf,male,73,New Rachelburgh-VA-49583,2019/03/13
2,James Armstrong,male,56,North Jillianfort-UT-86454,2020/11/06
3,Cody Shaw,male,75,North Anne-SC-53799,2004/05/29
4,Sierra Hamilton,female,76,New Angelafurt-ME-46190,2005/08/26

Counting genders...
Number of female customers: 1150
Number of male customers: 1207


In [27]:
import csv
from collections import Counter
from datetime import datetime

def count_customers_by_join_date(file_path):
    join_date_counts = Counter()

    with open(file_path, 'r') as csvfile:
        csv_reader = csv.reader(csvfile)
        next(csv_reader)  # Skip the header row
        for row in csv_reader:
            if len(row) >= 6:  # Ensure the row has at least 6 columns
                date_joined = row[5].strip()
                # Convert the date to a consistent format (YYYY-MM-DD)
                try:
                    parsed_date = datetime.strptime(date_joined, "%Y/%m/%d")
                    formatted_date = parsed_date.strftime("%Y-%m-%d")
                    join_date_counts[formatted_date] += 1
                except ValueError:
                    print(f"Warning: Invalid date format in row: {row}")

    return join_date_counts

if __name__ == '__main__':
    file_path = 'users_v.csv'

    print("First few lines of users_v.csv:")
    with open(file_path, 'r') as f:
        for i, line in enumerate(f):
            if i < 5:  # Print first 5 lines
                print(line.strip())
            else:
                break

    print("\nCounting customers by join date...")
    join_date_counts = count_customers_by_join_date(file_path)

    print("\nNumber of customers that joined on each day:")
    for date, count in sorted(join_date_counts.items()):
        print(f"{date}: {count}")

    print(f"\nTotal number of unique join dates: {len(join_date_counts)}")
    print(f"Total number of customers: {sum(join_date_counts.values())}")

First few lines of users_v.csv:
user_id,name,gender,age,address,date_joined
1,Anthony Wolf,male,73,New Rachelburgh-VA-49583,2019/03/13
2,James Armstrong,male,56,North Jillianfort-UT-86454,2020/11/06
3,Cody Shaw,male,75,North Anne-SC-53799,2004/05/29
4,Sierra Hamilton,female,76,New Angelafurt-ME-46190,2005/08/26

Counting customers by join date...

Number of customers that joined on each day:
2000-01-02: 1
2000-01-08: 1
2000-01-13: 1
2000-01-16: 1
2000-01-17: 1
2000-01-19: 1
2000-01-21: 1
2000-01-24: 2
2000-01-28: 1
2000-01-31: 1
2000-02-01: 1
2000-02-04: 1
2000-02-06: 1
2000-02-07: 2
2000-02-13: 1
2000-02-18: 1
2000-02-22: 1
2000-03-02: 1
2000-03-05: 1
2000-03-06: 1
2000-03-14: 1
2000-03-23: 2
2000-03-24: 2
2000-04-04: 2
2000-04-07: 1
2000-04-08: 1
2000-04-09: 1
2000-04-10: 1
2000-04-12: 1
2000-04-13: 2
2000-04-19: 1
2000-04-24: 1
2000-04-25: 1
2000-04-26: 1
2000-05-03: 2
2000-05-04: 1
2000-05-05: 1
2000-05-07: 1
2000-05-08: 1
2000-05-11: 1
2000-05-14: 1
2000-05-16: 1
2000-05-28: 1
200

In [29]:
import csv
from collections import Counter

def extract_state(address):
    # Assuming the state is always a 2-letter code between two hyphens
    parts = address.split('-')
    if len(parts) >= 3:
        return parts[-2]  # The state should be the second-to-last part
    return None

def count_customers_by_state(file_path):
    state_counts = Counter()

    with open(file_path, 'r') as csvfile:
        csv_reader = csv.reader(csvfile)
        next(csv_reader)  # Skip the header row
        for row in csv_reader:
            if len(row) >= 5:  # Ensure the row has at least 5 columns
                address = row[4].strip()
                state = extract_state(address)
                if state:
                    state_counts[state] += 1
                else:
                    print(f"Warning: Could not extract state from address: {address}")

    return state_counts

if __name__ == '__main__':
    file_path = 'users_v.csv'

    print("First few lines of users_v.csv:")
    with open(file_path, 'r') as f:
        for i, line in enumerate(f):
            if i < 5:  # Print first 5 lines
                print(line.strip())
            else:
                break

    print("\nCounting customers by state...")
    state_counts = count_customers_by_state(file_path)

    print("\nNumber of customers in each state:")
    for state, count in sorted(state_counts.items()):
        print(f"{state}: {count}")

    print(f"\nTotal number of states: {len(state_counts)}")
    print(f"Total number of customers: {sum(state_counts.values())}")

First few lines of users_v.csv:
user_id,name,gender,age,address,date_joined
1,Anthony Wolf,male,73,New Rachelburgh-VA-49583,2019/03/13
2,James Armstrong,male,56,North Jillianfort-UT-86454,2020/11/06
3,Cody Shaw,male,75,North Anne-SC-53799,2004/05/29
4,Sierra Hamilton,female,76,New Angelafurt-ME-46190,2005/08/26

Counting customers by state...

Number of customers in each state:
AK: 52
AL: 55
AR: 53
AZ: 50
CA: 49
CO: 48
CT: 63
DC: 50
DE: 48
FL: 43
GA: 53
HI: 51
IA: 53
ID: 51
IL: 40
IN: 45
KS: 49
KY: 43
LA: 31
MA: 45
MD: 41
ME: 43
MI: 56
MN: 41
MO: 42
MS: 36
MT: 49
NC: 54
ND: 46
NE: 43
NH: 39
NJ: 58
NM: 42
NV: 40
NY: 45
OH: 28
OK: 47
OR: 39
PA: 35
RI: 35
SC: 50
SD: 48
TN: 44
TX: 48
UT: 50
VA: 44
VT: 54
WA: 42
WI: 56
WV: 40
WY: 50

Total number of states: 51
Total number of customers: 2357


In [32]:
import csv
from collections import Counter

def process_in_batches(file_path, batch_size=10):
    state_counts = Counter()
    total_processed = 0

    with open(file_path, 'r') as file:
        csv_reader = csv.reader(file)
        next(csv_reader)  # Skip header

        batch = []
        for row in csv_reader:
            batch.append(row)
            if len(batch) == batch_size:
                process_batch(batch, state_counts)
                total_processed += len(batch)
                print(f"Processed {total_processed} records. Current state counts:")
                print_state_counts(state_counts)
                batch = []

        # Process any remaining records
        if batch:
            process_batch(batch, state_counts)
            total_processed += len(batch)

    print("\nFinal state counts:")
    print_state_counts(state_counts)
    print(f"Total records processed: {total_processed}")

def process_batch(batch, state_counts):
    for row in batch:
        if len(row) >= 5:
            address = row[4]
            state = extract_state(address)
            if state:
                state_counts[state] += 1

def extract_state(address):
    parts = address.split('-')
    if len(parts) >= 3:
        return parts[-2]
    return None

def print_state_counts(state_counts):
    for state, count in sorted(state_counts.items()):
        print(f"{state}: {count}")
    print()

if __name__ == '__main__':
    file_path = 'users_v.csv'
    process_in_batches(file_path)

Streaming output truncated to the last 5000 lines.
OK: 28
OR: 22
PA: 23
RI: 19
SC: 25
SD: 29
TN: 25
TX: 30
UT: 29
VA: 27
VT: 33
WA: 27
WI: 34
WV: 25
WY: 32

Processed 1430 records. Current state counts:
AK: 28
AL: 37
AR: 40
AZ: 36
CA: 27
CO: 34
CT: 38
DC: 29
DE: 26
FL: 30
GA: 31
HI: 30
IA: 27
ID: 24
IL: 27
IN: 26
KS: 28
KY: 28
LA: 20
MA: 23
MD: 22
ME: 31
MI: 36
MN: 25
MO: 25
MS: 24
MT: 28
NC: 31
ND: 32
NE: 23
NH: 25
NJ: 37
NM: 27
NV: 27
NY: 21
OH: 16
OK: 28
OR: 22
PA: 23
RI: 19
SC: 26
SD: 29
TN: 26
TX: 30
UT: 29
VA: 27
VT: 33
WA: 27
WI: 35
WV: 25
WY: 32

Processed 1440 records. Current state counts:
AK: 28
AL: 37
AR: 40
AZ: 36
CA: 27
CO: 35
CT: 38
DC: 29
DE: 26
FL: 30
GA: 31
HI: 30
IA: 27
ID: 24
IL: 27
IN: 26
KS: 29
KY: 29
LA: 20
MA: 23
MD: 22
ME: 31
MI: 36
MN: 26
MO: 25
MS: 25
MT: 28
NC: 31
ND: 32
NE: 24
NH: 25
NJ: 37
NM: 28
NV: 27
NY: 22
OH: 16
OK: 28
OR: 22
PA: 23
RI: 19
SC: 26
SD: 29
TN: 26
TX: 30
UT: 30
VA: 27
VT: 33
WA: 28
WI: 35
WV: 25
WY: 32

Processed 1450 record