# Example 1: Working with numbers

In [1]:
import random

In [2]:
def create_random_integer_array(n=20000, max_n=100):
    random_array = []
    # The function range(a, b) produces an array [a, a+1, a+2, ..., b-2, b-1]
    for r in range(0, n): # Iterating between 0 and n.
        random_number = random.randint(0, max_n) # Produces a random int between 0 and max_n
        random_array.append(random_number)
    return random_array

In [7]:
# We are calling the function with the default parameters (20k and 100), but feel free to change that.
random_array = create_random_integer_array() 

In [8]:
# To calculate the average, we need to count the elements and sum them. Both of these operations are very fast
# and optimised in Python, so even if it is a large array, it will still be faster than a parallel solution,
# but let's design what this parallel solution should look like if the number of elements couldn't fit in a 
# single computer and we needed a true distributed solution instead of a fake parallel one like in this example

# A non-parallel solution is just:
average = sum(random_array) / len(random_array)
print('The total average is:',average)

The total average is: 50.3681


In [9]:
# To test a "fake parallel" approach, we can divide the array in 5 parts for example, as in the code below 
# Feel free to change n_parts, the final solution should be the same, only that the calculation more parallel.
n_parts = 5
part_size = int(len(random_array) / n_parts)

parts = []  # An array of arrays, containing the n_parts
for p in range(n_parts):
    part = random_array[part_size*p: part_size*(p+1)]
    parts.append(part)
    
    


# Then apply a MapReduce approach. For that we define two functions, the first one for the map stage, 
# and the second one for the reduce stage

def get_count_and_sum(array):
    '''
    Function that returns a dictionary with two values: the count and the sum of the array given
    as an input parameter
    '''
    return {
        'count': len(array),
        'sum': sum(array)
    }

def reduce_sum_counts(sum_count_arrays):
    '''
    Function that takes the dictionaries calculated before in the map stage, and aggregates them
    summing all of the 'count' fields and all of the 'sum' fields, and returning these two values
    '''
    total_count = 0
    total_sum = 0
    for s in sum_counts:
        total_count += s['count']
        total_sum += s['sum']
    return total_count, total_sum

# MAP PART
# We call the Python's native map function which accepts two parameters: a function (our map function) and 
# an array. This map native function applies the map function parameter to each element in the 2nd parameter (the array) 
# In short: For each of the array parts, we calculate their sum and length.
sum_counts = map(get_count_and_sum, parts)
# We cast the map object (which is a generator) as a list. This is the same as iterating until the generator is over
# and add each element to a list - casting it is simpler.
sum_counts = list(sum_counts)  
# We print out the partial results of the map stage
for s in sum_counts:
    print('Part has:',s['count'], 'elements, summing',s['sum'])

# REDUCE PART
count, summ = reduce_sum_counts(sum_counts)
# After we have aggregated everything, we just have the final two parameters (the total sum and length of the array)
# so the average is just sum over count:
average = summ/count

print('The total average is:',average)

Part has: 4000 elements, summing 203658
Part has: 4000 elements, summing 199859
Part has: 4000 elements, summing 200720
Part has: 4000 elements, summing 201541
Part has: 4000 elements, summing 201584
The total average is: 50.3681


In [None]:
list(sum_counts)

# Example 2: Counting a word in Wikipedia articles

In [10]:
import concurrent.futures
import requests
import time

def count_term_in_wikipedia_article(term, wiki_article, timeout=10):
    '''
    Method that counts how many times the term "term" appears in the 
    Wikipedia page "wiki_article". 
    It also prints in the screen the result before returning it. 
    '''
    wiki_page_url = 'https://en.wikipedia.org/wiki/'+wiki_article
    response = requests.get(url=wiki_page_url, timeout=timeout)
    article = response.text
    count = article.count(term)
    print('"', term, '" appears', count, 'times in the article',wiki_article,'\n')
    return count

In [11]:
term = 'data'
wiki_articles = [
    'big_data', 
    'data_analysis', 
    'data_modelling', 
    'relational_algebra', 
    'software_engineering',
    'machine_learning',
    'artificial_intelligence',
    'support_vector_machine',
    'random_forest',
    'logistic_regression',
    'naive_bayes_classifier',
    'bayesian_probability',
    'database',
    'data',
    'information',
    'entity_relationship',
    'information_system',
    'information_technology',
    'apache_hadoop',
    'apache_spark',
    'array_data_structure',
    'dictionary_data_structure',
]

# Performing tasks one by one

In [12]:
start = time.perf_counter()
total_appearances = 0
for article in wiki_articles:
    total_appearances += count_term_in_wikipedia_article(term, article)
stop = time.perf_counter()
print('The process took', stop-start, 'seconds')
print('Total appearances of the term', term, ':', total_appearances)

" data " appears 1284 times in the article big_data 

" data " appears 553 times in the article data_analysis 

" data " appears 266 times in the article data_modelling 

" data " appears 197 times in the article relational_algebra 

" data " appears 174 times in the article software_engineering 

" data " appears 371 times in the article machine_learning 

" data " appears 536 times in the article artificial_intelligence 

" data " appears 146 times in the article support_vector_machine 

" data " appears 104 times in the article random_forest 

" data " appears 205 times in the article logistic_regression 

" data " appears 66 times in the article naive_bayes_classifier 

" data " appears 98 times in the article bayesian_probability 

" data " appears 930 times in the article database 

" data " appears 241 times in the article data 

" data " appears 153 times in the article information 

" data " appears 251 times in the article entity_relationship 

" data " appears 159 times in t

# Performing tasks concurrently imitating a MapReduce process
Only that it is a parallel approach, not a distributed one yet... So the MapReduce process is actually running over different threads in the machine, not distributed. 

What happens if you change the number of MAX_WORKERS?

If you manage to find it, why do you think that is the optimal number? Why doesn't it always works best the more workers we have?

In [33]:
MAX_WORKERS = 15

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = []
    
    # MAP STAGE - That "submit" function is basically a map through all of the parallel processes 
    for article in wiki_articles:
        futures.append(executor.submit(count_term_in_wikipedia_article, 
                                       term=term, 
                                       wiki_article=article)
                      )
    
    # SHUFFLE AND SORT STAGE
    appearances = []
    for future in concurrent.futures.as_completed(futures):
        appearances.append(future.result())
        
    # REDUCE STAGE
    total_appearances_concurrent = sum(appearances)

stop = time.perf_counter()
print('The process took', stop-start, 'seconds')
print('Total appearances of the term', term, ':', total_appearances_concurrent)
    

" data " appears 66 times in the article naive_bayes_classifier 

" data " appears 553 times in the article data_analysis 

" data " appears 266 times in the article data_modelling "" 
 data
data  " appears " " data "" appears data " appears 197 " data times in the articledata " appears 1284  " appears 153 times in the article information 

times in the article big_data 

" appears104 relational_algebra241 times in the article data 

 98 times in the article bayesian_probability 

  "

 times in the article datarandom_forest  " appears 174 times in the article software_engineering 



" data" " appears  146 times in the article support_vector_machine 

data " appears 930 times in the article database 

" data " appears 371 times in the article machine_learning 

" data " appears 205 times in the article logistic_regression 

" data " appears 536 times in the article artificial_intelligence 

" data " appears 251 times in the article entity_relationship 

" data " appears 89 times in th