# Try Apache Beam - Python

In this notebook, we set up the environment and walk through a simple word-count example using Apache Beam’s DirectRunner, which executes the pipeline locally. You will see how to prepare input data, build a basic pipeline, and process a local text file. Each section covers one step of creating and running a Beam pipeline, from reading the input to writing and examining the results.

To run a code cell, click the **Run cell** button at the top left of the cell, or select the cell and press **`Shift+Enter`**. Try modifying a code cell and re-running it to see what happens.

To learn more about Colab, see [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb).

# Setup
In this section, we create the `data/` directory and check that the input text file, `data/prejudice.txt`, exists and can be opened. If the file is missing, the cell raises a `FileNotFoundError`.

This file is the input to the Apache Beam pipeline.

In [76]:
# 1) Setup + quick sanity checks (no printing of file contents)
import os
from pathlib import Path

data_dir = Path('data')
data_dir.mkdir(parents=True, exist_ok=True)

input_file = data_dir / 'prejudice.txt'
print("Using input file:", input_file)

# Ensure the file exists
if not input_file.exists():
    raise FileNotFoundError(f"Expected input file at {input_file} but it does not exist.")

# Optionally open the file to ensure it is readable (no printing)
with input_file.open('r', encoding='utf-8', errors='ignore') as f:
    first_line = f.readline()  # read one line silently just to confirm readability


Using input file: data/prejudice.txt

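The setup cell stops with a `FileNotFoundError` if `data/prejudice.txt` is missing. If you only want to try the pipeline end to end without the full text, one option is to write a small placeholder file first. The sketch below uses a hypothetical helper, `ensure_input_file`, and a one-sentence placeholder (the novel's opening line); it runs in a temporary directory so the real `data/` folder is left untouched.

```python
import tempfile
from pathlib import Path

# Placeholder text (the opening sentence of the novel), NOT the full input file
FALLBACK_TEXT = (
    "It is a truth universally acknowledged, that a single man in possession\n"
    "of a good fortune, must be in want of a wife.\n"
)

def ensure_input_file(path, fallback_text=FALLBACK_TEXT):
    """Hypothetical helper: write fallback_text to path if the file is missing.

    Returns True if the file was created, False if it already existed.
    """
    path = Path(path)
    if path.exists():
        return False
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(fallback_text, encoding="utf-8")
    return True

# Demo in a temporary directory so the real data/ folder is not modified
with tempfile.TemporaryDirectory() as tmp:
    demo_path = Path(tmp) / "data" / "prejudice.txt"
    created_first = ensure_input_file(demo_path)   # file is missing, so it is written
    created_again = ensure_input_file(demo_path)   # file now exists, so nothing happens
    demo_first_line = demo_path.read_text(encoding="utf-8").splitlines()[0]
    print(created_first, created_again, demo_first_line)
```

To use it in the notebook, you could call `ensure_input_file(input_file)` before the existence check; the word counts would then reflect only the placeholder text, not the novel.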

# Word count 
In this section, we build a simple Apache Beam pipeline that reads the local text file, splits each line into words, and counts how many times each word appears. Case is preserved, so "The" and "the" are counted as separate words. The results are written to a single output file, `outputs/output_words`.
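To see what the `Split`, `PairWithOne`, and `Count` steps compute, here is a plain-Python sketch of the same logic on a small in-memory list of lines. It reuses the pipeline's splitting rule, but it is not Beam code: a `dict` stands in for `CombinePerKey(sum)`, and everything runs in a single process.

```python
import re
from collections import defaultdict

def normalize_and_split(line):
    # Same rule as the pipeline: replace runs of characters other than letters and
    # apostrophes with a space, then strip leading/trailing apostrophes. Case is preserved.
    line = re.sub(r"[^A-Za-z']+", " ", line)
    for w in line.split():
        w = w.strip("'")
        if w:
            yield w

def word_count(lines):
    # FlatMap: each line becomes zero or more words
    words = (w for line in lines for w in normalize_and_split(line))
    # Map: each word becomes a (word, 1) pair
    pairs = ((w, 1) for w in words)
    # CombinePerKey(sum): group the 1s by word and add them up
    totals = defaultdict(int)
    for word, one in pairs:
        totals[word] += one
    return dict(totals)

sample_lines = ["The cat saw the dog.", "'Tis the dog's bone -- the END"]
counts = word_count(sample_lines)
print(counts)
```

Note that "The" and "the" get separate entries, the internal apostrophe in "dog's" is kept, and the leading quote mark is stripped from "'Tis".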

In [79]:
# 2) Minimal Apache Beam pipeline (wordcount) reading local file and writing local output
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import os
from pathlib import Path
import re

input_path = str(input_file)              
output_prefix = 'outputs/output_words'    

# Ensure outputs directory exists
Path('outputs').mkdir(parents=True, exist_ok=True)

# Remove previous output file
if os.path.exists(output_prefix):
    os.remove(output_prefix)

options = PipelineOptions(['--runner=DirectRunner'])

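def normalize_and_split(line):
    # Preserve case (do not lowercase), so 'The' and 'the' are counted separately.
    # Replace every run of characters other than letters and apostrophes with a space.
    line = re.sub(r"[^A-Za-z']+", ' ', line)
    for w in line.split():
        # Drop leading/trailing apostrophes (quote marks) but keep internal ones, e.g. "don't"
        w = w.strip("'")
        if w:
            yield w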

with beam.Pipeline(options=options) as p:
    (
        p
        | 'Read' >> beam.io.ReadFromText(input_path)
        | 'Split' >> beam.FlatMap(normalize_and_split)
        | 'PairWithOne' >> beam.Map(lambda w: (w, 1))
        | 'Count' >> beam.CombinePerKey(sum)
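        # Format each (word, count) pair as its Python tuple repr, e.g. ('the', 4195);
        # words containing an apostrophe come out double-quoted, e.g. ("don't", 12)
        | 'Format' >> beam.Map(lambda kv: str((kv[0], kv[1])))
        # shard_name_template='' writes one file named exactly output_prefix
        | 'Write' >> beam.io.WriteToText(output_prefix, shard_name_template='')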
    )

print("Pipeline finished. Output written with prefix:", output_prefix)


Pipeline finished. Output written with prefix: outputs/output_words
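The `Format` step writes each pair with `str()`, that is, Python's tuple repr. Python switches to double quotes when a string contains an apostrophe, so a parser that only expects single-quoted words will silently skip lines such as `("don't", 12)`. The short check below illustrates this in plain Python; the `("don't", 12)` pair is an illustrative value, not taken from the actual output.

```python
import ast
import re

# What the 'Format' step produces for two sample pairs
formatted = [str(("the", 4195)), str(("don't", 12))]
for line in formatted:
    print(line)

# A regex that only accepts single-quoted words, e.g. ('the', 4195)
single_quote_pattern = re.compile(r"\('(.+?)',\s*(\d+)\)")
regex_hits = [bool(single_quote_pattern.match(line)) for line in formatted]

# ast.literal_eval parses any valid tuple literal, whichever quotes repr chose
parsed = [ast.literal_eval(line) for line in formatted]

print(regex_hits)
print(parsed)
```

An alternative design is to format each pair as `f"{word}\t{count}"` and split on the tab when reading the file back; either way, the reader has to match the exact format the pipeline writes.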


# Read and display the top 20 words
In this section, we read the word-count results produced by the pipeline, parse each line back into a (word, count) pair, and load the pairs into a `collections.Counter`. We then use `Counter.most_common` to display the twenty most frequent words in the input text.
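This section's reading code falls back to the first file matching `output_words*` when the exact name is missing. If the pipeline were run with `WriteToText`'s default sharding (that is, without `shard_name_template=''`), the results could be split across several files named like `output_words-00000-of-00003`, and reading only the first one would miss words. A minimal sketch that merges every shard, assuming that naming pattern and the tuple-repr line format; `read_all_shards` is a hypothetical helper, demonstrated on two made-up shard files.

```python
import ast
import tempfile
from collections import Counter
from pathlib import Path

def read_all_shards(directory, prefix):
    """Hypothetical helper: merge (word, count) lines from every file named <prefix>*."""
    shards = sorted(Path(directory).glob(prefix + "*"))
    if not shards:
        raise FileNotFoundError(f"No files starting with {prefix!r} in {directory}")
    counter = Counter()
    for shard in shards:
        with shard.open("r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line:
                    word, count = ast.literal_eval(line)
                    # After CombinePerKey each word appears in only one shard,
                    # but adding is safe either way
                    counter[word] += count
    return counter

# Demo with two made-up shard files in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "output_words-00000-of-00002").write_text(
        "('the', 3)\n(\"don't\", 1)\n", encoding="utf-8")
    Path(tmp, "output_words-00001-of-00002").write_text(
        "('The', 2)\n", encoding="utf-8")
    merged = read_all_shards(tmp, "output_words")
    print(merged.most_common())
```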

In [82]:
from collections import Counter
from pathlib import Path
import ast

out_path = Path('outputs/output_words')

# Fall back to a sharded file name (e.g. output_words-00000-of-00001) if the exact name doesn't exist
if not out_path.exists():
    candidates = sorted(Path('outputs').glob('output_words*'))
    if candidates:
        out_path = candidates[0]
    else:
        raise FileNotFoundError("Could not find output file starting with 'outputs/output_words'")

print("Reading results from:", out_path)

counter = Counter()

with out_path.open('r', encoding='utf-8', errors='ignore') as f:
    for line in f:
        line = line.strip()
        if not line:
            continue

        # Each line is a Python tuple repr such as ('the', 4195). Words containing an
        # apostrophe are written with double quotes, e.g. ("don't", 12), so parse the
        # literal with ast.literal_eval, which accepts either quote style.
        word, count = ast.literal_eval(line)
        counter[word] = count

print("\nTop 20 words:")
for word, cnt in counter.most_common(20):
    print(f"{word:>15} : {cnt}")


Reading results from: outputs/output_words

Top 20 words:
            the : 4195
             of : 1018
             as : 976
            and : 968
             to : 686
            The : 681
         gently : 672
        carried : 664
           with : 659
        between : 657
      Elizabeth : 654
         slowly : 643
             on : 623
           calm : 621
          Darcy : 606
        beneath : 363
      gathering : 363
         clouds : 363
  Conversations : 359
           long : 359
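Because the pipeline preserves case, "the" (4195) and "The" (681) appear as separate entries in the list above. If you want case-insensitive totals, one option is to fold case after reading the results. A minimal sketch with a hypothetical `fold_case` helper, using three counts taken from the output above:

```python
from collections import Counter

def fold_case(counts):
    """Hypothetical helper: merge entries that differ only in letter case."""
    folded = Counter()
    for word, n in counts.items():
        folded[word.lower()] += n
    return folded

sample_counts = Counter({"the": 4195, "The": 681, "Darcy": 606})
folded = fold_case(sample_counts)
print(folded.most_common(2))
```

The trade-off is that proper nouns such as "Darcy" are also lowercased. Lowercasing inside the pipeline, for example in `normalize_and_split`, would give the same totals but would contradict that function's deliberate choice to preserve case.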
