Team members

สิทธิเจตน์ วงศ์ทิชาวัฒน์ 6210503853

นทวัจน์ เมี้ยนละม้าย 6210503624

Adapted from

https://colab.research.google.com/github/pnavaro/big-data/blob/master/notebooks/05-MapReduce.ipynb

https://colab.research.google.com/github/RPI-DATA/course-intro-ml-app/blob/master/content/notebooks/18-big-data/01-intro-mapreduce.ipynb


# Functional programming review


In [None]:
def double_everything_in(data):
    result = []
    for i in data:
        result.append(2 * i)
    return result

The above code violates the "do not repeat yourself" principle of good software engineering practice.

https://en.wikipedia.org/wiki/Don%27t_repeat_yourself


In [None]:
double_everything_in([1, 2, 3, 4, 5])

[2, 4, 6, 8, 10]

Consider the following code:

In [None]:
def double(x):
    return x*2
def double_everything_in(data):
    result = []
    for i in data:
        result.append(double(i))
    return result

In [None]:
double_everything_in([1, 2, 3, 4, 5])

[2, 4, 6, 8, 10]

This version still violates the "do not repeat yourself" principle: the looping code has to be rewritten for every new operation we want to apply.


# Passing a function as a value
Consider:



In [None]:
def apply_f_to_everything_in(f, data):
    result = []
    for x in data:
        result.append(f(x))
    return result

In [None]:
apply_f_to_everything_in(double, [1, 2, 3, 4, 5])

[2, 4, 6, 8, 10]

# Lambda expressions
We can use anonymous (`lambda`) functions to avoid defining a named function every time we want to map a function over some data.


In [None]:
apply_f_to_everything_in(lambda x: x*x, [1, 2, 3, 4, 5])

[1, 4, 9, 16, 25]

In [None]:
# consider this code.
def concat_everything_in(data1, data2):
    result = []
    for i,j in zip(data1,data2):
        result.append( i+ j)
    return result

 

concat_everything_in (['a','c'],['b','d'])

['ab', 'cd']

In [None]:
#solution
def concat(d1, d2):
    return d1 + d2
def apply_f_to_everything_2_arg(f, data1, data2):
    result = []
    for x,y in zip(data1,data2):
        result.append(f(x,y))
    return result
apply_f_to_everything_2_arg(concat,['a','c'],['b','d'])

['ab', 'cd']

In [None]:
# what is wrong with this call: apply_f_to_everything_2_arg(concat_everything_in, .....)

def concat_everything_in(data1, data2):
    result = []
    for i,j in zip(data1,data2):
        result.append( i+ j)
    return result

 
apply_f_to_everything_2_arg(concat_everything_in,['a','c'],['b','d'])

[['ab'], ['cd']]
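The call does not produce `['ab', 'cd']` because `apply_f_to_everything_2_arg` passes single elements (here single-character strings) to `f`, whereas `concat_everything_in` expects two *sequences*. A one-character string is itself a sequence, so zipping two of them yields one pair, and each call returns a one-element list. A minimal sketch of what happens inside each call:

```python
def concat_everything_in(data1, data2):
    result = []
    for i, j in zip(data1, data2):
        result.append(i + j)
    return result

# zip treats each one-character string as a sequence and yields a single pair
print(list(zip('a', 'b')))             # [('a', 'b')]
# so the "element-wise" function wraps its single concatenation in a list
print(concat_everything_in('a', 'b'))  # ['ab']
```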

# Python's `map` function

- Python has a built-in function `map`, which is typically faster than our version. It returns a lazy iterator (a `map` object, as shown below) rather than a list.

In [None]:
map(lambda x: x*x, [1, 2, 3, 4, 5])

<map at 0x7f5aca3575d0>
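To see the values, the `map` object has to be consumed, for example with `list(...)`. A short sketch showing that a `map` object is also single-use: once it has been iterated over, it is empty.

```python
squares = map(lambda x: x * x, [1, 2, 3, 4, 5])
print(list(squares))  # [1, 4, 9, 16, 25]
# A map object is a lazy, one-shot iterator: a second pass yields nothing
print(list(squares))  # []
```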

## Implementing reduce

- The `reduce` function is an example of a [fold](https://en.wikipedia.org/wiki/Fold_%28higher-order_function%29).

- There are different ways we can fold data.

- The following implements a *left* fold.

In [None]:
def foldl(f, data, z):
    if (len(data) == 0):
        print (z)
        return z
    else:
        head = data[0]
        tail = data[1:]
        print ("Folding", head, "with", tail, "using", z)
        partial_result = f(z, data[0])
        print ("Partial result is", partial_result)
        return foldl(f, tail, partial_result)  

In [None]:
def add(x, y):
    return x + y

foldl(add, [3, 3, 3, 3, 3], 0)

Folding 3 with [3, 3, 3, 3] using 0
Partial result is 3
Folding 3 with [3, 3, 3] using 3
Partial result is 6
Folding 3 with [3, 3] using 6
Partial result is 9
Folding 3 with [3] using 9
Partial result is 12
Folding 3 with [] using 12
Partial result is 15
15


15

In [None]:
def mul(x, y):
    return x * y


In [None]:
# what is the result?
foldl(mul, [3, 3, 3, 3, 3], 0)

Folding 3 with [3, 3, 3, 3] using 0
Partial result is 0
Folding 3 with [3, 3, 3] using 0
Partial result is 0
Folding 3 with [3, 3] using 0
Partial result is 0
Folding 3 with [3] using 0
Partial result is 0
Folding 3 with [] using 0
Partial result is 0
0


0

In [None]:
# how to modify it to get 3^5
foldl(mul, [3, 3, 3, 3, 3], 1)

Folding 3 with [3, 3, 3, 3] using 1
Partial result is 3
Folding 3 with [3, 3, 3] using 3
Partial result is 9
Folding 3 with [3, 3] using 9
Partial result is 27
Folding 3 with [3] using 27
Partial result is 81
Folding 3 with [] using 81
Partial result is 243
243


243

In [None]:
foldl(lambda x, y: x + y, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is 1
Folding 2 with [3, 4, 5] using 1
Partial result is 3
Folding 3 with [4, 5] using 3
Partial result is 6
Folding 4 with [5] using 6
Partial result is 10
Folding 5 with [] using 10
Partial result is 15
15


15

In [None]:
foldl(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

Folding 1 with [2, 3, 4, 5] using 0
Partial result is -1
Folding 2 with [3, 4, 5] using -1
Partial result is -3
Folding 3 with [4, 5] using -3
Partial result is -6
Folding 4 with [5] using -6
Partial result is -10
Folding 5 with [] using -10
Partial result is -15
-15


-15

In [None]:
# Question: what is the equivalent expression

(((((0 - 1) - 2) - 3) - 4) - 5)

-15

In [None]:
#Question: or is it the same as above?
(5- (4 - (3- (2- (1 -  0)))))

3

In [None]:
def foldr(f, data, z):
    if (len(data) == 0):
        return z
    else:
        return f(data[0], foldr(f, data[1:], z))    

In [None]:
foldr(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

3

In [None]:
foldr(lambda x, y: x + y, [1, 2, 3, 4, 5], 0)

15

Why do `foldl` and `foldr` give different results for subtraction?

- Subtraction is neither [commutative](https://en.wikipedia.org/wiki/Commutative_property) nor [associative](https://en.wikipedia.org/wiki/Associative_property), so the order in which we apply the fold matters.
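The two folds bracket the operations differently. The sketch below uses hypothetical iterative versions, `foldl_iter` and `foldr_iter` (no printing, same results as the recursive `foldl` and `foldr` above), and checks them against the explicit expansions. Note that the `(5- (4 - (3- (2- (1 -  0)))))` expression in the question above also happens to evaluate to 3, but the expansion `foldr` actually computes is `1 - (2 - (3 - (4 - (5 - 0))))`.

```python
def foldl_iter(f, data, z):
    # ((((z f d0) f d1) f d2) ...): combine from the left
    for x in data:
        z = f(z, x)
    return z

def foldr_iter(f, data, z):
    # d0 f (d1 f (d2 f (... f z))): combine from the right
    for x in reversed(data):
        z = f(x, z)
    return z

sub = lambda x, y: x - y
print(foldl_iter(sub, [1, 2, 3, 4, 5], 0), ((((0 - 1) - 2) - 3) - 4) - 5)  # -15 -15
print(foldr_iter(sub, [1, 2, 3, 4, 5], 0), 1 - (2 - (3 - (4 - (5 - 0)))))  # 3 3
```

With `+` both folds return 15, because addition is associative and commutative.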

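## Python's `reduce` function

Python's `reduce` is a left fold. Since Python 3 it is no longer a built-in and must be imported from the `functools` module. When no initial value is given, it starts from the first element of the sequence.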

In [None]:
from functools import reduce
reduce(lambda x, y: x + y, [1, 2, 3, 4, 5])

15

In [None]:
reduce(lambda x, y: x - y, [1, 2, 3, 4, 5], 0)

-15
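The initial value matters for subtraction: without it, `reduce` starts from the first element. A right fold can also be expressed with `reduce` by walking the list backwards and swapping the argument order (a sketch, not a standard-library function):

```python
from functools import reduce

sub = lambda x, y: x - y
# No initial value: starts from the first element, 1 - 2 - 3 - 4 - 5
print(reduce(sub, [1, 2, 3, 4, 5]))     # -13
# Initial value 0: same as foldl, 0 - 1 - 2 - 3 - 4 - 5
print(reduce(sub, [1, 2, 3, 4, 5], 0))  # -15
# Right fold: 1 - (2 - (3 - (4 - (5 - 0))))
print(reduce(lambda acc, x: sub(x, acc), reversed([1, 2, 3, 4, 5]), 0))  # 3
```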

# Functional programming and parallelism
Functional programming lends itself to parallel programming.

The map function can easily be parallelised through data-level parallelism, provided that the function we supply as an argument is free from side effects (which is why we avoid working with mutable data).
We can see this by rewriting `map` so that each element is computed by a separate, independent call:

In [None]:
def perform_computation(f, result, data, i):
    print ("Computing the ", i, "th result...")
    # This could be scheduled on a different CPU
    result[i] = f(data[i])

def my_map(f, data):
    result = [None] * len(data)
    for i in range(len(data)):
        perform_computation(f, result, data, i)
    # Wait for other CPUs to finish, and then..
    return result

In [None]:
my_map(lambda x: x * x, [1, 2, 3, 4, 5])

Computing the  0 th result...
Computing the  1 th result...
Computing the  2 th result...
Computing the  3 th result...
Computing the  4 th result...


[1, 4, 9, 16, 25]

# Multithreaded map

In [None]:
from threading import Thread

def schedule_computation_threaded(f, result, data, threads, i):    
    # Each function evaluation runs in its own thread; under CPython's GIL only I/O-bound work (such as sleep) truly overlaps.
    def my_job(): 
        print ("Processing data:", data[i], "... ")
        result[i] = f(data[i])
        print ("Finished job #", i)    
        print ("Result was", result[i])       
    threads[i] = Thread(target=my_job)
    
def my_map_multithreaded(f, data):
    n = len(data)
    result = [None] * n
    threads = [None] * n
    print ("Scheduling jobs.. ")
    for i in range(n):
        schedule_computation_threaded(f, result, data, threads, i)
    print ("Starting jobs.. ")
    for i in range(n):
        threads[i].start()
    print ("Waiting for jobs to finish.. ")
    for i in range(n):
        threads[i].join()
    print ("All done.")
    return result

In [None]:
my_map_multithreaded(lambda x: x*x, [1, 2, 3, 4, 5])

Scheduling jobs.. 
Starting jobs.. 
Processing data: 1 ... 
Finished job # 0
Result was 1
Processing data: 2 ... 
Finished job # 1
Result was 4
Processing data:Processing data:  3 ... 
Finished job # 2
Result was 9
4 ... 
Finished job # 3
Result was 16
Processing data:Waiting for jobs to finish.. 
 5 ... 
Finished job # 4
Result was 25
All done.


[1, 4, 9, 16, 25]

In [None]:
from numpy.random import uniform
from time import sleep

def a_function_which_takes_a_long_time(x):
    sleep(uniform(2, 10))  # Simulate some long computation
    return x*x

my_map_multithreaded(a_function_which_takes_a_long_time, [1, 2, 3, 4, 5])

Scheduling jobs.. 
Starting jobs.. 
Processing data: 1 ... 
Processing data: 2 ... 
Processing data: Processing data: 3 4 ... 
... 
Processing data:Waiting for jobs to finish.. 
 5 ... 
Finished job # 1
Result was 4
Finished job # 4
Result was 25
Finished job # 3
Result was 16
Finished job # 2
Result was 9
Finished job # 0
Result was 1
All done.


[1, 4, 9, 16, 25]
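The standard library's `concurrent.futures.ThreadPoolExecutor` packages the same schedule/start/join pattern, and `pool.map` returns results in input order. A minimal sketch, where `slow_square` is a hypothetical stand-in that sleeps for 0.5 s; because `time.sleep` releases the GIL, the five sleeps overlap and the whole call should take roughly 0.5 s rather than 2.5 s. CPU-bound Python code would not speed up this way under CPython.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    # Hypothetical slow, I/O-bound task; time.sleep releases the GIL
    time.sleep(0.5)
    return x * x

start = time.time()
# One worker per input so that all five sleeps can overlap
with ThreadPoolExecutor(max_workers=5) as pool:
    # pool.map schedules every call and yields the results in input order
    results = list(pool.map(slow_square, [1, 2, 3, 4, 5]))
elapsed = time.time() - start

print(results)  # [1, 4, 9, 16, 25]
print(f"{elapsed:.2f} s")
```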

## Map Reduce

- Map Reduce is a _programming model_ for scalable parallel processing.
- Scalable here means that it can work on big data with very large compute clusters.
- There are many implementations: e.g. Apache Hadoop and Apache Spark.
- We can use Map-Reduce with any programming language:
    - Hadoop is written in Java
    - Spark is written in Scala, but has a Python interface.
- Languages that support a *functional programming* style, such as Python or Scala, fit very well with the Map Reduce model:
    - However, we don't *have* to use functional programming.

## Typical steps in a Map Reduce Computation

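1. Extract, transform and load (ETL) a big data set.
2. _Map_ operation: extract something you care about from each row.
3. "Shuffle and Sort": group the intermediate key-value pairs by key and allocate each key's values to a reduce task/node.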
4. _Reduce_ operation: aggregate, summarise, filter or transform
5. Write the results.

## Callbacks for Map Reduce

- The data set, and the state of each stage of the computation, is represented as a set of key-value pairs.

- The programmer provides a map function:

$\operatorname{map}(k, v) \rightarrow \; \left< k', v' \right>*$  

- and a reduce function:

$\operatorname{reduce}(k', \left< k', v' \right>*) \rightarrow \; \left< k', v'' \right>*$

- The $*$ refers to a *collection* of values.

- These collections are *not* ordered.
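The two callbacks can be sketched as ordinary Python functions. `run_map_reduce` below is a hypothetical single-process driver (not the Hadoop or Spark API) that performs the map, shuffle-and-sort and reduce steps in order. As an assumption about the exact shape, the reducer receives the key $k'$ together with the list of its values $v'$. The example counts words by length, purely for illustration; the sort only brings equal keys together.

```python
from itertools import groupby
from operator import itemgetter

def run_map_reduce(records, mapper, reducer):
    """Hypothetical single-process driver: map, shuffle and sort, then reduce."""
    # Map: every (k, v) record yields zero or more (k', v') pairs
    intermediate = [pair for k, v in records for pair in mapper(k, v)]
    # Shuffle and sort: order the pairs by key so that equal keys become adjacent
    intermediate.sort(key=itemgetter(0))
    output = []
    for key, group in groupby(intermediate, key=itemgetter(0)):
        # Reduce: one call per key k', receiving the collection of its values v'
        values = [v for _, v in group]
        output.extend(reducer(key, values))
    return output

def length_mapper(doc_id, contents):
    # Emit (word length, 1) for every word in the document
    return [(len(word), 1) for word in contents.split()]

def sum_reducer(key, values):
    return [(key, sum(values))]

counts = run_map_reduce([("doc1", "to be or not to be")], length_mapper, sum_reducer)
print(counts)  # [(2, 5), (3, 1)]
```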


## Word Count Example

- In this simple example, the input is a set of URLs, and each record is a document.

- Problem: compute how many times each word occurs across the data set.


## Word Count: Map 


- The input to $\operatorname{map}$ is a mapping:
    - Key: URL
    - Value: Contents of document


$\left< document1, to \; be \; or \; not \; to \; be \right>$  
    

- In this example, our $\operatorname{map}$ function processes a given URL and produces a mapping:
    - Key: word
    - Value: 1

- So our original data set will be transformed to:
  
  $\left< to, 1 \right>$  
  $\left< be, 1 \right>$  
  $\left< or, 1 \right>$  
  $\left< not, 1 \right>$  
  $\left< to, 1 \right>$  
  $\left< be, 1 \right>$


## Word Count: Reduce


- The reduce operation groups values according to their key, and then performs a reduce on each key.

- The collections are partitioned across different storage units, so Map-Reduce folds the data in a way that minimises data copying across the cluster.

- Data in different partitions are reduced separately, in parallel.

- The final result is a reduce of the reduced data in each partition.

- Therefore, it is very important that our operator *is both commutative and associative*.

- In our case the function is the `+` operator, which gives:

  $\left< be, 2 \right>$  
  $\left< not, 1 \right>$  
  $\left< or, 1 \right>$  
  $\left< to, 2 \right>$  
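The word count above can be simulated in plain Python. In this sketch (an illustration, not how a cluster actually schedules work) the mapped pairs are split into two arbitrary, hypothetical partitions; each partition is reduced separately with `+`, and the partial results are then combined. Because `+` is commutative and associative, the answer does not depend on how the pairs were partitioned.

```python
from collections import defaultdict

def map_words(url, contents):
    # Map: emit (word, 1) for every word in the document
    return [(word, 1) for word in contents.split()]

def reduce_partition(pairs):
    # Reduce within one partition: sum the values of each key with +
    totals = defaultdict(int)
    for word, count in pairs:
        totals[word] += count
    return totals

pairs = map_words("document1", "to be or not to be")
# Two arbitrary partitions, as if stored on different nodes
partitions = [pairs[:3], pairs[3:]]
partials = [reduce_partition(p) for p in partitions]

# Final reduce: combine the per-partition totals, again with +
final = defaultdict(int)
for partial in partials:
    for word, count in partial.items():
        final[word] += count

print(sorted(final.items()))  # [('be', 2), ('not', 1), ('or', 1), ('to', 2)]
```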


## Map and Reduce compared with Python

- Notice that these functions are formulated differently from the standard Python functions of the same name.

- The `reduce` function works with key-value *pairs*.

- It would be more apt to call it something like `reduceByKey`.


## MiniMapReduce

- To illustrate how the Map-Reduce programming model works, we can implement our own Map-Reduce framework in Python.

- This *illustrates* how a problem can be written in terms of `map` and `reduce` operations.

- Note that these are illustrative functions; this is *not* how Hadoop or Apache Spark actually implement them.


In [None]:
##########################################################
#
#   MiniMapReduce
#
# A non-parallel, non-scalable Map-Reduce implementation
##########################################################

def groupByKey(data):
    result = dict()
    for key, value in data:
        if key in result:
            result[key].append(value)
        else:
            result[key] = [value]
    return result
        
def reduceByKey(f, data):
    key_values = groupByKey(data)
    return map(lambda key: 
                   (key, reduce(f, key_values[key])), 
                       key_values)

In [None]:
data = map(lambda x: (x, 1), "to be or not to be".split())
data

<map at 0x7f5aca3fa790>

In [None]:
groupByKey(data)

{'be': [1, 1], 'not': [1], 'or': [1], 'to': [1, 1]}

In [None]:
reduceByKey(lambda x, y: x + y, data)

<map at 0x7f5aca363c50>

## Parallelising MiniMapReduce

- We can easily turn our Map-Reduce implementation into a parallel, multi-threaded framework
by using the `my_map_multithreaded` function we defined earlier.

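- In principle, this lets map-reduce computations use *multiple* cores on a *single* computer. In CPython, however, the global interpreter lock (GIL) allows only one thread to execute Python bytecode at a time, so CPU-bound functions gain little from threads.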

In [None]:
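def reduceByKey_multithreaded(f, data):
    key_values = groupByKey(data)
    # my_map_multithreaded indexes its input (data[i]), so pass a list: dict_keys is not subscriptable
    return my_map_multithreaded(
        lambda key: (key, reduce(f, key_values[key])), list(key_values.keys()))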

In [None]:
reduceByKey_multithreaded(lambda x, y: x + y, data)

Scheduling jobs.. 
Starting jobs.. 
Waiting for jobs to finish.. 
All done.


[]
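The empty result above comes from `data` being a `map` object, i.e. a single-use iterator. The `groupByKey(data)` cell consumes it, so every later call receives no pairs (the `<map ...>` object returned by `reduceByKey` is empty as well). Materialising the pairs with `list(...)` lets them be reused. A self-contained sketch, with `groupByKey` and `reduceByKey` written equivalently to the MiniMapReduce versions:

```python
from functools import reduce

def groupByKey(data):
    result = dict()
    for key, value in data:
        result.setdefault(key, []).append(value)
    return result

def reduceByKey(f, data):
    key_values = groupByKey(data)
    return map(lambda key: (key, reduce(f, key_values[key])), key_values)

pairs = map(lambda x: (x, 1), "to be or not to be".split())
groupByKey(pairs)   # consumes the iterator
print(list(pairs))  # []

pairs = list(map(lambda x: (x, 1), "to be or not to be".split()))
groupByKey(pairs)   # a list can be iterated again
print(sorted(reduceByKey(lambda x, y: x + y, pairs)))
# [('be', 2), ('not', 1), ('or', 1), ('to', 2)]
```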

## Parallelising the reduce step

- Provided that our operator is both associative and commutative, we can also parallelise the reduce operation.

- We partition the data into approximately equal subsets.

- We then reduce each subset independently on a separate core.

- The results can be combined in a final reduce step.
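The steps above can be checked on a small list. In this sequential sketch, the hypothetical `partitioned_reduce` reduces each partition independently and then reduces the partial results. For an associative operator such as `+` this matches an ordinary `reduce`; for subtraction it does not.

```python
from functools import reduce

def partitioned_reduce(f, partitions):
    # Reduce each partition independently (these calls could run on separate cores),
    # then reduce the partial results in a final step
    return reduce(f, [reduce(f, part) for part in partitions])

numbers = [1, 2, 3, 4, 5, 6, 7]
partitions = [numbers[:3], numbers[3:]]

add = lambda x, y: x + y
sub = lambda x, y: x - y

# Addition is associative: partitioning does not change the answer
print(reduce(add, numbers), partitioned_reduce(add, partitions))  # 28 28
# Subtraction is not: (1 - 2 - 3) - (4 - 5 - 6 - 7) = -4 - (-14) = 10
print(reduce(sub, numbers), partitioned_reduce(sub, partitions))  # -26 10
```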

## Partition data

In [None]:
def split_data(data, split_points):
    partitions = []
    n = 0
    for i in split_points:
        partitions.append(data[n:i])
        n = i
    partitions.append(data[n:])
    return partitions

data = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
partitioned_data = split_data(data, [3])
partitioned_data

[['a', 'b', 'c'], ['d', 'e', 'f', 'g']]

In [None]:
from threading import Thread

def parallel_reduce(f, partitions):

    n = len(partitions)
    results = [None] * n
    threads = [None] * n
    
    def job(i):
        results[i] = reduce(f, partitions[i])

    for i in range(n):
        threads[i] = Thread(target=job, args=(i,))  # bind the current i now, not when the thread runs
        threads[i].start()
    
    for i in range(n):
        threads[i].join()
    
    return reduce(f, results)

parallel_reduce(lambda x, y: x + y, partitioned_data)

'abcdefg'
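Passing a loop variable to a thread through a `lambda`, as in `Thread(target=lambda: job(i))`, is fragile: the lambda looks `i` up when it runs, not when the thread is created, so a thread that starts late can see a later value of `i`. Passing the value through the `args` parameter of `threading.Thread` binds it at creation time. A minimal sketch of both behaviours:

```python
from threading import Thread

# Each lambda looks i up when it is *called*, i.e. after the loop has finished
late = []
for i in range(3):
    late.append(lambda: i)
print([f() for f in late])  # [2, 2, 2]

# Passing the value through args binds it when each thread is created
results = [None] * 3

def job(i):
    results[i] = i * i

threads = [Thread(target=job, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)  # [0, 1, 4]
```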

# Homework


First, generate the text files:

In [None]:
!pip install lorem




In [None]:
from lorem import text
num = 10000
for i in range(4):
    with open("sample{0:02d}.txt".format(i), "w") as f:
        for j in range(num):
            f.write(text() + '\n')

In [None]:
text()

'Sit quiquia velit sit dolorem dolore ut ut. Sit quisquam velit sit dolore magnam numquam. Ut est dolorem dolor dolor dolor. Eius ipsum aliquam aliquam quaerat ut dolorem. Porro quaerat tempora ut adipisci etincidunt non.\n\nNon amet consectetur sed porro dolor. Aliquam voluptatem est etincidunt consectetur dolor dolor non. Dolore etincidunt dolorem neque. Adipisci sit amet magnam. Labore porro neque est modi porro. Consectetur dolore dolorem sit. Eius quisquam dolorem velit neque. Voluptatem quiquia aliquam dolore ipsum.\n\nSit sed aliquam dolorem amet sit adipisci. Labore porro ipsum modi est voluptatem tempora voluptatem. Consectetur quisquam labore dolorem modi quisquam est. Quisquam labore modi velit sed quiquia magnam consectetur. Voluptatem quiquia adipisci porro numquam. Sed porro dolore quiquia non voluptatem ut. Dolor magnam labore dolorem amet ut. Non porro etincidunt dolorem amet tempora. Velit dolorem adipisci dolorem dolor etincidunt eius.\n\nUt adipisci consectetur non q

In [None]:
import glob
files = sorted(glob.glob('sample0*.txt'))
files

['sample00.txt', 'sample01.txt', 'sample02.txt', 'sample03.txt']

In [None]:
!ls  -l

total 55644
-rw-r--r-- 1 root root 14139478 Jan 30 16:33 sample00.txt
-rw-r--r-- 1 root root 14271255 Jan 30 16:33 sample01.txt
-rw-r--r-- 1 root root 14299183 Jan 30 16:33 sample02.txt
-rw-r--r-- 1 root root 14252029 Jan 30 16:33 sample03.txt
drwxr-xr-x 1 root root     4096 Jan  7 14:33 sample_data


## Question: write an iterative version that performs the word count over the above files

In [None]:
import time

In [None]:
#1) iterative code

data0 = open("sample00.txt", "r").read()
data1 = open("sample01.txt", "r").read()
data2 = open("sample02.txt", "r").read()
data3 = open("sample03.txt", "r").read()

raw_text = data0 + data1 + data2 + data3

for char in '-.,\n':
    raw_text = raw_text.replace(char,' ')
low_text = raw_text.lower()
wordcount={}

start = time.time()
for word in sorted(low_text.split()):
    if word not in wordcount:
        wordcount[word] = 1
    else:
        wordcount[word] += 1

end = time.time()
print(len(wordcount)) 
print(wordcount)
print('Time taken in seconds :', end - start) 

27
{'adipisci': 299631, 'aliquam': 299743, 'amet': 300276, 'consectetur': 300609, 'dolor': 300055, 'dolore': 299975, 'dolorem': 300635, 'eius': 300203, 'est': 300067, 'etincidunt': 299611, 'ipsum': 299932, 'labore': 299859, 'magnam': 298880, 'modi': 300175, 'neque': 299587, 'non': 299753, 'numquam': 300077, 'porro': 299082, 'quaerat': 299739, 'quiquia': 299799, 'quisquam': 300085, 'sed': 300029, 'sit': 299122, 'tempora': 299861, 'ut': 298310, 'velit': 299549, 'voluptatem': 299896}
Time taken in seconds : 6.700387477874756
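For comparison, the standard library's `collections.Counter` performs the same iterative count in a single call. A minimal sketch on a small made-up string, using the same cleaning steps as above. Counters also add key by key, which is one way to merge partial counts from separate partitions.

```python
from collections import Counter

raw = "Ut est dolor. Dolor sit-amet,\nut dolor."
for char in '-.,\n':
    raw = raw.replace(char, ' ')
counts = Counter(raw.lower().split())
print(counts.most_common(2))  # [('dolor', 3), ('ut', 2)]

# Counters add key-wise, so per-partition counts can be merged with +
merged = Counter("a b a".split()) + Counter("b c".split())
print(sorted(merged.items()))  # [('a', 2), ('b', 2), ('c', 1)]
```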


## Write a Map-Reduce version that performs the word count

In [None]:
#2) map reduce python

# create a function that returns each word in the form ('word1', 1)
words = low_text.split()
def tup(data):
    return (data, 1)

start = time.time()
# call Python's map function to produce [('word1', 1), ('word1', 1), ('word2', 1), ...]
# over the words of all four files (already concatenated into low_text)
map_data = map(tup, words)

# take the results from the above map
# and reduce them to [('word1', 2), ('word2', 1), ...]
wordcount = reduceByKey(lambda x, y: x + y, sorted(map_data))
end = time.time()
# note: reduceByKey returns a lazy map object, so the per-key reduce actually runs
# in list(wordcount) below, after `end` has been recorded
wordcount = list(wordcount)
print(len(wordcount))
print(wordcount)

print('Time taken in seconds :', end - start) 

27
[('adipisci', 299631), ('aliquam', 299743), ('amet', 300276), ('consectetur', 300609), ('dolor', 300055), ('dolore', 299975), ('dolorem', 300635), ('eius', 300203), ('est', 300067), ('etincidunt', 299611), ('ipsum', 299932), ('labore', 299859), ('magnam', 298880), ('modi', 300175), ('neque', 299587), ('non', 299753), ('numquam', 300077), ('porro', 299082), ('quaerat', 299739), ('quiquia', 299799), ('quisquam', 300085), ('sed', 300029), ('sit', 299122), ('tempora', 299861), ('ut', 298310), ('velit', 299549), ('voluptatem', 299896)]
Time taken in seconds : 10.319125890731812


## Write a multithreaded Map-Reduce version

In [None]:
# 3) multithread map and reduce
# with  partition sizes:
# 3.1) one line per partition
# 3.2) four lines per partition
# 3.3) one file per partition

raw_text = data0 + data1 + data2 + data3

for char in '-.,':
      raw_text = raw_text.replace(char,' ').lower()

raw_lines = [s for s in raw_text.splitlines() if s]
print(len(raw_lines))

def tup(data):
  return (data, 1)

# MAPPING
def schedule_computation_threaded_map(f, result, data, threads, i):
    # Each partition is mapped in its own thread (under CPython's GIL, CPU-bound threads do not run in parallel).
    def my_job():
        # print ("Processing data:", data[i], "... ")
        words = data[i].split()
        result[i] = list(map(f, words))
        # print ("Finished job #", i)    
        # print ("Result was", result[i])       
    threads[i] = Thread(target=my_job)

def my_map_multithreaded_map(f, data):
    n = len(data)
    result = [None] * n
    threads = [None] * n
    # print ("Scheduling jobs.. ")
    for i in range(n):
        schedule_computation_threaded_map(f, result, data, threads, i)
    # print ("Starting jobs.. ")
    for i in range(n):
        threads[i].start()
    # print ("Waiting for jobs to finish.. ")
    for i in range(n):
        threads[i].join()
    # print ("All done.")
    return result

# REDUCING
def schedule_computation_threaded_reduce(f, result, data, threads, i):    
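    # Each partition is reduced in its own thread (under CPython's GIL, CPU-bound threads do not run in parallel).
    def my_job():
        # print ("Processing data:", data[i], "... ")
        # reduce every key of partition i; list.append is thread-safe in CPython,
        # so all threads can add their (key, value) pairs to the shared result list
        for key in data[i]:
            result.append(f(i, key))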
        # print ("Finished job #", i)
        # print ("Result was", result)
    threads[i] = Thread(target=my_job)

def my_map_multithreaded_reduce(f, data):
    n = len(data)
    result = []
    threads = [None] * n
    # print ("Scheduling jobs.. ")
    for i in range(n):
        schedule_computation_threaded_reduce(f, result, data, threads, i)
    # print ("Starting jobs.. ")
    for i in range(n):
        threads[i].start()
    # print ("Waiting for jobs to finish.. ")
    for i in range(n):
        threads[i].join()
    # print ("All done.")
    return result

def reduceByKey_multithreaded_line(f, data):
    keys_values_in_line = []
    for line in data:
        keys_values_in_line.append(groupByKey(line))
    return my_map_multithreaded_reduce(lambda i,key: (key, reduce(f, keys_values_in_line[i][key])), keys_values_in_line)

# using my_map multithread

# using multithread reduce

180010


In [None]:
# 3.1 ------------------------------------------------------------------------------------------------------------------------------------

# one line per partition (empty lines were already removed when raw_lines was built)

start1 = time.time() 
lines_tuple1 = my_map_multithreaded_map(tup, raw_lines)
lines_reduce_tuple1 = reduceByKey_multithreaded_line(lambda x, y: x + y, lines_tuple1)
sorted_tuple1 = sorted(lines_reduce_tuple1, key=lambda x: x[0])
wordcount1 = reduceByKey(lambda x,y: x + y, sorted_tuple1)
wordcount1 = list(wordcount1)
end1 = time.time() 

print(len(wordcount1))
print(wordcount1)
print('Time taken in seconds :', end1 - start1) 

# ----------------------------------------------------------------------------------------------------------------------------------------

27
[('adipisci', 299631), ('aliquam', 299743), ('amet', 300276), ('consectetur', 300609), ('dolor', 300055), ('dolore', 299975), ('dolorem', 300635), ('eius', 300203), ('est', 300067), ('etincidunt', 299611), ('ipsum', 299932), ('labore', 299859), ('magnam', 298880), ('modi', 300175), ('neque', 299587), ('non', 299753), ('numquam', 300077), ('porro', 299082), ('quaerat', 299739), ('quiquia', 299799), ('quisquam', 300085), ('sed', 300029), ('sit', 299122), ('tempora', 299861), ('ut', 298310), ('velit', 299549), ('voluptatem', 299896)]
Time taken in seconds : 57.40858030319214


In [None]:
# 3.2 ------------------------------------------------------------------------------------------------------------------------------------

# group every 4 consecutive lines into one partition; joining with a space keeps
# the last word of one line from merging with the first word of the next
raw_lines2 = [' '.join(raw_lines[i:i + 4]) for i in range(0, len(raw_lines), 4)]

print(len(raw_lines2))

start2 = time.time() 
lines_tuple2 = my_map_multithreaded_map(tup, raw_lines2)
lines_reduce_tuple2 = reduceByKey_multithreaded_line(lambda x, y: x + y, lines_tuple2)
sorted_tuple2 = sorted(lines_reduce_tuple2, key=lambda x: x[0])
wordcount2 = reduceByKey(lambda x,y: x + y, sorted_tuple2)
wordcount2 = list(wordcount2)
end2 = time.time() 

print(len(wordcount2))
print(wordcount2)
print('Time taken in seconds :', end2 - start2) 

# ----------------------------------------------------------------------------------------------------------------------------------------

45003
27
[('adipisci', 299631), ('aliquam', 299743), ('amet', 300276), ('consectetur', 300609), ('dolor', 300055), ('dolore', 299975), ('dolorem', 300635), ('eius', 300203), ('est', 300067), ('etincidunt', 299611), ('ipsum', 299932), ('labore', 299859), ('magnam', 298880), ('modi', 300175), ('neque', 299587), ('non', 299753), ('numquam', 300077), ('porro', 299082), ('quaerat', 299739), ('quiquia', 299799), ('quisquam', 300085), ('sed', 300029), ('sit', 299122), ('tempora', 299861), ('ut', 298310), ('velit', 299549), ('voluptatem', 299896)]
Time taken in seconds : 30.906075954437256


In [None]:
# 3.3 ------------------------------------------------------------------------------------------------------------------------------------

# one partition per file: apply the same cleaning as above to each file's text
raw_lines3 = []
for file_text in (data0, data1, data2, data3):
    for char in '-.,':
        file_text = file_text.replace(char, ' ')
    raw_lines3.append(file_text.lower())
print(len(raw_lines3))

start3 = time.time() 
lines_tuple3 = my_map_multithreaded_map(tup, raw_lines3)
lines_reduce_tuple3 = reduceByKey_multithreaded_line(lambda x, y: x + y, lines_tuple3)
sorted_tuple3 = sorted(lines_reduce_tuple3, key=lambda x: x[0])
wordcount3 = reduceByKey(lambda x,y: x + y, sorted_tuple3)
wordcount3 = list(wordcount3)
end3 = time.time() 

print(len(wordcount3))
print(wordcount3)
print('Time taken in seconds :', end3 - start3) 

# ----------------------------------------------------------------------------------------------------------------------------------------

4
27
[('adipisci', 299631), ('aliquam', 299743), ('amet', 300276), ('consectetur', 300609), ('dolor', 300055), ('dolore', 299975), ('dolorem', 300635), ('eius', 300203), ('est', 300067), ('etincidunt', 299611), ('ipsum', 299932), ('labore', 299859), ('magnam', 298880), ('modi', 300175), ('neque', 299587), ('non', 299753), ('numquam', 300077), ('porro', 299082), ('quaerat', 299739), ('quiquia', 299799), ('quisquam', 300085), ('sed', 300029), ('sit', 299122), ('tempora', 299861), ('ut', 298310), ('velit', 299549), ('voluptatem', 299896)]
Time taken in seconds : 5.0938401222229


Report the time for 1), 2) and 3) (3.1, 3.2, 3.3).

Which one is the fastest?



| Method | Time taken (s) |
|---|---|
| 1) Iterative code | 6.700387477874756 |
| 2) Map-Reduce in Python | 10.319125890731812 |
| 3.1) Multithreaded map and reduce, one line per partition | 57.40858030319214 |
| 3.2) Multithreaded map and reduce, four lines per partition | 30.906075954437256 |
| 3.3) Multithreaded map and reduce, one file per partition | 5.0938401222229 |

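3.3 is the fastest because it starts only 4 threads per stage, so the threading overhead is very small. By contrast, 3.1 and 3.2 start one thread per partition in both the map and the reduce stage (180,010 and 45,003 partitions respectively); since CPython's GIL prevents these CPU-bound threads from running in parallel, the extra threads add overhead without speeding up the computation.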
