In [1]:
!pip install apache-beam

Defaulting to user installation because normal site-packages is not writeable
Collecting apache-beam
  Downloading apache_beam-2.69.0-cp312-cp312-win_amd64.whl.metadata (21 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam)
  Downloading crcmod-1.7.tar.gz (89 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting fastavro<2,>=0.23.6 (from apache-beam)
  Downloading fastavro-1.12.1-cp312-cp312-win_amd64.whl.metadata (5.9 kB)
Collecting fasteners<1.0,>=0.3 (from apache-beam)
  Downloading fasteners-0.20-py3-none-any.whl.metadata (4.8 kB)
Collecting hdfs<3.0.0,>=2.1.0 (from apache-beam)
  Downloading hdfs-2.7.3.tar.gz (43 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting jsonpickle<4.0.0,>=3.0.0 (from apache-beam)
  Downloading jsonpickle-3.4.2-py3-none-any.whl.metadata (8.1 kB)
Collecting objsize<0.8.0,>=0.6.1 (from apache-beam)
  Downloading objsize-0.7.1-py3-

  DEPRECATION: Building 'crcmod' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'crcmod'. Discussion can be found at https://github.com/pypa/pip/issues/6334
  DEPRECATION: Building 'hdfs' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'hdfs'. Discussion can be found at https://github.com/pypa/pip/issues/6334
  DEPRECATION: Building 'docopt' using the legacy setup.py bdist_wheel mechanism, which will

In [11]:
import apache_beam as beam
import re
import os
import glob
import shutil

# Define input and output paths
inputs_pattern = 'E:/Masters/MLOps/MLOps_ApacheBeam/data/kinglear.txt'
outputs_prefix = 'E:/Masters/MLOps/MLOps_ApacheBeam/outputs/part'
output_dir = 'E:/Masters/MLOps/MLOps_ApacheBeam/outputs'

def cleanup_output_files():
    """Remove existing output files to prevent FileExistsError"""
    # Pattern to match all output files
    pattern = os.path.join(output_dir, 'part*')
    existing_files = glob.glob(pattern)
    
    if existing_files:
        print(f"Found {len(existing_files)} existing output files. Cleaning up...")
        for file in existing_files:
            try:
                os.remove(file)
                print(f"  Removed: {os.path.basename(file)}")
            except Exception as e:
                print(f"  Error removing {file}: {e}")
        print("Cleanup complete.\n")
    else:
        print("No existing output files found.\n")

if not os.path.exists(output_dir):
    os.makedirs(output_dir)
    print(f"Created output directory: {output_dir}\n")

cleanup_output_files()

print("Starting Apache Beam pipeline...")
print("-" * 50)

# Running locally in the DirectRunner
with beam.Pipeline() as pipeline:
    # Store the word counts in a PCollection.
    # Each element is a tuple of (word, count) of types (str, int).
    word_counts = (
        # The input PCollection is an empty pipeline.
        pipeline
        # Read lines from a text file.
        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
        # Element type: str - text line
        
        # Use a regular expression to iterate over all words in the line.
        # FlatMap will yield an element for every element in an iterable.
        | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
        # Element type: str - word
        
        # Convert to lowercase for case-insensitive counting (optional)
        | 'Lowercase' >> beam.Map(lambda word: word.lower())
        
        # Create key-value pairs where the value is 1, this way we can group by
        # the same word while adding those 1s and get the counts for every word.
        | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
        # Element type: (str, int) - key: word, value: 1
        
        # Group by key while combining the value using the sum() function.
        | 'Group and sum' >> beam.CombinePerKey(sum)
        # Element type: (str, int) - key: word, value: counts
    )
    
    # We can process a PCollection through other pipelines too.
    (
        # The input PCollection is the word_counts created from the previous step.
        word_counts
        # Format the results into a string so we can write them to a file.
        | 'Format results' >> beam.Map(lambda word_count: str(word_count))
        # Element type: str - text line
        
        # Finally, write the results to a file.
        # Note: You can also use beam.io.WriteToText(outputs_prefix, file_name_suffix='.txt', shard_name_template='')
        # to control the output file naming
        | 'Write results' >> beam.io.WriteToText(outputs_prefix, file_name_suffix='.txt')
    )

output_filename_prefix = os.path.basename(outputs_prefix)

output_files = [f for f in os.listdir(output_dir) 
                if f.startswith(output_filename_prefix) and f.endswith('.txt')]

if output_files:
    # Read the first output file
    output_file_path = os.path.join(output_dir, output_files[0])
    
    print(f"\nOutput file created: {output_file_path}")
    print(f"File size: {os.path.getsize(output_file_path):,} bytes")
    print("-" * 50)
    
    # Read all lines for processing
    with open(output_file_path, 'r') as file:
        all_lines = file.readlines()
    
    # Display first 50 lines 
    print("\nFirst 50 word counts (raw format):")
    print("-" * 50)
    for i, line in enumerate(all_lines[:50]):
        print(f"{i+1:3}. {line.strip()}")
    
    if len(all_lines) > 50:
        print(f"... and {len(all_lines) - 50} more entries")
    
    print("\n" + "=" * 50)
    
    # Parse and analyze the word counts
    word_counts_list = []
    for line in all_lines:
        line = line.strip()
        if line.startswith('(') and line.endswith(')'):
            content = line[1:-1]
            last_comma = content.rfind(',')
            if last_comma != -1:
                word = content[:last_comma].strip("'\"")
                try:
                    count = int(content[last_comma+1:].strip())
                    word_counts_list.append((word, count))
                except ValueError:
                    continue
    
    print(f"\nStatistics:")
    print("-" * 30)
    print(f"Total unique words: {len(word_counts_list):,}")
    total_word_occurrences = sum(count for _, count in word_counts_list)
    print(f"Total word occurrences: {total_word_occurrences:,}")
    print(f"Average occurrences per word: {total_word_occurrences/len(word_counts_list):.2f}")
            
else:
    print("No output files found. Please check if the pipeline ran successfully.")
    print("Check for any error messages above.")

Found 1 existing output files. Cleaning up...
  Removed: part-00000-of-00001.txt
Cleanup complete.

Starting Apache Beam pipeline...
--------------------------------------------------

Output file created: E:/Masters/MLOps/MLOps_ApacheBeam/outputs\part-00000-of-00001.txt
File size: 60,633 bytes
--------------------------------------------------

First 50 word counts (raw format):
--------------------------------------------------
  1. ('king', 311)
  2. ('lear', 253)
  3. ('dramatis', 1)
  4. ('personae', 1)
  5. ('of', 483)
  6. ('britain', 2)
  7. ('france', 32)
  8. ('duke', 26)
  9. ('burgundy', 20)
 10. ('cornwall', 75)
 11. ('albany', 73)
 12. ('earl', 10)
 13. ('kent', 175)
 14. ('gloucester', 167)
 15. ('edgar', 136)
 16. ('son', 29)
 17. ('to', 580)
 18. ('edmund', 131)
 19. ('bastard', 7)
 20. ('curan', 7)
 21. ('a', 417)
 22. ('courtier', 1)
 23. ('old', 58)
 24. ('man', 78)
 25. ('tenant', 3)
 26. ('doctor', 12)
 27. ('fool', 120)
 28. ('oswald', 56)
 29. ('steward', 2)
 30

In [9]:
import apache_beam as beam
import re
import os
import glob
import shutil

# Define input and output paths
inputs_pattern = 'E:/Masters/MLOps/MLOps_ApacheBeam/data/macbeth_TXT_FolgerShakespeare.txt'
outputs_prefix = 'E:/Masters/MLOps/MLOps_ApacheBeam/outputs/macbeth'
output_dir = 'E:/Masters/MLOps/MLOps_ApacheBeam/outputs'

# Clean up existing output files before running the pipeline
def cleanup_output_files():
    """Remove existing output files to prevent FileExistsError"""
    # Pattern to match all output files
    pattern = os.path.join(output_dir, 'part*')
    existing_files = glob.glob(pattern)
    
    if existing_files:
        print(f"Found {len(existing_files)} existing output files. Cleaning up...")
        for file in existing_files:
            try:
                os.remove(file)
                print(f"  Removed: {os.path.basename(file)}")
            except Exception as e:
                print(f"  Error removing {file}: {e}")
        print("Cleanup complete.\n")
    else:
        print("No existing output files found.\n")

# Create output directory if it doesn't exist
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
    print(f"Created output directory: {output_dir}\n")

# Clean up existing files
cleanup_output_files()

print("Starting Apache Beam pipeline...")
print("-" * 50)

# Running locally in the DirectRunner
with beam.Pipeline() as pipeline:
    # Store the word counts in a PCollection.
    # Each element is a tuple of (word, count) of types (str, int).
    word_counts = (
        # The input PCollection is an empty pipeline.
        pipeline
        # Read lines from a text file.
        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
        # Element type: str - text line
        
        # Use a regular expression to iterate over all words in the line.
        # FlatMap will yield an element for every element in an iterable.
        | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
        # Element type: str - word
        
        # Convert to lowercase for case-insensitive counting (optional)
        | 'Lowercase' >> beam.Map(lambda word: word.lower())
        
        # Create key-value pairs where the value is 1, this way we can group by
        # the same word while adding those 1s and get the counts for every word.
        | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
        # Element type: (str, int) - key: word, value: 1
        
        # Group by key while combining the value using the sum() function.
        | 'Group and sum' >> beam.CombinePerKey(sum)
        # Element type: (str, int) - key: word, value: counts
    )
    
    # We can process a PCollection through other pipelines too.
    (
        # The input PCollection is the word_counts created from the previous step.
        word_counts
        # Format the results into a string so we can write them to a file.
        | 'Format results' >> beam.Map(lambda word_count: str(word_count))
        # Element type: str - text line
        
        # Finally, write the results to a file.
        # Note: You can also use beam.io.WriteToText(outputs_prefix, file_name_suffix='.txt', shard_name_template='')
        # to control the output file naming
        | 'Write results' >> beam.io.WriteToText(outputs_prefix, file_name_suffix='.txt')
    )

print("\nPipeline completed successfully!")
print("=" * 50)

# Windows-compatible way to read and display the output file
# Find the output file (it will have a pattern like part-00000-of-00001.txt)
output_filename_prefix = os.path.basename(outputs_prefix)

# Find the actual output file
output_files = [f for f in os.listdir(output_dir) 
                if f.startswith(output_filename_prefix) and f.endswith('.txt')]

if output_files:
    # Read the first output file
    output_file_path = os.path.join(output_dir, output_files[0])
    
    print(f"\nOutput file created: {output_file_path}")
    print(f"File size: {os.path.getsize(output_file_path):,} bytes")
    print("-" * 50)
    
    # Read all lines for processing
    with open(output_file_path, 'r') as file:
        all_lines = file.readlines()
    
    # Display first 50 lines (raw format)
    print("\nFirst 50 word counts (raw format):")
    print("-" * 50)
    for i, line in enumerate(all_lines[:50]):
        print(f"{i+1:3}. {line.strip()}")
    
    if len(all_lines) > 50:
        print(f"... and {len(all_lines) - 50} more entries")
    
    print("\n" + "=" * 50)
    
    # Parse and analyze the word counts
    word_counts_list = []
    for line in all_lines:
        # Parse the tuple string format
        line = line.strip()
        if line.startswith('(') and line.endswith(')'):
            # Remove parentheses and split
            content = line[1:-1]
            # Find the last comma (word might contain apostrophes)
            last_comma = content.rfind(',')
            if last_comma != -1:
                word = content[:last_comma].strip("'\"")
                try:
                    count = int(content[last_comma+1:].strip())
                    word_counts_list.append((word, count))
                except ValueError:
                    continue
    
    # Statistics
    print(f"\nStatistics:")
    print("-" * 30)
    print(f"Total unique words: {len(word_counts_list):,}")
    total_word_occurrences = sum(count for _, count in word_counts_list)
    print(f"Total word occurrences: {total_word_occurrences:,}")
    print(f"Average occurrences per word: {total_word_occurrences/len(word_counts_list):.2f}")
    
    # Sort by count and show top 20 most common words
    word_counts_list.sort(key=lambda x: x[1], reverse=True)
    print("\nTop 20 most frequent words:")
    print("-" * 30)
    print(f"{'Rank':<6} {'Word':<15} {'Count':<10} {'Frequency'}")
    print("-" * 50)
    for i, (word, count) in enumerate(word_counts_list[:20], 1):
        frequency = (count / total_word_occurrences) * 100
        print(f"{i:<6} {word:<15} {count:<10,} {frequency:>6.2f}%")
    
    # Show some interesting statistics
    print("\n" + "=" * 50)
    print("Word length distribution:")
    print("-" * 30)
    
    # Words that appear only once (hapax legomena)
    hapax = [word for word, count in word_counts_list if count == 1]
    print(f"Words appearing only once: {len(hapax):,} ({len(hapax)/len(word_counts_list)*100:.1f}%)")
    
    # Words that appear more than 100 times
    common = [word for word, count in word_counts_list if count > 100]
    print(f"Words appearing >100 times: {len(common):,}")
    
    # Longest and shortest words
    if word_counts_list:
        longest_word = max(word_counts_list, key=lambda x: len(x[0]))
        shortest_words = [w for w, c in word_counts_list if len(w) == 1]
        print(f"Longest word: '{longest_word[0]}' ({len(longest_word[0])} chars, appears {longest_word[1]} times)")
        print(f"Single-letter words: {len(shortest_words)}")
        
else:
    print("No output files found. Please check if the pipeline ran successfully.")
    print("Check for any error messages above.")

Found 1 existing output files. Cleaning up...
  Removed: part-00000-of-00001.txt
Cleanup complete.

Starting Apache Beam pipeline...
--------------------------------------------------

Pipeline completed successfully!

Output file created: E:/Masters/MLOps/MLOps_ApacheBeam/outputs\macbeth-00000-of-00001.txt
File size: 47,972 bytes
--------------------------------------------------

First 50 word counts (raw format):
--------------------------------------------------
  1. ('macbeth', 288)
  2. ('by', 52)
  3. ('william', 1)
  4. ('shakespeare', 3)
  5. ('edited', 1)
  6. ('barbara', 1)
  7. ('a', 254)
  8. ('mowat', 1)
  9. ('and', 572)
 10. ('paul', 1)
 11. ('werstine', 1)
 12. ('with', 158)
 13. ('michael', 1)
 14. ('poston', 1)
 15. ('rebecca', 1)
 16. ('niles', 1)
 17. ('folger', 2)
 18. ('library', 1)
 19. ('https', 1)
 20. ('edu', 1)
 21. ('shakespeares', 1)
 22. ('works', 1)
 23. ('created', 1)
 24. ('on', 64)
 25. ('jul', 1)
 26. ('from', 58)
 27. ('fdt', 1)
 28. ('version', 1)
