In [16]:
import apache_beam as beam
import re

# Function to clean each line of text by removing special characters
def remove_special_characters(line):
    # Use regex to replace any character that is not a-z, A-Z, or 0-9 with an empty string
    cleaned_line = re.sub(r'[^a-zA-Z0-9\s]', '', line)
    return cleaned_line

# Define the Beam pipeline
p = beam.Pipeline()

# Apply transformations
result = (
    p 
    | 'Read' >> beam.io.ReadFromText('../datasets/word_count_data.txt')
    | 'Remove Special Characters' >> beam.Map(remove_special_characters)  # Apply regex cleaning
    | 'Split' >>  beam.FlatMap(lambda a: a.split(" "))
    | 'Count' >>  beam.Map(lambda a: (a,1))
    | 'CombineByKey' >> beam.CombinePerKey(sum)
    | "Print Results" >> beam.Map(print)
)

# Run the pipeline
p.run()


('\tKING', 29)
('LEAR', 47)
('', 2216)
('\tDRAMATIS', 1)
('PERSONAE', 1)
('LEAR\tking', 1)
('of', 438)
('Britain', 1)
('KING', 214)
('OF', 15)
('FRANCE', 5)
('DUKE', 3)
('BURGUNDY\tBURGUNDY', 1)
('CORNWALL\tCORNWALL', 1)
('ALBANY\tALBANY', 1)
('EARL', 2)
('KENT\tKENT', 1)
('GLOUCESTER\tGLOUCESTER', 1)
('EDGAR\tson', 1)
('to', 425)
('Gloucester', 24)
('EDMUND\tbastard', 1)
('son', 28)
('CURAN\ta', 1)
('courtier', 1)
('Old', 12)
('Man\ttenant', 1)
('Doctor', 4)
('Fool', 14)
('OSWALD\tsteward', 1)
('Goneril', 12)
('\tA', 24)
('Captain', 5)
('employed', 1)
('by', 67)
('Edmund', 28)
('\tGentleman', 1)
('attendant', 1)
('on', 91)
('Cordelia', 20)
('Gentleman', 12)
('Herald', 2)
('\tServants', 2)
('Cornwall', 12)
('\tFirst', 2)
('Servant', 3)
('\tSecond', 2)
('\tThird', 2)
('GONERIL\t', 1)
('\t', 3)
('REGAN\t', 5)
('daughters', 28)
('Lear', 17)
('CORDELIA\t', 2)
('\tKnights', 1)
('Lears', 3)
('train', 9)
('Captains', 1)
('Messengers', 1)
('\tSoldiers', 2)
('and', 577)
('Attendants', 7)
('\tKn

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x148fbf6d9a0>

In [17]:
import apache_beam as beam
import re

def remove_special_characters(line):
    # Use regex to replace any character that is not a-z, A-Z, or 0-9 with an empty string
    cleaned_line = re.sub(r'[^a-zA-Z0-9\s]', '', line)
    return cleaned_line

def filter_empty_words(word):
    return word != ''

p = beam.Pipeline()

result = (
    p 
    | 'Read' >> beam.io.ReadFromText('../datasets/word_count_data.txt')
    | 'Remove Special Characters' >> beam.Map(remove_special_characters)  # Apply regex cleaning
    | 'Split' >> beam.FlatMap(lambda line: line.split())  # Split each line into words
    | 'Filter Empty Words' >> beam.Filter(filter_empty_words)  # Filter out empty strings
    | 'Pair with 1' >> beam.Map(lambda word: (word, 1))  # Create (word, 1) tuples
    | 'Count Words' >> beam.CombinePerKey(sum)  # Sum the counts for each word
    | "Print Results" >> beam.Map(print)  # Print the word counts
)

p.run()


('KING', 243)
('LEAR', 236)
('DRAMATIS', 1)
('PERSONAE', 1)
('king', 64)
('of', 446)
('Britain', 2)
('OF', 15)
('FRANCE', 10)
('DUKE', 3)
('BURGUNDY', 8)
('CORNWALL', 63)
('ALBANY', 67)
('EARL', 2)
('KENT', 156)
('GLOUCESTER', 141)
('EDGAR', 126)
('son', 29)
('to', 429)
('Gloucester', 26)
('EDMUND', 99)
('bastard', 7)
('CURAN', 6)
('a', 353)
('courtier', 1)
('Old', 13)
('Man', 11)
('tenant', 3)
('Doctor', 12)
('Fool', 73)
('OSWALD', 53)
('steward', 2)
('Goneril', 12)
('A', 51)
('Captain', 12)
('employed', 1)
('by', 69)
('Edmund', 32)
('Gentleman', 48)
('attendant', 1)
('on', 93)
('Cordelia', 22)
('Herald', 6)
('Servants', 9)
('Cornwall', 12)
('First', 7)
('Servant', 11)
('Second', 4)
('Third', 4)
('GONERIL', 72)
('REGAN', 86)
('daughters', 30)
('Lear', 17)
('CORDELIA', 42)
('Knights', 2)
('Lears', 4)
('train', 9)
('Captains', 1)
('Messengers', 1)
('Soldiers', 7)
('and', 591)
('Attendants', 8)
('Knight', 8)
('Messenger', 10)
('SCENE', 27)
('ACT', 26)
('I', 622)
('King', 3)
('palace', 4)

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x148fbac6880>

In [28]:
import apache_beam as beam
import re

def remove_special_characters(line):
    # Use regex to replace any character that is not a-z, A-Z, or 0-9 with an empty string
    cleaned_line = re.sub(r'[^a-zA-Z0-9\s]', '', line)
    return cleaned_line

def filter_empty_words(word):
    return word != ''

p = beam.Pipeline()

result = (
    p 
    | 'Read' >> beam.io.ReadFromText('../datasets/word_count_data.txt')
    | beam.Map(lambda record: record.split(","))
    | beam.FlatMap(lambda element:[(i,1) for i in element if i ] )
    | beam.CombinePerKey(sum)
    | "Print Results" >> beam.Map(print)  # Print the word counts
)

p.run()


('\tKING LEAR', 28)
('\tDRAMATIS PERSONAE', 1)
('LEAR\tking of Britain  (KING LEAR:)', 1)
('KING OF FRANCE:', 1)
('DUKE OF BURGUNDY\t(BURGUNDY:)', 1)
('DUKE OF CORNWALL\t(CORNWALL:)', 1)
('DUKE OF ALBANY\t(ALBANY:)', 1)
('EARL OF KENT\t(KENT:)', 1)
('EARL OF GLOUCESTER\t(GLOUCESTER:)', 1)
('EDGAR\tson to Gloucester.', 1)
('EDMUND\tbastard son to Gloucester.', 1)
('CURAN\ta courtier.', 1)
('Old Man\ttenant to Gloucester.', 1)
('Doctor:', 1)
('Fool:', 1)
('OSWALD\tsteward to Goneril.', 1)
('\tA Captain employed by Edmund. (Captain:)', 1)
('\tGentleman attendant on Cordelia. (Gentleman:)', 1)
('\tA Herald.', 1)
('\tServants to Cornwall.', 1)
('\t(First Servant:)', 1)
('\t(Second Servant:)', 1)
('\t(Third Servant:)', 1)
('GONERIL\t|', 1)
('\t|', 2)
('REGAN\t|  daughters to Lear.', 1)
('CORDELIA\t|', 1)
("\tKnights of Lear's train", 1)
(' Captains', 1)
(' Messengers', 1)
('\tSoldiers', 2)
(' and Attendants', 1)
('\t(Knight:)', 1)
('\t(Captain:)', 1)
('\t(Messenger:)', 1)
('SCENE\tBritain.',

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x148fc66fa60>