# Try Apache Beam - Python

In this notebook, we set up your development environment and work through a simple example using the [DirectRunner](https://beam.apache.org/documentation/runners/direct/). You can explore other runners with the [Beam Capability Matrix](https://beam.apache.org/documentation/runners/capability-matrix/).

To navigate through different sections, use the table of contents. From the **View** drop-down list, select **Table of contents**.

To run a code cell, click the **Run cell** button at the top left of the cell, or select the cell and press **`Shift+Enter`**. Try modifying a code cell and re-running it to see what happens.

To learn more about Colab, see [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb).

# Setup

First, set up your environment: install `apache-beam` and download a text file from Cloud Storage to your local file system. The pipeline uses this file as its input.

In [3]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

# Install apache-beam.
# run('pip install --quiet apache-beam')

# Copy the input file into the local file system.
run('mkdir -p data')
run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')

>> mkdir -p data

>> gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/


A subdirectory or file -p already exists.
Error occurred while processing: -p.
A subdirectory or file data already exists.
Error occurred while processing: data.





'gsutil' is not recognized as an internal or external command,
operable program or batch file.
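
The output above shows both shell commands failing. The `mkdir` errors and the "not recognized as an internal or external command" message are typical of a Windows command prompt, where `mkdir` has no `-p` flag and `gsutil` is not on the `PATH`. A minimal, portable sketch of the same setup using only the Python standard library might look like the following. The helper names `ensure_dir` and `fetch_if_missing` are hypothetical, and the HTTPS URL is an assumption: it is the usual public-endpoint form of the `gs://` path and only works if the object is publicly readable.

```python
import os
import urllib.request

# Assumption: public HTTPS form of gs://dataflow-samples/shakespeare/kinglear.txt.
INPUT_URL = 'https://storage.googleapis.com/dataflow-samples/shakespeare/kinglear.txt'


def ensure_dir(path):
    """Create a directory (and any missing parents); works on any OS."""
    os.makedirs(path, exist_ok=True)
    return path


def fetch_if_missing(url, dest):
    """Download url to dest unless dest already exists.

    Returns True if a download happened, False if the file was already there.
    """
    if os.path.exists(dest):
        return False
    ensure_dir(os.path.dirname(dest) or '.')
    urllib.request.urlretrieve(url, dest)
    return True
```

In the notebook, calling `fetch_if_missing(INPUT_URL, os.path.join('data', 'kinglear.txt'))` would stand in for the two `run(...)` calls, without depending on the shell or on `gsutil`.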


# Word count with comments

This section builds a word-count pipeline over `data/kinglear.txt`, with comments explaining each step.
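
Before the Beam cell, it may help to see the same logic in plain Python on an in-memory list of lines. This is only a mental model, not how Beam executes the work, since Beam can distribute each step across workers. The function name `top_words` and the sample lines are made up for illustration. The regular expression, minimum length, and stop-word set mirror those in the pipeline cell that follows.

```python
import re
from collections import Counter

STOP_WORDS = {'the', 'and', 'that', 'this', 'with', 'from', 'have', 'will', 'your'}


def top_words(lines, min_length=4, top_n=50):
    """Return up to top_n (word, count) pairs, most frequent first."""
    counts = Counter()
    for line in lines:
        # ExtractWords: pull out words and lowercase them.
        for word in re.findall(r"[a-zA-Z']+", line):
            word = word.lower()
            # FilterShortWords and RemoveStopWords.
            if len(word) >= min_length and word not in STOP_WORDS:
                # PairWithOne and GroupAndSum collapse into one increment here.
                counts[word] += 1
    # GetTopWords: keep only the most frequent entries.
    return counts.most_common(top_n)


# Hypothetical sample input, standing in for lines read from the text file.
sample = ['Blow, winds, and crack your cheeks!', 'Rage! Blow!']
for word, count in top_words(sample, top_n=3):
    # Same layout as the 'Format' step in the pipeline.
    print(f"{word:20s} : {count:5d}")
```

Running the sketch on a few lines is a quick way to check the filtering rules before running the full pipeline on the input file.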

In [4]:
import apache_beam as beam
import re
from collections import Counter

# Configuration parameters
MIN_WORD_LENGTH = 4  # Filter out words shorter than this
TOP_N = 50  # Show top N most frequent words

input_file = 'data/kinglear.txt'
output_prefix = 'outputs/wordcount'

# Create pipeline
with beam.Pipeline() as pipeline:
    # Read and process text
    word_counts = (
        pipeline
        | 'ReadFile' >> beam.io.ReadFromText(input_file)
        # Convert to lowercase and extract words
        | 'ExtractWords' >> beam.FlatMap(
            lambda line: [word.lower() for word in re.findall(r"[a-zA-Z']+", line)]
        )
        # Filter by length
        | 'FilterShortWords' >> beam.Filter(lambda word: len(word) >= MIN_WORD_LENGTH)
        # Remove common stop words
        | 'RemoveStopWords' >> beam.Filter(
            lambda word: word not in {'the', 'and', 'that', 'this', 'with', 'from', 'have', 'will', 'your'}
        )
        # Count occurrences
        | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum)
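        # Keep the TOP_N most frequent words; Top.Of emits a single list, largest count first
        | 'GetTopWords' >> beam.transforms.combiners.Top.Of(TOP_N, key=lambda x: x[1])
        # Unpack that single list back into individual (word, count) elements
        | 'Flatten' >> beam.FlatMap(lambda x: x)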
    )
    
    # Format output
    formatted_results = (
        word_counts
        | 'Format' >> beam.Map(lambda wc: f"{wc[0]:20s} : {wc[1]:5d}")
    )
    
    # Write results
    formatted_results | 'WriteOutput' >> beam.io.WriteToText(
        output_prefix,
        file_name_suffix='.txt'
    )


# Cell 3: Display Results with Analysis
import glob

# Read and display results
output_files = glob.glob(f'{output_prefix}*.txt')
if output_files:
    print("=" * 50)
    print(f"TOP {TOP_N} WORDS (minimum length: {MIN_WORD_LENGTH})")
    print("=" * 50)
    print(f"{'Word':<20} : {'Count':<5}")
    print("-" * 30)
    
    with open(output_files[0], 'r') as f:
        for line in f:
            print(line.strip())
    
    print("=" * 50)


# Cell 4: Additional Analysis - Character Statistics
with beam.Pipeline() as pipeline:
    # Analyze character mentions
    character_counts = (
        pipeline
        | 'ReadAgain' >> beam.io.ReadFromText(input_file)
        | 'ExtractCharacters' >> beam.FlatMap(
            lambda line: re.findall(r'\b(LEAR|GLOUCESTER|EDGAR|EDMUND|KENT|CORDELIA|GONERIL|REGAN|FOOL)\b', line.upper())
        )
        | 'CountCharacters' >> beam.Map(lambda char: (char, 1))
        | 'SumCharacters' >> beam.CombinePerKey(sum)
        | 'FormatCharacters' >> beam.Map(lambda x: f"{x[0]:15s} appears {x[1]:3d} times")
    )
    
    # Display character statistics
    result = character_counts | beam.Map(print)

print("\nCharacter mention statistics processed!")


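# Cell 5: Word Length Distribution
# Print the header first: the pipeline runs (and prints) when the `with` block exits.
print("\nWord length distribution:")

with beam.Pipeline() as pipeline:
    length_distribution = (
        pipeline
        | 'Read' >> beam.io.ReadFromText(input_file)
        | 'GetWords' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
        | 'GetLengths' >> beam.Map(lambda word: (len(word), 1))
        | 'CountByLength' >> beam.CombinePerKey(sum)
        # Gather every (length, count) pair into one list, then emit the pairs in
        # ascending order of length. Beam does not guarantee element order in
        # general; in practice the DirectRunner handles this small list in sequence.
        | 'ToList' >> beam.combiners.ToList()
        | 'Sort' >> beam.FlatMap(sorted)
    )
    
    # Display one bar per word length: one '#' per 10 words, capped at 50 characters
    distribution = (
        length_distribution 
        | 'Display' >> beam.Map(lambda x: print(f"Length {x[0]:2d}: {'#' * min(x[1]//10, 50)} ({x[1]} words)"))
    )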





TOP 50 WORDS (minimum length: 4)
Word                 : Count
------------------------------
king                 :   311
lear                 :   253
thou                 :   219
kent                 :   175
gloucester           :   167
thee                 :   139
what                 :   137
edgar                :   136
edmund               :   131
fool                 :   120
regan                :   105
shall                :    99
lord                 :    97
come                 :    88
good                 :    84
goneril              :    83
more                 :    81
when                 :    79
enter                :    79
which                :    78
know                 :    75
cornwall             :    75
albany               :    73
i'll                 :    71
well                 :    68
than                 :    67
take                 :    65
their                :    65
here                 :    65
would                :    64
cordelia             :    64
father  