In [None]:
import threading
import queue
import csv
import os

def map_reduce(data, output_file='output.csv'):
  """Count word occurrences across documents and write them to a CSV file.

  Each document is tokenised by its own mapper thread (lower-cased, split on
  whitespace). The per-document counts are then merged and written to
  ``output_file`` with a ``Word,Count`` header row followed by one row per
  word, sorted by word.

  Args:
    data: iterable of document strings.
    output_file: path of the CSV to write; defaults to ``'output.csv'``.
  """

  def map_function(document, output_queue):
    """Put a {word: count} dict for one document onto ``output_queue``."""
    word_counts = {}
    for word in document.lower().split():
      word_counts[word] = word_counts.get(word, 0) + 1
    output_queue.put(word_counts)

  def shuffle_sort_reduce(input_queue, output_file):
    """Merge every per-document dict in ``input_queue`` and write the sorted totals."""
    word_counts = {}
    # Polling with empty() is safe here: every mapper thread has been joined
    # before this runs, so nothing can still be adding to the queue.
    while not input_queue.empty():
      for word, count in input_queue.get().items():
        word_counts[word] = word_counts.get(word, 0) + count
    with open(output_file, 'w', newline='') as csvfile:
      writer = csv.writer(csvfile)
      writer.writerow(['Word', 'Count'])
      for word, count in sorted(word_counts.items()):
        writer.writerow([word, count])

  # Map: one thread per document, all feeding the same thread-safe queue.
  map_queue = queue.Queue()
  map_threads = []
  for doc in data:
    thread = threading.Thread(target=map_function, args=(doc, map_queue))
    map_threads.append(thread)
    thread.start()

  # Wait for all mapper threads to finish
  for thread in map_threads:
    thread.join()

  # Shuffle, sort and reduce in the calling thread. Starting a thread and
  # joining it immediately adds no parallelism, and an exception raised inside
  # a thread (e.g. an unwritable output path) is only reported by the thread's
  # excepthook; running it here lets such errors reach the caller instead of
  # the success message being printed regardless.
  shuffle_sort_reduce(map_queue, output_file)

  print(f"Word counts written to {output_file}")

# Load the text of every .txt file in folder_path. Point folder_path at your
# folder (for a Google Drive folder, use its path after mounting Drive).
data = []
folder_path = '/content/data' # Replace with the actual path to your folder
# sorted() keeps the document order reproducible; os.listdir order is arbitrary.
for filename in sorted(os.listdir(folder_path)):
  if filename.endswith(".txt"):
    # Explicit encoding so decoding does not depend on the platform's locale.
    with open(os.path.join(folder_path, filename), 'r', encoding='utf-8') as f:
      data.append(f.read())

# Run map reduce
map_reduce(data)


Word counts written to output.csv


In [None]:
# prompt: do it for multithreading

import threading
import queue
import csv
import os

def map_reduce(data, output_file='output2.csv', num_threads=4):
  """Count word occurrences using threaded map and threaded reduce stages.

  Each document is tokenised by its own mapper thread (lower-cased, split on
  whitespace). The mapper outputs are shuffled into ``num_threads`` partitions
  by word, each non-empty partition is summed by its own reducer thread, and
  the merged totals are written to ``output_file`` with a ``Word,Count``
  header row followed by one row per word, sorted by word.

  Args:
    data: iterable of document strings.
    output_file: path of the CSV to write; defaults to ``'output2.csv'``.
    num_threads: number of reduce partitions / reducer threads; defaults to 4.

  Raises:
    ValueError: if ``num_threads`` is less than 1.
  """
  if num_threads < 1:
    raise ValueError("num_threads must be at least 1")

  def map_function(document, output_queue):
    """Put a {word: count} dict for one document onto ``output_queue``."""
    word_counts = {}
    for word in document.lower().split():
      word_counts[word] = word_counts.get(word, 0) + 1
    output_queue.put(word_counts)

  def reduce_function(pairs, output_queue):
    """Sum the (word, count) pairs of one partition and put the totals on ``output_queue``."""
    reduced_counts = {}
    for word, count in pairs:
      reduced_counts[word] = reduced_counts.get(word, 0) + count
    output_queue.put(reduced_counts)

  def shuffle_sort_reduce(input_queue, output_file):
    """Partition mapper output by word, reduce each partition on a thread, write the CSV."""
    # Shuffle: every occurrence of a word is routed to the same partition, so a
    # reducer sees all counts for its words and the partitions' keys are
    # disjoint. Unlike slicing an already-merged dict, this gives the reducers
    # real work, and it never computes a zero-sized chunk for small vocabularies.
    # Polling with empty() is safe: all mappers were joined before this runs.
    partitions = [[] for _ in range(num_threads)]
    while not input_queue.empty():
      for word, count in input_queue.get().items():
        partitions[hash(word) % num_threads].append((word, count))

    # Reduce: one thread per non-empty partition.
    reduce_queue = queue.Queue()
    reduce_threads = []
    for pairs in partitions:
      if pairs:
        thread = threading.Thread(target=reduce_function, args=(pairs, reduce_queue))
        reduce_threads.append(thread)
        thread.start()

    # Wait for reducer threads to finish
    for thread in reduce_threads:
      thread.join()

    # Merge: partitions have disjoint keys, so a plain update cannot drop counts.
    final_word_counts = {}
    while not reduce_queue.empty():
      final_word_counts.update(reduce_queue.get())

    with open(output_file, 'w', newline='') as csvfile:
      writer = csv.writer(csvfile)
      writer.writerow(['Word', 'Count'])
      for word, count in sorted(final_word_counts.items()):
        writer.writerow([word, count])

  # Map: one thread per document, all feeding the same thread-safe queue.
  map_queue = queue.Queue()
  map_threads = []
  for doc in data:
    thread = threading.Thread(target=map_function, args=(doc, map_queue))
    map_threads.append(thread)
    thread.start()

  # Wait for all mapper threads to finish
  for thread in map_threads:
    thread.join()

  # Shuffle, sort and reduce in the calling thread so that any exception
  # (e.g. an unwritable output path) reaches the caller. Inside a separate
  # thread it would only be reported by the thread's excepthook while the
  # success message below was still printed.
  shuffle_sort_reduce(map_queue, output_file)

  # Report the file actually written (previously hard-coded as output.csv).
  print(f"Word counts written to {output_file}")

# Load the text of every .txt file in folder_path. Point folder_path at your
# folder (for a Google Drive folder, use its path after mounting Drive).
data = []
folder_path = '/content/data' # Replace with the actual path to your folder
# sorted() keeps the document order reproducible; os.listdir order is arbitrary.
for filename in sorted(os.listdir(folder_path)):
  if filename.endswith(".txt"):
    # Explicit encoding so decoding does not depend on the platform's locale.
    with open(os.path.join(folder_path, filename), 'r', encoding='utf-8') as f:
      data.append(f.read())

# Run map reduce
map_reduce(data)


Word counts written to output.csv
