# Apache Beam with NLP Enhancement - Named Entity Recognition

This notebook demonstrates how to use Apache Beam for data processing with an enhanced focus on **Named Entity Recognition (NER)** using spaCy. Instead of simple word counting, we'll extract meaningful entities like character names, locations, and organizations from Shakespeare's King Lear.

## Learning Objectives

- Understand Apache Beam's programming model and pipeline structure
- Learn how to integrate NLP libraries (spaCy) with Apache Beam
- Extract and analyze named entities from text data
- Compare traditional word counting with entity-based analysis

## What one will Learn of it

1. **Basic Apache Beam Concepts**: PCollections, Transforms, and Pipelines
2. **NLP Integration**: Using spaCy for Named Entity Recognition
3. **Entity Analysis**: Extracting PERSON, GPE (Geopolitical), and ORG entities
4. **Data Processing**: Counting entity frequencies and generating insights

## Getting Started

To navigate through different sections, use the table of contents. From **View** drop-down list, select **Table of contents**.

To run a code cell, you can click the **Run cell** button at the top left of the cell, or select it and press **`Shift+Enter`**. Try modifying a code cell and re-running it to see what happens.

In [None]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

# Install required packages
print("Installing Apache Beam and spaCy...")
run('pip install --quiet apache-beam spacy')

# Download the English language model for spaCy
print("Downloading English language model...")
run('python -m spacy download en_core_web_sm')

# Create data directory and download sample text
print("Setting up data directory...")
run('mkdir -p data')

# Download King Lear text file (if not already present)
import os
if not os.path.exists('data/kinglear.txt'):
    print("Downloading King Lear text file...")
    run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')
else:
    print("King Lear text file already exists.")

print("Setup complete! Ready to process text with NLP.")

Installing Apache Beam and spaCy...
>> pip install --quiet apache-beam spacy

Downloading English language model...
>> python -m spacy download en_core_web_sm
Collecting en-core-web-sm==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl (12.8 MB)
     ---------------------------------------- 0.0/12.8 MB ? eta -:--:--
     --------- ------------------------------ 3.1/12.8 MB 29.0 MB/s eta 0:00:01
     ----------------------------- ---------- 9.4/12.8 MB 27.7 MB/s eta 0:00:01
     --------------------------------------- 12.8/12.8 MB 25.3 MB/s eta 0:00:00
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')

Setting up data directory...
>> mkdir -p data

>> gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/


A subdirectory or file data already exists.
Error occurred while processing: data.



Setup complete! Ready to process text with NLP.


'gsutil' is not recognized as an internal or external command,
operable program or batch file.


## NLP Enhancement - Named Entity Recognition


- **PERSON**: Character names (e.g., "King Lear", "Cordelia", "Edmund")
- **GPE**: Geopolitical entities like countries and cities (e.g., "Britain", "France")
- **ORG**: Organizations and institutions (e.g., "Duke of Cornwall")

### Why Named Entity Recognition?

Instead of counting every word, NER helps us:
1. **Focus on meaningful content**: Extract only relevant entities
2. **Reduce noise**: Filter out common words like "the", "and", "of"
3. **Gain insights**: Understand character relationships and locations
4. **Improve analysis**: Get structured data from unstructured text

### spaCy Integration

SpaCy's pre-trained English model (`en_core_web_sm`) which provides:
- High accuracy entity recognition
- Multiple entity types
- Fast processing suitable for Apache Beam pipelines


In [2]:
# Import required libraries
import apache_beam as beam
import spacy
from typing import List, Tuple

# Load the English language model
print("Loading spaCy English model...")
nlp = spacy.load("en_core_web_sm")

def extract_entities(text: str) -> List[Tuple[str, str]]:
    """
    Extract named entities from text using spaCy.
    
    Args:
        text (str): Input text to process
        
    Returns:
        List[Tuple[str, str]]: List of (entity_text, entity_type) tuples
    """
    # Process the text with spaCy
    doc = nlp(text)
    
    # Extract entities of interest
    entities = []
    target_types = {'PERSON', 'GPE', 'ORG'}  # Person, Geopolitical, Organization
    
    for ent in doc.ents:
        if ent.label_ in target_types:
            # Clean and normalize entity text
            entity_text = ent.text.strip()
            if len(entity_text) > 1:  # Filter out single characters
                entities.append((entity_text, ent.label_))
    
    return entities

# Test the entity extraction function
sample_text = "King Lear of Britain had three daughters: Goneril, Regan, and Cordelia."
print(f"Sample text: {sample_text}")
print("Extracted entities:")
for entity, entity_type in extract_entities(sample_text):
    print(f"  - {entity} ({entity_type})")


Failed to import GCSFileSystem; loading of this filesystem will be skipped. Error details: cannot import name 'storage' from 'google.cloud' (unknown location)


Loading spaCy English model...
Sample text: King Lear of Britain had three daughters: Goneril, Regan, and Cordelia.
Extracted entities:
  - Britain (GPE)
  - Goneril (GPE)
  - Regan (GPE)
  - Cordelia (GPE)


## Enhanced Entity Count Pipeline

This pipeline will:

1. **Read** the King Lear text file
2. **Extract** named entities using spaCy
3. **Count** entity frequencies by type
4. **Output** structured results showing entity counts

### Pipeline Architecture

The pipeline follows Apache Beam's standard pattern:
- **Source**: Read text files
- **Transform**: Extract entities and count frequencies  
- **Sink**: Write results to output files

In [4]:
# Enhanced Entity Count Pipeline with NLP
inputs_pattern = 'data/kinglear.txt'
outputs_prefix = 'outputs/entity_counts'

print("Starting Enhanced Entity Count Pipeline...")
print("=" * 50)

# Running locally in the DirectRunner with entity extraction
# Note: For large files, consider processing in smaller chunks to avoid timeout issues
with beam.Pipeline() as pipeline:
    # Store the entity counts in a PCollection.
    # Each element is a tuple of ((entity, entity_type), count)
    entity_counts = (
        # Start with the pipeline
        pipeline

        # Read lines from the text file
        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
        # Element type: str - text line

        # Extract named entities using our spaCy function
        | 'Extract entities' >> beam.FlatMap(extract_entities)
        # Element type: (str, str) - (entity_text, entity_type)

        # Create key-value pairs for counting
        | 'Pair entities with 1' >> beam.Map(lambda entity: (entity, 1))
        # Element type: ((str, str), int) - ((entity, type), 1)

        # Group by entity and sum the counts
        | 'Group and sum' >> beam.CombinePerKey(sum)
        # Element type: ((str, str), int) - ((entity, type), count)
    )

    # Process the entity counts for output
    (
        entity_counts

        # Format results with entity type information
        | 'Format results' >> beam.Map(
            lambda entity_count: f"{entity_count[0][0]} ({entity_count[0][1]}): {entity_count[1]}"
        )
        # Element type: str - formatted result line

        # Write results to file
        | 'Write results' >> beam.io.WriteToText(outputs_prefix)
    )

print("Pipeline execution completed!")
print("=" * 50)

# Display the results
print("Entity Count Results:")
print("-" * 30)
run('head -n 50 {}-00000-of-*'.format(outputs_prefix))

Starting Enhanced Entity Count Pipeline...




Pipeline execution completed!
Entity Count Results:
------------------------------
>> head -n 50 outputs/entity_counts-00000-of-*



'head' is not recognized as an internal or external command,
operable program or batch file.


## Analysis and Insights



1. **Processed** Shakespeare's King Lear using Apache Beam
2. **Extracted** named entities using spaCy's NLP capabilities
3. **Counted** entity frequencies by type (PERSON, GPE, ORG)
4. **Generated** structured insights from unstructured text

### Key Benefits of NER vs. Word Counting

| Aspect | Traditional Word Count | NER Entity Count |
|--------|----------------------|------------------|
| **Focus** | All words (including "the", "and", "of") | Only meaningful entities |
| **Insights** | Basic frequency analysis | Character and location analysis |
| **Noise** | High (common words dominate) | Low (filtered to relevant content) |
| **Structure** | Unstructured word list | Categorized by entity type |

### Expected Results

From King Lear, you should see entities like:
- **PERSON**: King Lear, Cordelia, Edmund, Gloucester, Kent
- **GPE**: Britain, France, Cornwall, Albany
- **ORG**: Duke of Cornwall, Earl of Kent

This provides much more meaningful insights than counting every occurrence of "the" or "and"!


In [5]:
# Additional Analysis: Entity Type Breakdown
print("Analyzing Entity Types...")
print("=" * 40)

# Read and analyze the results
try:
    with open('outputs/entity_counts-00000-of-00001', 'r') as f:
        lines = f.readlines()
    
    # Parse results and group by entity type
    entity_types = {'PERSON': [], 'GPE': [], 'ORG': []}
    
    for line in lines:
        if '(' in line and ')' in line:
            # Parse format: "Entity (TYPE): count"
            parts = line.strip().split(' (')
            if len(parts) == 2:
                entity = parts[0]
                type_and_count = parts[1].split('): ')
                if len(type_and_count) == 2:
                    entity_type = type_and_count[0]
                    count = int(type_and_count[1])
                    
                    if entity_type in entity_types:
                        entity_types[entity_type].append((entity, count))
    
    # Display results by type
    for entity_type, entities in entity_types.items():
        if entities:
            print(f"\n{entity_type} Entities (Top 10):")
            print("-" * 30)
            # Sort by count and show top 10
            sorted_entities = sorted(entities, key=lambda x: x[1], reverse=True)[:10]
            for entity, count in sorted_entities:
                print(f"  {entity}: {count}")
    
    print(f"\nSummary:")
    print(f"  Total PERSON entities: {len(entity_types['PERSON'])}")
    print(f"  Total GPE entities: {len(entity_types['GPE'])}")
    print(f"  Total ORG entities: {len(entity_types['ORG'])}")
    
except FileNotFoundError:
    print("Output file not found. Please run the pipeline first.")
except Exception as e:
    print(f"Error analyzing results: {e}")

print("\nEnhanced Apache Beam NLP Pipeline Complete!")
print("You've successfully integrated spaCy with Apache Beam for entity extraction!")


Analyzing Entity Types...

PERSON Entities (Top 10):
------------------------------
  REGAN: 69
  Tom: 21
  Kent: 16
  KENT: 9
  Shall: 7
  Edgar: 7
  ho: 7
  Gloucester: 7
  Burgundy: 6
  Gentleman: 5

GPE Entities (Top 10):
------------------------------
  thou: 32
  Thou: 29
  France: 21
  Cordelia: 20
  Dover: 15
  Gloucester: 14
  Regan: 13
  FRANCE: 8
  Albany: 8
  Goneril: 7

ORG Entities (Top 10):
------------------------------
  ALBANY: 60
  Gentleman: 22
  GLOUCESTER: 18
  EDGAR: 16
  Gloucester: 15
  KENT: 12
  GONERIL: 5
  Burgundy: 5
  Nay: 3
  EDGAR]: 3

Summary:
  Total PERSON entities: 103
  Total GPE entities: 55
  Total ORG entities: 130

Enhanced Apache Beam NLP Pipeline Complete!
You've successfully integrated spaCy with Apache Beam for entity extraction!


## Alternative: Small Sample Processing

If you encounter timeout issues with the full file, here's a version that processes just the first few lines to demonstrate the concept:


In [6]:
# Small Sample Processing - Process first 50 lines only
print("Processing small sample (first 50 lines)...")

# Read first 50 lines from the file
sample_lines = []
with open('data/kinglear.txt', 'r', encoding='utf-8') as f:
    for i, line in enumerate(f):
        if i >= 50:  # Only process first 50 lines
            break
        sample_lines.append(line.strip())

print(f"Loaded {len(sample_lines)} lines for processing")

# Process the sample with Apache Beam
with beam.Pipeline() as pipeline:
    entity_counts = (
        pipeline
        | 'Create sample data' >> beam.Create(sample_lines)
        | 'Extract entities' >> beam.FlatMap(extract_entities)
        | 'Pair entities with 1' >> beam.Map(lambda entity: (entity, 1))
        | 'Group and sum' >> beam.CombinePerKey(sum)
    )
    
    # Format results
    formatted_results = (
        entity_counts
        | 'Format results' >> beam.Map(
            lambda entity_count: f"{entity_count[0][0]} ({entity_count[0][1]}): {entity_count[1]}"
        )
    )
    
    # Collect and display results
    results = formatted_results | beam.combiners.ToList()
    
    def print_sample_results(results):
        print("Sample Entity Count Results:")
        print("-" * 30)
        for result in results:
            print(f"  {result}")
    
    results | beam.Map(print_sample_results)

print("Sample processing completed!")


Processing small sample (first 50 lines)...
Loaded 50 lines for processing
Sample Entity Count Results:
------------------------------
  Britain (GPE): 1
  FRANCE (GPE): 1
  DUKE OF ALBANY (ORG): 1
  ALBANY (ORG): 1
  EDGAR (ORG): 1
  Gloucester (ORG): 3
  Goneril (GPE): 1
  Gentleman (ORG): 2
  Cordelia (GPE): 1
  Herald (ORG): 1
  Cornwall (GPE): 1
Sample processing completed!
