<a href="https://colab.research.google.com/github/baban9/MapReduce-/blob/master/MapReduce_with_multithreading.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [13]:
# Mount Google Drive under /content/drive so the shared-drive folder that
# holds pride.txt and the output CSVs is reachable from this runtime.
# Requires a Colab runtime (the google.colab package).
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [14]:
import os
# Work from the assignment folder so relative paths such as 'pride.txt'
# resolve there. The trailing spaces in 'IDS 561 - Big Data ' and
# 'Assignment 1 ' appear to be part of the actual Drive folder names (the
# same spelling is used for the CSV output paths) - do not strip them.
os.chdir('/content/drive/Shared drives/IDS 561 - Big Data /Assignment 1 ')
# IPython shell escape (notebook-only, not plain Python): list the folder.
!ls

'Assignment 1.ipynb'	      'multithreading of ids 561 Assignment 1.ipynb'
'Frequency count.csv'	       pride.txt
'ids 561 Assignment 1.ipynb'  'Word count.csv'
 IDS561_lab2.ipynb


In [15]:
import string
import re
from operator import itemgetter
from collections import defaultdict
import pandas as pd
import concurrent.futures
import threading

import time



In [16]:
# Timer start for this cell.
# NOTE(review): the main cell further down reassigns start_time inside its
# `if __name__ == "__main__":` guard before running the pipeline, so this
# value is only used if that guard is false.
start_time = time.time()

def data_cleaning(file_name):
  """
  Read a text file and normalise it for word counting.

  Processing steps:
  - replace every character that is not an ASCII letter or a period
    (digits, punctuation, special symbols, newlines, ...) with a space;
    periods are kept because data_split uses them as line separators
  - collapse every run of spaces into a single space
  - convert uppercase letters to lower case

  -- input - file name in .txt format
  -- output - the cleaned text of the file as a single string
  """
  # `with` guarantees the file handle is closed once the text is read.
  with open(file_name, 'rt') as handle:
    text = handle.read().strip()

  # Raw strings: '\.' in a plain literal is an invalid escape sequence.
  # Inside a character class '.' is already literal, so no escape is needed.
  text_processing = re.sub(r'[^a-zA-Z.]', ' ', text)
  # Collapse runs of spaces (a single "  " -> " " pass only halves them).
  text_only = re.sub(r' {2,}', ' ', text_processing)

  # lower casing characters
  return text_only.lower()

def data_split(processed_text, split_at=5000):
  """
  Split the processed text into two parts on period boundaries.

  Each period-delimited chunk of the text is treated as one "line".
  - Part1 includes the first `split_at` lines (5000 by default)
  - Part2 includes every remaining line

  -- input - processed text after data cleaning; optional split_at
  -- output - (text_a, text_b), each part re-joined with single spaces
  """
  lines = processed_text.split(".")

  # Part2 must start exactly where Part1 stops; starting one index later
  # would silently drop a line from the word count.
  text_a = ' '.join(lines[:split_at])
  text_b = ' '.join(lines[split_at:])

  return text_a, text_b

# Mapper function
def Mapper(text):
    """
    This is the Mapper function that produces a list of (word, 1)
    key-value pairs for one split part (Part1 or Part2).

    -- input - one part of the text produced by the data_split function
    -- output - mapped data: a list of (word, 1) tuples in text order
    """
    # str.split() with no argument splits on any run of whitespace and never
    # yields empty strings, so no explicit empty-token filter is needed.
    return [(word, 1) for word in text.split()]

 # Sorting
  
def Sorting(mapped_1, mapped_2):
  """
  Concatenate the outputs of the two mapper functions and order them by key.

  Python's sort is stable, so pairs sharing a key keep their relative
  order (entries from mapped_1 before entries from mapped_2).

  -- input - outputs from the two mapper functions
  -- output - a new list of key-value pairs sorted by key
  """
  by_key = itemgetter(0)
  pairs = mapped_1 + mapped_2
  return sorted(pairs, key=by_key)

# partitioning
def partition(words_sorted, boundary="n"):
  """
  Partition the sorted tokens (i.e., words) into those starting with
  letters "a" to "m" and those starting with letters "n" to "z".

  The input must already be sorted by key (see Sorting). The split point
  is the first pair whose key compares >= `boundary`: every pair before it
  goes to the first partition; that pair and everything after it go to the
  second. If no key reaches the boundary, the second partition is empty.

  -- input - Sorted Key-Value pairs; optional boundary (default "n")
  -- output - (partition_a_m, partition_n_z) lists of Key-Value pairs
  """
  # After data_cleaning, keys contain only lower-case letters, so for sorted
  # input "key >= 'n'" is equivalent to "key starts with a letter n..z".
  # Falling back to len() avoids an IndexError when no such key exists.
  index = next(
      (i for i, pair in enumerate(words_sorted) if pair[0] >= boundary),
      len(words_sorted),
  )
  # Slice both sides at the same index so the first "n" word is not lost.
  partition_a_m = words_sorted[:index]
  partition_n_z = words_sorted[index:]

  return partition_a_m, partition_n_z

# Reducer function
def Reducer(sorted_list):
    """
    Collect the values for each key in one ordered partition and aggregate
    them into a smaller set of word -> frequency pairs.

    -- input - an ordered partition of (word, count) pairs from the
       partition function
    -- output - defaultdict(int) mapping each word to the sum of its counts,
       with keys in first-seen (i.e. sorted) order
    """
    grouped = defaultdict(int)

    # Sum the emitted values rather than counting pairs, so the result stays
    # correct if a mapper or combiner ever emits counts other than 1.
    for word, count in sorted_list:
        grouped[word] += count

    return grouped

# Combining output from both the reducers
  
def output(word_dict_a_m, word_dict_n_z):
  """
  Merge the word counts produced by the two reducers into one table.

  Rows follow dict insertion order: the keys of word_dict_a_m first, then
  any new keys of word_dict_n_z. A key present in both takes its value
  from word_dict_n_z (the partitions are expected to be disjoint).

  -- input - the two word -> frequency mappings returned by the reducers
  -- output - DataFrame with columns 'word' and 'frequency'
  """
  merged = dict(word_dict_a_m)
  merged.update(word_dict_n_z)
  rows = list(merged.items())
  return pd.DataFrame(rows, columns=['word', 'frequency'])



In [17]:
# Running Multithreading
# The multithreaded pipeline starts here. start_time is reset so the elapsed
# time shown at the end of this cell covers only this run.
# NOTE(review): Mapper and Reducer are pure-Python loops (CPU-bound), so under
# CPython's GIL the two worker threads do not actually run in parallel.
if __name__ == "__main__":
  start_time = time.time()

  processed_text = data_cleaning('pride.txt')
  text_5000, text_rest = data_split(processed_text)

  # Mapper threads: submit Mapper once per split part to a
  # ThreadPoolExecutor. Leaving the `with` block waits for both tasks.
  with concurrent.futures.ThreadPoolExecutor() as executor:
    mapped_a = executor.submit(Mapper, text_5000)
    mapped_b = executor.submit(Mapper, text_rest)

  mapped_5000 = mapped_a.result()
  mapped_rest = mapped_b.result()

  words_sorted = Sorting(mapped_5000, mapped_rest)

  partition_a_m, partition_n_z = partition(words_sorted)

  # Reducer threads: submit Reducer once per ordered partition (a-m, n-z);
  # each task returns a word -> frequency mapping.
  with concurrent.futures.ThreadPoolExecutor() as executor:
    word_dict_thread_a = executor.submit(Reducer, partition_a_m)
    word_dict_thread_n = executor.submit(Reducer, partition_n_z)

  word_dict_a_m = word_dict_thread_a.result()
  word_dict_n_z = word_dict_thread_n.result()

  data = output(word_dict_a_m, word_dict_n_z)

  # Two views of the same result: one in word order (as produced by the
  # reducers), one sorted by descending frequency.
  df_sorted_word = data.copy()
  df_sorted_freq = data.sort_values(['frequency'], ascending=False)

  # Write both views to CSV files in the shared drive folder.
  df_sorted_word.to_csv("/content/drive/Shared drives/IDS 561 - Big Data /Assignment 1 /Word count.csv")
  df_sorted_freq.to_csv("/content/drive/Shared drives/IDS 561 - Big Data /Assignment 1 /Frequency count.csv")


# Execution time (left as the cell's last expression so the notebook shows it)
time.time() - start_time

0.24154376983642578