In [None]:
import os
# Everything in this lab is addressed relative to the current working directory.
LAB_ROOT = "."   # stay in current folder

# Create the input and output folders up front; re-running this cell is harmless.
for folder in ("data", "outputs"):
    os.makedirs(folder, exist_ok=True)

print("Working in:", os.getcwd())
print("Folders ready: data/, outputs/")

In [None]:
# Sample input for the word-count pipeline: a short story, stored verbatim.
text = """Once upon a time, a curious student built a small machine that could learn.
The machine read thousands of pages, numbers, and songs, trying to make sense of the world.
Every day it grew a little smarter and a little kinder.
It learned to help people cook, drive, and dream.
But one day the student asked, “Will you ever replace me?”
The machine smiled in silence and wrote on the screen:
  I was never meant to replace you.
  I was meant to remind you how powerful you are when you learn.

From that day, the student and the machine worked together,
creating art, solving problems, and teaching others to imagine a better world."""

# Write the story as UTF-8 (it contains curly quotes) so the pipeline can read it from disk.
story_path = "data/ai_future_story.txt"
with open(story_path, mode="w", encoding="utf-8") as story_file:
    story_file.write(text)
print("Created data/ai_future_story.txt")

In [None]:
# This mirrors the classic Beam wordcount sample: run(argv=None), --input/--output flags, named transforms.

import argparse
import os
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class SplitWords(beam.DoFn):
    """Beam DoFn that breaks one line of text into lowercase word tokens."""

    def process(self, element):
        """Yield each word found in ``element``.

        The line is lowercased, then every character other than a-z, 0-9,
        whitespace, or an apostrophe is replaced by a space before the line
        is split on whitespace.
        """
        cleaned = re.sub(r"[^a-z0-9\s']", " ", element.lower())
        # str.split() with no arguments never produces empty strings,
        # so every token it returns is a real word.
        yield from cleaned.split()

def format_result(word_count):
    """Render a ``(word, count)`` pair as a single tab-separated line."""
    token, total = word_count
    return "{}\t{}".format(token, total)

def run(argv=None):
    """Build and run the word-count pipeline.

    Args:
        argv: Optional list of command-line style arguments. ``--input`` and
            ``--output`` are consumed here; every remaining argument is passed
            to ``PipelineOptions`` (for example ``--runner DirectRunner``).
            When ``None``, argparse falls back to ``sys.argv``.

    The pipeline reads text lines from ``--input``, splits them into words
    with ``SplitWords``, sums a count per word, and writes one
    ``word<TAB>count`` line per word under the ``--output`` prefix.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        default="data/kinglear.txt",   # same default as the sample; callers normally override it
        help="Input path (file or glob), e.g., data/ai_future_story.txt or data/*.txt",
    )
    parser.add_argument(
        "--output",
        default="outputs/wordcount_output",
        help="Output prefix (Beam adds shard suffixes), e.g., outputs/wordcount_ai_future",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Pre-create the parent folder only for local paths. For a URL-style prefix
    # such as gs://bucket/out, os.makedirs would create a stray local "gs:"
    # directory tree (or fail) instead of touching the remote filesystem.
    if "://" not in known_args.output:
        os.makedirs(os.path.dirname(known_args.output) or ".", exist_ok=True)

    pipeline_options = PipelineOptions(pipeline_args)  # add runner args here if you need
    # The context manager runs the pipeline when the block exits.
    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | "Read" >> beam.io.ReadFromText(known_args.input)
        words = lines | "Split" >> beam.ParDo(SplitWords())
        pairs = words | "PairWithOne" >> beam.Map(lambda w: (w, 1))
        counts = pairs | "CountPerWord" >> beam.CombinePerKey(sum)
        _ = (counts
             | "Format" >> beam.Map(format_result)
             | "Write" >> beam.io.WriteToText(known_args.output))


In [None]:
# Run the word count against the story file written earlier in this notebook.
# Runner options (e.g. "--runner", "DirectRunner") can be appended to this list as well.
argv = ["--input", "data/ai_future_story.txt"]
argv += ["--output", "outputs/wordcount_ai_future"]

run(argv)
print("Done.")


In [None]:
import glob, os

# Collect every file the pipeline wrote under the output prefix, in name order.
parts = glob.glob("outputs/wordcount_ai_future*")
parts.sort()
print("Output shards:", parts)

# Dump each shard beneath a header showing its file name.
for shard_path in parts:
    shard_name = os.path.basename(shard_path)
    print(f"\n=== {shard_name} ===")
    with open(shard_path, "r", encoding="utf-8") as shard:
        contents = shard.read()
    print(contents, end="")


In [None]:
import glob, pandas as pd

# Gather (word, count) pairs from every shard of the word-count output.
parts = sorted(glob.glob("outputs/wordcount_ai_future-*"))
rows = []
for shard_path in parts:
    with open(shard_path, "r", encoding="utf-8") as shard:
        # Lines without a tab are not word/count records and are skipped.
        records = [raw.strip().split("\t") for raw in shard if "\t" in raw]
    rows.extend((word, int(count)) for word, count in records)

# One row per distinct word (counts from all shards are summed), most frequent first.
counts_frame = pd.DataFrame(rows, columns=["word","count"])
totals = counts_frame.groupby("word", as_index=False).sum()
df = totals.sort_values("count", ascending=False)

df.to_csv("outputs/story_counts_full.csv", index=False)
print("Wrote outputs/story_counts_full.csv with", len(df), "rows")


In [None]:
import re, glob, os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Ensure the working folders exist so this cell can run on its own.
os.makedirs("data", exist_ok=True)
os.makedirs("outputs", exist_ok=True)

INPUT_PATH = "data/ai_future_story.txt"
OUTPUT_PREFIX = "outputs/wordcount_ai_future"

# Make sure the input exists: fall back to a tiny sample if the story file is missing.
# The context manager closes the handle, so the text is flushed to disk
# before the pipeline tries to read it.
if not os.path.exists(INPUT_PATH):
    with open(INPUT_PATH, "w", encoding="utf-8") as f:
        f.write("hello beam hello world\nbeam makes word count easy\n")

def normalize_line(s):
    """Lowercase ``s`` and replace every character that is not a-z, 0-9,
    whitespace, or an apostrophe with a single space."""
    lowered = s.lower()
    return re.sub(r"[^a-z0-9\s']", " ", lowered)

# Word count on the DirectRunner, built one named step at a time.
direct_options = PipelineOptions(["--runner=DirectRunner"])
with beam.Pipeline(options=direct_options) as p:
    raw_lines = p | "Read" >> beam.io.ReadFromText(INPUT_PATH)
    clean_lines = raw_lines | "Norm" >> beam.Map(normalize_line)
    tokens = clean_lines | "Split" >> beam.FlatMap(lambda l: l.split())
    ones = tokens | "Pair1" >> beam.Map(lambda w: (w,1))
    totals = ones | "Sum" >> beam.CombinePerKey(sum)
    formatted = totals | "Format" >> beam.Map(lambda kv: f"{kv[0]}\t{kv[1]}")
    formatted | "Write" >> beam.io.WriteToText(OUTPUT_PREFIX)

# List the shard files now present under the output prefix (shown as the cell's result).
sorted(glob.glob("outputs/wordcount_ai_future*"))


In [None]:
import glob, shutil

# Find every shard written under the word-count output prefix.
parts = sorted(glob.glob("outputs/wordcount_ai_future*"))
if len(parts) == 0:
    raise SystemExit("No shards found under outputs/. Re-run the pipeline cell first.")

# Concatenate the shards, in sorted name order, into a single text file.
out_txt = "outputs/story_counts_full.txt"
with open(out_txt, "w", encoding="utf-8") as merged:
    for shard_path in parts:
        with open(shard_path, "r", encoding="utf-8") as shard:
            shutil.copyfileobj(shard, merged)

print("Merged", len(parts), "file(s) ->", out_txt)


In [None]:
import pandas as pd
# Parse every shard listed in `parts` (the shard list built in the previous cell)
# into (word, count) rows.
rows = []
for p in parts:
    # Open each shard in a context manager so its handle is closed as soon as
    # it has been read, instead of being left for the garbage collector.
    with open(p, "r", encoding="utf-8") as f:
        for line in f:
            # Only lines containing a tab are word/count records.
            if "\t" in line:
                w, c = line.strip().split("\t")
                rows.append((w, int(c)))

# One row per distinct word (counts from all shards are summed), most frequent first.
df = (pd.DataFrame(rows, columns=["word","count"])
        .groupby("word", as_index=False).sum()
        .sort_values("count", ascending=False))
df.to_csv("outputs/story_counts_full.csv", index=False)
print("Wrote outputs/story_counts_full.csv with", len(df), "rows")
