In [31]:
# Installing Apache Beam and calling the dataset.

!pip install apache-beam[gcp] -q
!mkdir -p data
!gsutil cp gs://dataflow-samples/shakespeare/macbeth.txt data/

Copying gs://dataflow-samples/shakespeare/macbeth.txt...
/ [0 files][    0.0 B/102.9 KiB]                                                / [1 files][102.9 KiB/102.9 KiB]                                                
Operation completed over 1 objects/102.9 KiB.                                    


In [32]:
# Import Apache Beam and the standard-library helpers, then define the input/output paths.

import apache_beam as beam
import re
import string

# Local copy of the Macbeth text that the pipelines read.
inputs_pattern = 'data/macbeth.txt'
# File-name prefixes passed to WriteToText; the results are read back later
# from files matching '<prefix>-00000-of-*'.
outputs_top_names = 'outputs_macbeth_top/part'
outputs_word_lengths = 'outputs_macbeth_lengths/part'

In [33]:
# Helper functions for the Beam pipelines.

def clean_token(token):
    """Normalize a raw token so that variants of a word count as one key.

    Leading and trailing ASCII punctuation is removed, then ``str.capitalize``
    is applied: the first character is upper-cased and the rest lower-cased
    (for example ``"MACBETH."`` becomes ``"Macbeth"``).

    Args:
        token: The raw string matched in a line of text.

    Returns:
        The trimmed, capitalized string (possibly empty).
    """
    trimmed = token.strip(string.punctuation)
    return trimmed.capitalize()

def is_likely_name(word):
    """Heuristically decide whether a cleaned word could be a character name.

    A word qualifies when it has at least four characters and is not one of a
    small set of common capitalized words.

    Args:
        word: A token already normalized to capitalized form.

    Returns:
        True if the word passes both checks, False otherwise.
    """
    # Short words are rejected outright.
    if len(word) < 4:
        return False
    common_words = {'The', 'This', 'That', 'From', 'With', 'When', 'Thou', 'Shall'}
    return word not in common_words

def word_length_pair(word):
    """Map a word to a ``(length, count)`` pair for per-length aggregation.

    Leading and trailing ASCII punctuation is ignored when measuring length.

    Args:
        word: The token to measure.

    Returns:
        ``(len(stripped_word), 1)`` when something remains after stripping,
        otherwise the sentinel ``(0, 0)``.
    """
    stripped = word.strip(string.punctuation)
    return (len(stripped), 1) if stripped else (0, 0)
# Word-length distribution of the likely character names, run as its own pipeline.
with beam.Pipeline() as pipeline:
    # Stage 1: read the text and keep the cleaned tokens that look like names.
    likely_names = (
        pipeline
        | 'Read lines again' >> beam.io.ReadFromText(inputs_pattern)
        | 'Extract capitalized words again' >> beam.FlatMap(lambda line: re.findall(r"\b[A-Z][a-zA-Z']+\b", line))
        | 'Clean tokens again' >> beam.Map(clean_token)
        | 'Filter likely names again' >> beam.Filter(is_likely_name)
    )

    # Stage 2: key every name by its length and add up the occurrences.
    # The filter drops the (0, 0) sentinel that word_length_pair returns for
    # tokens that are empty after stripping punctuation.
    length_counts = (
        likely_names
        | 'Pair lengths' >> beam.Map(word_length_pair)
        | 'Filter valid lengths' >> beam.Filter(lambda pair: pair[0] > 0)
        | 'Count by length' >> beam.CombinePerKey(sum)
    )

    # Stage 3: render one text line per length and write the result files.
    word_lengths = (
        length_counts
        | 'Format length output' >> beam.Map(lambda kv: f"Length {kv[0]}: {kv[1]}")
        | 'Write lengths' >> beam.io.WriteToText(outputs_word_lengths)
    )



In [34]:
with beam.Pipeline() as pipeline:

    # Shared prefix: read the text once and keep the cleaned, capitalized
    # tokens that look like character names. Both outputs below branch from
    # this single PCollection instead of re-reading and re-parsing the file.
    likely_names = (
        pipeline
        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
        | 'Extract capitalized words' >> beam.FlatMap(lambda line: re.findall(r"\b[A-Z][a-zA-Z']+\b", line))
        | 'Clean words' >> beam.Map(clean_token)
        | 'Filter likely names' >> beam.Filter(is_likely_name)
    )

    # To get top 10 Character Names
    top_characters = (
        likely_names
        | 'Pair with 1' >> beam.Map(lambda name: (name, 1))
        | 'Count occurrences' >> beam.CombinePerKey(sum)
        # Top.Of emits a single list holding the 10 pairs with the largest
        # counts; the FlatMap below unpacks it into one element per pair.
        | 'Get top 10' >> beam.combiners.Top.Of(10, key=lambda kv: kv[1])
        | 'Flatten top 10' >> beam.FlatMap(lambda top: top)
        | 'Format output' >> beam.Map(lambda kv: f"{kv[0]}: {kv[1]}")
        | 'Write top names' >> beam.io.WriteToText(outputs_top_names)
    )

    # To get Word Length Distribution
    word_lengths = (
        likely_names
        | 'Pair lengths' >> beam.Map(word_length_pair)
        # word_length_pair never returns None; it signals an empty token with
        # the sentinel (0, 0), so the guard has to test the length component.
        | 'Filter valid lengths' >> beam.Filter(lambda pair: pair[0] > 0)
        | 'Count by length' >> beam.CombinePerKey(sum)
        | 'Format length output' >> beam.Map(lambda kv: f"Length {kv[0]}: {kv[1]}")
        | 'Write lengths' >> beam.io.WriteToText(outputs_word_lengths)
    )



In [35]:
# Display the top-10 character-name counts from the part-00000 output file
# (head allows up to 20 lines; the pipeline keeps only 10 entries).
!echo "Top 10 Character Names:"
!head -n 20 outputs_macbeth_top/part-00000-of-*

Top 10 Character Names:
Macbeth: 311
Macduff: 109
Lady: 95
Banquo: 73
Enter: 64
Malcolm: 59
What: 58
Ross: 54
Witch: 54
First: 49


In [36]:
# Display the word-length distribution from the part-00000 output file.
# Lines appear in the order they were written, not sorted by length.
!echo "Word Length Distribution:"
!cat outputs_macbeth_lengths/part-00000-of-*

Word Length Distribution:
Length 7: 655
Length 8: 165
Length 6: 498
Length 9: 100
Length 4: 687
Length 5: 515
Length 14: 2
Length 11: 20
Length 10: 40
Length 12: 1
Length 13: 1
