In [1]:
# Advanced Apache Beam Analysis on Hamlet

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import re
from collections import Counter

In [2]:
!pip install apache-beam



In [3]:
input_file = '/content/hamlet.txt'  # Change this to your file path
output_prefix = 'outputs/hamlet_analysis'

In [4]:
!pip install apache-beam[interactive]



In [6]:
#============================================================================
# TRANSFORMATION 1: Advanced Word Count with Filtering
# ============================================================================
print("=" * 70)
print("PIPELINE 1: Case-Insensitive Word Count (Min Length: 5)")
print("=" * 70)

with beam.Pipeline() as pipeline:
    word_counts = (
        pipeline
        | 'Read Hamlet' >> beam.io.ReadFromText(input_file)
        | 'Convert to Lowercase' >> beam.Map(lambda line: line.lower())
        | 'Extract Words' >> beam.FlatMap(lambda line: re.findall(r"[a-z']+", line))
        | 'Filter Long Words' >> beam.Filter(lambda word: len(word) >= 5)
        | 'Pair with 1' >> beam.Map(lambda word: (word, 1))
        | 'Sum Counts' >> beam.CombinePerKey(sum)
        | 'Sort by Count' >> beam.combiners.ToList()
        | 'Get Top 20' >> beam.Map(lambda words: sorted(words, key=lambda x: x[1], reverse=True)[:20])
        | 'Flatten' >> beam.FlatMap(lambda x: x)
        | 'Format Output' >> beam.Map(lambda wc: f'{wc[0]:<20} {wc[1]:>5}')
        | 'Write Top Words' >> beam.io.WriteToText(f'{output_prefix}_top_words')
    )

print("\n✅ Pipeline 1 Complete: Top 20 words written to outputs/")

PIPELINE 1: Case-Insensitive Word Count (Min Length: 5)





✅ Pipeline 1 Complete: Top 20 words written to outputs/


In [7]:
# ============================================================================
# TRANSFORMATION 2: Character Analysis
# ============================================================================
print("\n" + "=" * 70)
print("PIPELINE 2: Character Speech Analysis")
print("=" * 70)

def extract_character_lines(line):
    """Extract character name and their dialogue"""
    # Pattern to match character names (all caps followed by period or colon)
    match = re.match(r'^\s*([A-Z][A-Z\s]+)[\.\:]', line)
    if match:
        character = match.group(1).strip()
        # Get the text after the character name
        dialogue = line[match.end():].strip()
        if dialogue:
            return [(character, dialogue)]
    return []

with beam.Pipeline() as pipeline:
    character_stats = (
        pipeline
        | 'Read Hamlet 2' >> beam.io.ReadFromText(input_file)
        | 'Extract Dialogues' >> beam.FlatMap(extract_character_lines)
        | 'Count Words per Character' >> beam.Map(lambda x: (x[0], len(x[1].split())))
        | 'Sum by Character' >> beam.CombinePerKey(sum)
        | 'Sort Characters' >> beam.combiners.ToList()
        | 'Top Characters' >> beam.Map(lambda chars: sorted(chars, key=lambda x: x[1], reverse=True)[:10])
        | 'Flatten 2' >> beam.FlatMap(lambda x: x)
        | 'Format Characters' >> beam.Map(lambda c: f'{c[0]:<30} {c[1]:>6} words')
        | 'Write Characters' >> beam.io.WriteToText(f'{output_prefix}_character_words')
    )

print("✅ Pipeline 2 Complete: Character analysis written to outputs/")


PIPELINE 2: Character Speech Analysis




✅ Pipeline 2 Complete: Character analysis written to outputs/


In [8]:
# ============================================================================
# TRANSFORMATION 3: Word Length Distribution
# ============================================================================
print("\n" + "=" * 70)
print("PIPELINE 3: Word Length Distribution")
print("=" * 70)

with beam.Pipeline() as pipeline:
    length_dist = (
        pipeline
        | 'Read Hamlet 3' >> beam.io.ReadFromText(input_file)
        | 'Extract Words 3' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
        | 'Get Word Lengths' >> beam.Map(lambda word: (len(word), 1))
        | 'Count by Length' >> beam.CombinePerKey(sum)
        | 'Sort by Length' >> beam.combiners.ToList()
        | 'Sort List' >> beam.Map(lambda items: sorted(items, key=lambda x: x[0]))
        | 'Flatten 3' >> beam.FlatMap(lambda x: x)
        | 'Format Lengths' >> beam.Map(lambda l: f'Length {l[0]:>2}: {"*" * (l[1] // 100)} ({l[1]})')
        | 'Write Lengths' >> beam.io.WriteToText(f'{output_prefix}_word_lengths')
    )

print("✅ Pipeline 3 Complete: Word length distribution written to outputs/")


PIPELINE 3: Word Length Distribution




✅ Pipeline 3 Complete: Word length distribution written to outputs/


In [9]:
# ============================================================================
# TRANSFORMATION 4: Unique Words per Act
# ============================================================================
print("\n" + "=" * 70)
print("PIPELINE 4: Vocabulary Richness per Act")
print("=" * 70)

class ExtractActWords(beam.DoFn):
    """Custom DoFn to track which act we're in and extract words"""
    def __init__(self):
        self.current_act = "PROLOGUE"

    def process(self, line):
        # Check if this line indicates a new act
        act_match = re.match(r'^ACT\s+([IVX]+)', line.strip())
        if act_match:
            self.current_act = f"ACT_{act_match.group(1)}"

        # Extract words from the line
        words = re.findall(r"[a-z']+", line.lower())
        for word in words:
            if len(word) >= 4:  # Only words with 4+ letters
                yield (self.current_act, word)

with beam.Pipeline() as pipeline:
    act_vocab = (
        pipeline
        | 'Read Hamlet 4' >> beam.io.ReadFromText(input_file)
        | 'Extract Act Words' >> beam.ParDo(ExtractActWords())
        | 'Remove Duplicates' >> beam.Distinct()
        | 'Count per Act' >> beam.combiners.Count.PerKey()
        | 'Format Acts' >> beam.Map(lambda av: f'{av[0]:<15} {av[1]:>6} unique words')
        | 'Write Act Vocab' >> beam.io.WriteToText(f'{output_prefix}_act_vocabulary')
    )

print("✅ Pipeline 4 Complete: Act vocabulary analysis written to outputs/")



PIPELINE 4: Vocabulary Richness per Act




✅ Pipeline 4 Complete: Act vocabulary analysis written to outputs/


In [10]:
# ============================================================================
# TRANSFORMATION 5: Sentiment Words Analysis
# ============================================================================
print("\n" + "=" * 70)
print("PIPELINE 5: Emotional Word Analysis")
print("=" * 70)

# Define sentiment word lists
POSITIVE_WORDS = {'love', 'good', 'sweet', 'fair', 'noble', 'grace', 'heaven',
                  'joy', 'happy', 'gentle', 'kind', 'honest', 'true', 'worthy'}
NEGATIVE_WORDS = {'death', 'dead', 'murder', 'revenge', 'mad', 'cruel', 'evil',
                  'villain', 'blood', 'hell', 'woe', 'grief', 'cursed', 'foul'}

def classify_sentiment(word):
    """Classify word as positive, negative, or neutral"""
    if word in POSITIVE_WORDS:
        return [('POSITIVE', word)]
    elif word in NEGATIVE_WORDS:
        return [('NEGATIVE', word)]
    return []

with beam.Pipeline() as pipeline:
    sentiment = (
        pipeline
        | 'Read Hamlet 5' >> beam.io.ReadFromText(input_file)
        | 'Extract Words 5' >> beam.FlatMap(lambda line: re.findall(r"[a-z']+", line.lower()))
        | 'Classify Sentiment' >> beam.FlatMap(classify_sentiment)
        | 'Count by Sentiment' >> beam.Map(lambda x: (x[0], 1))
        | 'Sum Sentiment' >> beam.CombinePerKey(sum)
        | 'Format Sentiment' >> beam.Map(lambda s: f'{s[0]:<12} words: {s[1]:>4}')
        | 'Write Sentiment' >> beam.io.WriteToText(f'{output_prefix}_sentiment')
    )

print("✅ Pipeline 5 Complete: Sentiment analysis written to outputs/")


PIPELINE 5: Emotional Word Analysis




✅ Pipeline 5 Complete: Sentiment analysis written to outputs/

PIPELINE 6: Scene Length Analysis
✅ Pipeline 6 Complete: Scene analysis written to outputs/


In [11]:
# ============================================================================
# TRANSFORMATION 6: Scene Analysis
# ============================================================================
print("\n" + "=" * 70)
print("PIPELINE 6: Scene Length Analysis")
print("=" * 70)

class SceneWordCounter(beam.DoFn):
    """Count words per scene"""
    def __init__(self):
        self.current_scene = "INTRO"
        self.word_count = 0

    def process(self, line):
        # Check for scene markers
        scene_match = re.match(r'Scene\s+([IVX]+)', line.strip())
        if scene_match:
            if self.word_count > 0:
                yield (self.current_scene, self.word_count)
            self.current_scene = f"SCENE_{scene_match.group(1)}"
            self.word_count = 0
        else:
            # Count words in this line
            words = re.findall(r"[a-zA-Z']+", line)
            self.word_count += len(words)

with beam.Pipeline() as pipeline:
    scene_lengths = (
        pipeline
        | 'Read Hamlet 6' >> beam.io.ReadFromText(input_file)
        | 'Count Scene Words' >> beam.ParDo(SceneWordCounter())
        | 'Format Scenes' >> beam.Map(lambda s: f'{s[0]:<15} {s[1]:>6} words')
        | 'Write Scenes' >> beam.io.WriteToText(f'{output_prefix}_scene_lengths')
    )

print("✅ Pipeline 6 Complete: Scene analysis written to outputs/")



PIPELINE 6: Scene Length Analysis




✅ Pipeline 6 Complete: Scene analysis written to outputs/


In [12]:
# ============================================================================
# DISPLAY RESULTS
# ============================================================================
print("\n" + "=" * 70)
print("ALL PIPELINES COMPLETED SUCCESSFULLY!")
print("=" * 70)
print("\nGenerated Output Files:")
print("1. hamlet_analysis_top_words-*       → Top 20 most frequent words")
print("2. hamlet_analysis_character_words-* → Characters ranked by dialogue")
print("3. hamlet_analysis_word_lengths-*    → Word length distribution")
print("4. hamlet_analysis_act_vocabulary-*  → Unique words per act")
print("5. hamlet_analysis_sentiment-*       → Positive vs negative words")
print("6. hamlet_analysis_scene_lengths-*   → Words per scene")
print("\n" + "=" * 70)

# Display sample results
import glob

print("\n📊 SAMPLE RESULTS:")
print("=" * 70)

for pattern in ['top_words', 'character_words', 'sentiment']:
    files = glob.glob(f'{output_prefix}_{pattern}-*')
    if files:
        print(f"\n--- {pattern.upper().replace('_', ' ')} ---")
        with open(files[0], 'r') as f:
            lines = f.readlines()[:10]  # Show first 10 lines
            for line in lines:
                print(line.strip())


ALL PIPELINES COMPLETED SUCCESSFULLY!

Generated Output Files:
1. hamlet_analysis_top_words-*       → Top 20 most frequent words
2. hamlet_analysis_character_words-* → Characters ranked by dialogue
3. hamlet_analysis_word_lengths-*    → Word length distribution
4. hamlet_analysis_act_vocabulary-*  → Unique words per act
5. hamlet_analysis_sentiment-*       → Positive vs negative words
6. hamlet_analysis_scene_lengths-*   → Words per scene


📊 SAMPLE RESULTS:

--- TOP WORDS ---
queen                  118
shall                  114
hamlet                 106
would                   80
there                   76
enter                   73
which                   64
speak                   63
their                   57
should                  55

--- CHARACTER WORDS ---
SCENE                               2 words
ACT I                               2 words
ACT III                             2 words
ACT IV                              2 words
ACT V                               2 words
HA

In [13]:
# Create a zip file of the outputs folder
!zip -r outputs.zip outputs/

# Download the zip file
from google.colab import files
files.download('outputs.zip')

  adding: outputs/ (stored 0%)
  adding: outputs/hamlet_analysis_sentiment-00000-of-00001 (deflated 32%)
  adding: outputs/hamlet_analysis_character_words-00000-of-00001 (deflated 81%)
  adding: outputs/hamlet_analysis_word_lengths-00000-of-00001 (deflated 76%)
  adding: outputs/hamlet_analysis_act_vocabulary-00000-of-00001 (deflated 61%)
  adding: outputs/hamlet_analysis_scene_lengths-00000-of-00001 (deflated 73%)
  adding: outputs/beam-temp-hamlet_analysis_scene_lengths-47ee2e62ae0c11f084060242ac1c000c/ (stored 0%)
  adding: outputs/beam-temp-hamlet_analysis_scene_lengths-47ee2e62ae0c11f084060242ac1c000c/60eeaf7c-8891-4842-a701-0a27e9c7becf.hamlet_analysis_scene_lengths (stored 0%)
  adding: outputs/beam-temp-hamlet_analysis_scene_lengths-0944510aae0c11f0a4170242ac1c000c/ (stored 0%)
  adding: outputs/beam-temp-hamlet_analysis_scene_lengths-0944510aae0c11f0a4170242ac1c000c/e8b7e325-1133-46e4-95b4-e332a5b6df29.hamlet_analysis_scene_lengths (stored 0%)
  adding: outputs/hamlet_analysis

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>