## STEP 01: DIVISION of BLOCKS (4)

In [8]:
# Assume 'large_text_file.txt' is your 10MB text file.
filename = 'C:/Users/sohar/OneDrive/Desktop/Map_reduce/rhyme.txt'

# Determine the total number of lines to approximate splitting.
with open(filename, 'r') as file:
    lines = file.readlines()

total_lines = len(lines)
lines_per_block = total_lines // 4

# Split the file into 4 parts and write each part to a separate 'node' file.
for i in range(4):
    with open(f'node_{i+1}.txt', 'w') as file:
        for line in lines[i*lines_per_block : (i+1)*lines_per_block]:
            file.write(line)

        

## STEP 02: RECORD READER

In [9]:
def read_records_from_node(node_filename):
    """
    Reads records from a given node file and returns a list of (key, value) pairs,
    where key is the line number and value is the text of the line.
    """
    with open(node_filename, 'r') as file:
        lines = file.readlines()

    # Generate (line_number, line_text) pairs
    records = [(i+1, line.strip()) for i, line in enumerate(lines)]
    return records

# Process for all 4 nodes
for i in range(4):
    node_filename = f'node_{i+1}.txt'
    records = read_records_from_node(node_filename)
    
    # Here you can process records further as needed
    # For demonstration, we'll just print the first 5 records of each node
    print(f"First 5 records from {node_filename}:")
    for record in records[:5]:
        print(record)
    print("\n---\n")  # Just to separate output for clarity


First 5 records from node_1.txt:
(1, 'ï»¿')
(2, 'Sing a Song of Sixpence.')
(3, '')
(4, 'A brand new sixpence fresh from the Mint!  How it sparkled and glittered')
(5, 'in the dancing sunlight!  Such a treasure for a small girl to possess!')

---

First 5 records from node_2.txt:
(1, '"Oh dear, what can the matter be?')
(2, 'Dear, dear, what can the matter be?')
(3, 'Oh dear, what can the matter be?')
(4, 'Nellieâ€™s so long making tea!')
(5, 'She promised to give me some bread and some honey,')

---

First 5 records from node_3.txt:
(1, '')
(2, 'She was a strange child, and led a lonely life, shut up in the almost')
(3, 'deserted castle with no one but her miserly old grandfather and old')
(4, 'Nanny for company.  It was no wonder that she grew up with curious')
(5, 'unchildlike fancies, which were yet not altogether unchildlike.  Her')

---

First 5 records from node_4.txt:
(1, 'though it were lined with cotton wool.  Elsie felt cold and stiff, and')
(2, 'her limbs ached--she felt sh

## STEP 03:  MAPPER 

In [10]:
from collections import Counter

def mapper(records):
    """
    Processes a list of records from a node, ignoring the keys and considering only the values (sentences).
    For each word in a sentence, it calculates the frequency of that word in that sentence.
    
    :param records: List of (key, value) pairs where key is the line number and value is the sentence.
    :return: A list of (word, frequency) pairs for words in the sentences.
    """
    word_frequencies = []
    for _, sentence in records:
        # Tokenize the sentence into words, assuming words are separated by spaces.
        # Convert to lowercase to ensure case-insensitive counting.
        words = sentence.lower().split()
        # Count the frequency of each word in the sentence
        frequencies = Counter(words)
        # Extend the word_frequencies list with (word, frequency) pairs from this sentence
        word_frequencies.extend(frequencies.items())
    return word_frequencies

# Assuming `read_records_from_node` has already been defined and used to read records from each node
# Example usage for node 1
node_1_records = read_records_from_node('node_1.txt')
node_1_word_frequencies = mapper(node_1_records)
node_2_records = read_records_from_node('node_2.txt')
node_2_word_frequencies = mapper(node_2_records)
node_3_records = read_records_from_node('node_3.txt')
node_3_word_frequencies = mapper(node_3_records)
node_4_records = read_records_from_node('node_4.txt')
node_4_word_frequencies = mapper(node_4_records)

# For demonstration, print the first 10 (word, frequency) pairs from node 1
print("First 10 (word, frequency) pairs from node 1:")
for word, freq in node_1_word_frequencies[:10]:
    print(f"{word}: {freq}")
    
print("First 10 (word, frequency) pairs from node 2:")
for word, freq in node_2_word_frequencies[:10]:
    print(f"{word}: {freq}")
    
print("First 10 (word, frequency) pairs from node 3:")
for word, freq in node_3_word_frequencies[:10]:
    print(f"{word}: {freq}")
    
print("First 10 (word, frequency) pairs from node 4:")
for word, freq in node_4_word_frequencies[:10]:
    print(f"{word}: {freq}")



First 10 (word, frequency) pairs from node 1:
ï»¿: 1
sing: 1
a: 1
song: 1
of: 1
sixpence.: 1
a: 1
brand: 1
new: 1
sixpence: 1
First 10 (word, frequency) pairs from node 2:
"oh: 1
dear,: 1
what: 1
can: 1
the: 1
matter: 1
be?: 1
dear,: 2
what: 1
can: 1
First 10 (word, frequency) pairs from node 3:
she: 1
was: 1
a: 2
strange: 1
child,: 1
and: 1
led: 1
lonely: 1
life,: 1
shut: 1
First 10 (word, frequency) pairs from node 4:
though: 1
it: 1
were: 1
lined: 1
with: 1
cotton: 1
wool.: 1
elsie: 1
felt: 1
cold: 1


## STEP 04:  SHUFFLE SORTING

In [11]:
# Assuming mapper and read_records_from_node are already defined
# Aggregate outputs from all 4 mappers
all_mapper_outputs = []
for i in range(4):
    node_records = read_records_from_node(f'node_{i+1}.txt')
    mapper_output = mapper(node_records)
    all_mapper_outputs.extend(mapper_output)


In [12]:
from collections import defaultdict

def shuffle_and_sort(mapper_outputs):
    """
    Aggregates word counts from all mapper outputs, effectively performing the shuffle operation.
    This function groups counts for each unique word across all mapper outputs.
    
    :param mapper_outputs: Combined list of (word, frequency) pairs from all mappers.
    :return: Dictionary where the key is the word and the value is a list of counts.
    """
    shuffle_sort_output = defaultdict(list)
    for word, frequency in mapper_outputs:
        shuffle_sort_output[word].append(frequency)
    return shuffle_sort_output

# Perform shuffle and sort on aggregated mapper outputs
node_5_output = shuffle_and_sort(all_mapper_outputs)

# Display the output for demonstration purposes
for word, frequencies in list(node_5_output.items())[:10]:  # Limiting output for brevity
    print(f"{word}: {frequencies}")


ï»¿: [1]
sing: [1, 1, 1, 1]
a: [1, 1, 2, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 3, 1, 1, 1, 2, 1, 2, 1, 1, 1, 1, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 4, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 3, 1, 1, 1, 1]
song: [1, 1]
of: [1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
sixpence.: [1, 1, 1, 1]
brand: [1]
new: [1, 1, 1, 1, 2, 1]
sixpence: [1, 1, 1]
fresh: [1, 1, 1]


## STEP 05: REDUCER

In [13]:
def reducer(shuffle_sort_output):
    """
    Aggregates the counts for each word by summing up the frequencies.
    
    :param shuffle_sort_output: A dictionary where the key is the word and the value is a list of counts.
    :return: A dictionary with the word as the key and the total count as the value.
    """
    reduced_output = {}
    for word, frequencies in shuffle_sort_output.items():
        # Sum up all the frequencies for the word to get the total count
        total_count = sum(frequencies)
        reduced_output[word] = total_count
    return reduced_output

# Assuming node_5_output is the output from the shuffle and sort step
final_output = reducer(node_5_output)

# Display some of the reduced output for demonstration purposes
for word, total_count in list(final_output.items())[:10]:  # Limiting output for brevity
    print(f"{word}: {total_count}")


ï»¿: 1
sing: 4
a: 141
song: 2
of: 83
sixpence.: 4
brand: 1
new: 7
sixpence: 3
fresh: 3
