# FIT5148 - Distributed Databases and Big Data

# Take Home Test - Solution Workbook

This test consists of three questions worth a total of 5% of the final mark. The first question covers **Parallel Search Algorithms (1 mark)**, the second covers **Parallel Join Algorithms (2 marks)**, and the third covers **Parallel Sort and GroupBy Algorithms (2 marks)**.

**Instructions:**
- You will be using Python 3.
- Read the instructions, code base and comments carefully.
- There are code blocks that **you need to complete** yourself as part of the test.
- <font color='red'> **Comment each line of code properly such that the tutor can easily understand what you are trying to do in the code.**</font>

**Your Details:**
- Name: Shih-Ting Chu
- StudentID: 29286875
- Email: schu0020@student.monash.edu


Let's get started!



### Dataset ###
For this test, we will use the following two tables R and S to write the solutions to three parallel algorithms.

In [1]:
# R consists of 15 pairs, each comprising two attributes (nominal and numeric)
R = [('Adele',8),('Bob',22),('Clement',16),('Dave',23),('Ed',11),
     ('Fung',25),('Goel',3),('Harry',17),('Irene',14),('Joanna',2),
     ('Kelly',6),('Lim',20),('Meng',1),('Noor',5),('Omar',19)]

# S consists of 9 pairs, each comprising two attributes (nominal and numeric)
S = [('Arts',8),('Business',15),('CompSc',2),('Dance',12),('Engineering',7),
     ('Finance',21),('Geology',10),('Health',11),('IT',18)]


### 1. Parallel Searching Algorithm ###
In this task, you will build a **parallel search algorithm for range selection (continuous)** for a given query. You will implement one particular search algorithm which is instructed below.

 **Implement a parallel search algorithm** that uses the linear search algorithm (i.e. **`linear_search()`**) and works with the hash partitioning method (i.e. **`h_partition()`**).
 **Complete the code block between "### START CODE HERE ###" and "### END CODE HERE ###".**
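
With hash partitioning, consecutive values in a range are scattered across processors, so a range query usually has to visit most or all of them. The following is a minimal sketch of that effect, assuming the same modulo hash as `s_hash()` and an exclusive upper bound; the helper name `partitions_for_range` is hypothetical and not part of the test code.

```python
def partitions_for_range(lo, hi, p):
    """Return the hash partitions touched by the integer range [lo, hi)."""
    # every integer in the range may hash to a different processor
    return sorted({value % p for value in range(lo, hi)})

print(partitions_for_range(5, 20, 3))  # [0, 1, 2]
print(partitions_for_range(6, 7, 3))   # [0]
```

A range that spans at least as many consecutive values as there are processors touches every partition, which is why the search below looks up each value in the partition it hashes to.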

In [2]:
# linear search function
def linear_search(data, key):
    """
    Perform linear search on data for the given key

    Arguments:
    data -- an input dataset which is a list or a numpy array
    key -- a query value to match against the numeric attribute

    Return:
    result -- the first matching record, or None if there is no match
    """
    
    matched_record = None # initialise as None
    
    ### START CODE HERE ### 
    
    # each record is a pair such as ('Adele', 8): value[0] is the name, value[1] is the number
    for value in data: # check each record in data
        if value[1] == key: # if the numeric attribute matches the key
            matched_record = value # keep the whole matching record
            break # stop at the first match
    
    ### END CODE HERE ###
    
    return matched_record # return the matched record

In [3]:
# define a simple hash function.
def s_hash(input_record, p):
    """
    Define a simple hash function for demonstration

    Arguments:
    input_record -- an input record
    p -- the number of processors

    Return:
    result -- the hash value of input_record
    """
    result = input_record%p # remainder of input_record/p
 
    return result # return the result

In [4]:
# hash data partitioning function
# use the "s_hash" function defined above to realise this partitioning
def h_partition(data, p):
    """
    Perform hash data partitioning on data

    Arguments:
    data -- an input dataset which is a list
    p -- the number of processors

    Return:
    result -- the partitioned subsets of data, keyed by hash value
    """
    
    ### START CODE HERE ###
    
    partitions = {} # use a dictionary: hash value -> set of records
    for input_record in data: # for each record in data, perform the following
        h = s_hash(input_record[1], p) # hash the numeric attribute to pick a processor
        # create the set on first use, then add the record to it
        partitions.setdefault(h, set()).add(input_record)
            
    ### END CODE HERE ###
    
    return partitions # return the partitions

In [5]:
from multiprocessing import Pool

# parallel searching algorithm for range selection
def parallel_search_range(data, query_range, n_processor):
    """
    Perform parallel search for range selection on data for the given range

    Arguments:
    data -- the input dataset which is a list
    query_range -- a query range given as [start, end] (e.g. [30, 50]); values are checked from start up to, but not including, end
    n_processor -- the number of parallel processors
    
    Return:
    results -- the list of matched records
    """
    
    results = []

    pool = Pool(processes=n_processor)

    ### START CODE HERE ###     
    
    # perform data partitioning first
    DD = h_partition(data, n_processor)
 

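    # Note: think of it as, once the data is split across processors, comparing the
    # records in the dataset one by one (rather than starting from 5 and looking
    # each value up in the processors' data)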
#     for partition in DD:
#         result = pool.apply_async(linear_search, [data, query_range])
    
#     pool.close()
#     pool.join()
    
#     results = result.get()
    
    
    # each element in DD is a pair (hash key: set of records)
    # check every value in the range one by one; range() stops before query_range[1],
    # so the upper bound itself is not searched
    for query in range(query_range[0], query_range[1], 1):
        query_hash = s_hash(query, n_processor) # find the processor this value hashes to
        d = list(DD.get(query_hash, ())) # records on that processor (empty if it holds none)
        # run linear_search in a worker process; pool.apply blocks until the result is ready
        result = pool.apply(linear_search, [d, query])
        if result is not None: # keep only values that matched a record
            results.append(result)

    pool.close() # no more tasks will be submitted
    pool.join() # wait for the workers to exit
            
    ### END CODE HERE ###
    
    return results # return the result

In [6]:
n_processor = 3
# hash partition, linear_search 
results = parallel_search_range(R, [5, 20], n_processor)
print(results) 

[('Noor', 5), ('Kelly', 6), ('Adele', 8), ('Ed', 11), ('Irene', 14), ('Clement', 16), ('Harry', 17), ('Omar', 19)]
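
As a sanity check, the same selection can be reproduced sequentially. This is only a sketch for comparison, assuming the half-open interpretation of the range that the loop above uses (the upper bound 20 is excluded, so `('Lim', 20)` does not appear).

```python
R = [('Adele',8),('Bob',22),('Clement',16),('Dave',23),('Ed',11),
     ('Fung',25),('Goel',3),('Harry',17),('Irene',14),('Joanna',2),
     ('Kelly',6),('Lim',20),('Meng',1),('Noor',5),('Omar',19)]

lo, hi = 5, 20
# keep records whose numeric attribute lies in [lo, hi), ordered by that attribute
expected = sorted((r for r in R if lo <= r[1] < hi), key=lambda r: r[1])
print(expected)
```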


## 2. Parallel Join Algorithm

In this task, you will implement a **disjoint-partitioning based parallel join algorithm**. This algorithm consists of two stages: a data partitioning stage using disjoint partitioning, and a local join stage.

 
As a data partitioning method, use the range partitioning method (i.e. **`range_partition()`**).
Assume that we have **3 parallel processors**, processor 1 will get records with join attribute value between 1 and 9, processor 2 between 10 and 19, and processor 3 between 20 and 29. Note that both tables R and S need to be partitioned based on the join attribute with the same range partitioning function. 

As a joining technique, use the hash-based join algorithm (i.e. **`HB_join()`**). **Complete the code block between "### START CODE HERE ###" and "### END CODE HERE ###".**
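
The processor assignment described above can be pictured as a lookup against the two split points 10 and 20. The following is a small sketch using the standard `bisect` module; the names `boundaries` and `processor_for` are illustrative assumptions, not part of the required solution.

```python
from bisect import bisect_right

boundaries = [10, 20]  # assumed split points for 3 processors

def processor_for(value):
    """Return the 0-based processor index for a join attribute value."""
    # bisect_right counts how many boundaries are <= value
    return bisect_right(boundaries, value)

for v in (2, 9, 10, 19, 20, 25):
    print(v, '-> processor', processor_for(v) + 1)
```

Because both tables use the same boundaries, matching join values always land on the same processor, which is what makes the partitioning disjoint.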

In [7]:
import operator

# range data partitioning function
def range_partition(data, range_indices):
    """
    Perform range data partitioning on data based on the join attribute

    Arguments:
    data -- an input dataset which is a list
    range_indices -- the list of range boundaries at which the data is split

    Return:
    result -- the partitioned subsets of data
    """
    result = []
    
    ### START CODE HERE ###  
    
    # sort the dataset by the join attribute first
    new_data = list(data)
    new_data.sort(key = operator.itemgetter(1))
#     new_data.sort(key = lambda new_data:new_data[1]) ### another way to sort ###


    # calculate the number of bins
    n_bin = len(range_indices)

    # for each boundary, perform the following
    for i in range(n_bin): 
        # new_data is sorted, so the records below this boundary form a prefix
        s = [x for x in new_data if x[1] < range_indices[i]] # records in the current range
        # add the partitioned list to the result (it may be empty)
        result.append(s) 
        # drop the partitioned records from the remaining data
        new_data = new_data[len(s):] # a list of the remaining elements

    # whatever is left lies at or above the last boundary
    result.append(new_data) 
    
    ### END CODE HERE ###
    
    return result # return the result

In [8]:
def H(r):
    """
    Hash function 'H' used in the hashing process. It works by summing the
    decimal digits of the hashed attribute, which in this case is the
    join attribute.
    
    Arguments:
    r -- a record where hashing will be applied on its join attribute

    Return:
    result -- the hash index of the record r
    """
    
    # convert the value of the join attribute into its digits
    digits = [int(d) for d in str(r[1])]
    
    # calculate the sum of the elements in digits
    return sum(digits)

In [9]:
def HB_join(T1, T2):
    """
    Perform the hash-based join algorithm.
    The join attribute is the numeric attribute in the input tables T1 & T2

    Arguments:
    T1 & T2 -- Tables to be joined

    Return:
    result -- the joined table
    """
    
    result = []
    
    
    dic = {} # use a dictionary
    
    # for each record in table T2
    for s in T2:
        # hash the record based on join attribute value using hash function H into hash table
        s_key = H(s)
        if s_key in dic:
            dic[s_key].add(s) # if there is an entry
        else:
            dic[s_key] = {s}
            
    # for each record in table T1 (probing)
    for r in T1:
        # hash the record based on join attribute value using H
        r_key = H(r)

        # if an index entry is found, then do the following
        if r_key in dic:
            # compare each record on this index entry with the record of table T1
            for item in dic[r_key]:
                if item[1] == r[1]:
                    # append the result
                    result.append({", ".join([r[0], str(r[1]), item[0]])})
    
    return result # return the result

In [10]:
# include this package for parallel processing
import multiprocessing as mp

def DPBP_join(T1, T2, n_processor):
    """
    Perform a disjoint partitioning-based parallel join algorithm.
    The join attribute is the numeric attribute in the input tables T1 & T2

    Arguments:
    T1 & T2 -- Tables to be joined
    n_processor -- the number of parallel processors

    Return:
    result -- the joined table
    """
    
    results = []
    
    ### START CODE HERE ### 
    
    # partition T1 & T2 into sub-tables using range_partition()
    # two boundaries give three sub-tables, which must equal n_processor
    T1_subsets = range_partition(T1, [10, 20])
    T2_subsets = range_partition(T2, [10, 20])
    
    # pool: a group of worker processes that run tasks in parallel
    pool = mp.Pool(processes = n_processor)
    
    midResults = []
    for i in range(len(T1_subsets)):
        # submit the local join of each pair of sub-tables without blocking
        output = pool.apply_async(HB_join, [T1_subsets[i], T2_subsets[i]])
        midResults.append(output)

    # collect the local join results in partition order
    for result in midResults:
        results.append(result.get())
        # blocking alternative:
        #results.append(pool.apply(HB_join, [T1_subsets[i], T2_subsets[i]]))

    pool.close() # no more tasks will be submitted
    pool.join() # wait for the workers to exit
        
    ### END CODE HERE ###
    
    return results

In [11]:
n_processor = 3
DPBP_join(R, S, n_processor)

[[{'Joanna, 2, CompSc'}, {'Adele, 8, Arts'}], [{'Ed, 11, Health'}], []]
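
The joined records can be cross-checked with a plain nested-loop join over the full tables. This is a sequential sketch for verification only; it returns flat tuples rather than the per-processor lists of one-element sets shown above.

```python
R = [('Adele',8),('Bob',22),('Clement',16),('Dave',23),('Ed',11),
     ('Fung',25),('Goel',3),('Harry',17),('Irene',14),('Joanna',2),
     ('Kelly',6),('Lim',20),('Meng',1),('Noor',5),('Omar',19)]
S = [('Arts',8),('Business',15),('CompSc',2),('Dance',12),('Engineering',7),
     ('Finance',21),('Geology',10),('Health',11),('IT',18)]

# compare every record of R with every record of S on the numeric attribute
pairs = [(name, value, dept)
         for name, value in R
         for dept, s_value in S
         if value == s_value]
pairs.sort(key=lambda t: t[1])  # order by join attribute for easy comparison
print(pairs)  # [('Joanna', 2, 'CompSc'), ('Adele', 8, 'Arts'), ('Ed', 11, 'Health')]
```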

## 3. Parallel Sorting Algorithm

In this task, you will implement the **parallel binary-merge sort** method. Like the parallel merge-all sort covered in the labs, it has two phases: (1) a local sort and (2) a final merge. The local sort phase works the same way as in the parallel merge-all sort. The final merge phase, however, is pipelined rather than concentrated on one processor: the results from two processors are merged on one processor (binary merging), and the merged result is passed on to the next level until only one processor (the host) is left.
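
The level-by-level merging can be sketched sequentially as follows. This is an illustration only, assuming plain integers instead of records and using the standard `heapq.merge` for each pairwise merge; the function name `binary_merge` is hypothetical.

```python
import heapq

def binary_merge(runs):
    """Merge sorted runs two at a time, level by level; return (merged, levels)."""
    levels = 0
    while len(runs) > 1:
        # each pair of neighbouring runs is merged into one run for the next level
        runs = [list(heapq.merge(*runs[i:i + 2])) for i in range(0, len(runs), 2)]
        levels += 1
    return (runs[0] if runs else []), levels

merged, levels = binary_merge([[1, 6], [2, 5], [3, 9], [4], [7, 8]])
print(merged)  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
print(levels)  # 3
```

Five runs shrink to three, then two, then one, so the number of levels grows with the logarithm of the number of processors.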

 **Complete the code block between "### START CODE HERE ###" and "### END CODE HERE ###".**
Assume that we use the round-robin partitioning method (i.e. **`rr_partition()`**). 
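
Round-robin partitioning deals records out to processors in turn, like dealing cards. One compact way to picture it, shown here as a sketch with the hypothetical name `rr_slices`, is Python's extended slicing:

```python
def rr_slices(data, n):
    """Round-robin partitioning: bin i receives items i, i+n, i+2n, ..."""
    return [data[i::n] for i in range(n)]

print(rr_slices(list(range(7)), 3))  # [[0, 3, 6], [1, 4], [2, 5]]
```

The bins differ in size by at most one element, which keeps the local sort workload balanced.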

In [12]:
def qsort(data): 

    """ 
    Quicksort a list of records by their numeric attribute
    
    Arguments:
    data -- the input list to be sorted

    Return:
    result -- the sorted list
    """
    if len(data) <= 1: # if there is at most one record
        return data # return the data
    else:
        pivot = data[0][1] # take the value of first element as the pivot
        left_data = [x for x in data[1:] if x[1] < pivot] # compare the element's value to pivot's
        right_data = [x for x in data[1:] if x[1] >= pivot] # compare the element's value to pivot's
        value_list = qsort(left_data) + [data[0]] + qsort(right_data) # combine the lists to 1 list
        
        return value_list # return a list sorted by the value

In [13]:
# round-robin data partitioning function
def rr_partition(data, n):
    """
    Perform data partitioning on data

    Arguments:
    data -- an input dataset which is a list
    n -- the number of processors

    Return:
    result -- the partitioned subsets of data
    """
    result = []
    for i in range(n):
        result.append([])
    
    ### START CODE HERE ### 
    
    # for each element, perform the following
    for index, element in enumerate(data): 
        # assign elements to bins in turn: 0, 1, ..., n-1, 0, 1, ...
        index_bin = index % n
        result[index_bin].append(element)
        
    ### END CODE HERE ###
    
    return result # return the result

In [14]:
import sys

# find the smallest record
def find_min(records):    
    """ 
    Find the smallest record
    
    Arguments:
    records -- the input record set

    Return:
    result -- the smallest record's index
    """
    m = records[0] # the first record data
    index = 0 # the first index
    for i in range(len(records)): # for each record
        if(records[i][1] < m[1]):  # if the current record's value < m's
            index = i # point to that index of this record
            m = records[i] # now m changed
    return index # return the index of that smallest record


def k_way_merge(record_sets):
    """ 
    K-way merging algorithm
    
    Arguments:
    record_sets -- the set of multiple sorted sub-record sets

    Return:
    result -- the sorted and merged record set
    """
    
    # indexes will keep the indexes of sorted records in the given buffers
    indexes = []
    for x in record_sets:
        indexes.append(0) # initialisation with 0

    # final result will be stored in this variable
    result = []  
    
    while(True):
        merged_result = [] # the merging unit
        
        # this loop gets the current position of every buffer
        for i in range(len(record_sets)):
            if(indexes[i] >= len(record_sets[i])): # check whether every record in this sub-record set has been consumed
                merged_result.append(('sys max', sys.maxsize)) # if yes, add a largest positive integer to merged_result
            else:
                merged_result.append(record_sets[i][indexes[i]]) # if not finished yet, add the record
        
        # find the smallest record 
        smallest = find_min(merged_result) # smallest refers to the index of that record
    
        # if we only have sys.maxsize on the tuple, we reached the end of every record set
        if(merged_result[smallest][1] == sys.maxsize):
            break # break the loop

        # this record is the next on the merged list
        result.append(record_sets[smallest][indexes[smallest]])
        indexes[smallest] +=1
   
    return result

In [15]:
# Test k-way merging method
buffers = [[('Adele',1),('Bob',6),('Clement',7)], [('Fung',2),('Goel',5),('Harry',8)], [('Ed', 3), ('W', 4), ('RR', 9)]]
# print(buffers[0][1][1])
result = k_way_merge(buffers)
print(result)

[('Adele', 1), ('Fung', 2), ('Ed', 3), ('W', 4), ('Goel', 5), ('Bob', 6), ('Clement', 7), ('Harry', 8), ('RR', 9)]
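
The standard library offers a comparable k-way merge, which gives a quick way to cross-check the output above. This sketch uses `heapq.merge` with a `key` function on the same test buffers; it is a comparison aid, not a replacement for the required `k_way_merge()`.

```python
import heapq

buffers = [[('Adele',1),('Bob',6),('Clement',7)],
           [('Fung',2),('Goel',5),('Harry',8)],
           [('Ed', 3), ('W', 4), ('RR', 9)]]

# merge the already-sorted buffers by the numeric attribute
merged = list(heapq.merge(*buffers, key=lambda record: record[1]))
print(merged)
```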


In [16]:
def serial_sorting(dataset, buffer_size):
    """
    Perform a serial external sorting method based on sort-merge
    The buffer size determines the size of each sub-record set

    Arguments:
    dataset -- the entire record set to be sorted
    buffer_size -- the buffer size determining the size of each sub-record set

    Return:
    result -- the sorted record set
    """
    
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return
    
    result = []

    ### START CODE HERE ### 
    
    # --- Sort Phase ---
    sorted_set = []
    
    # read buffer_size pages at a time into memory and
    # sort them, and write out a sub-record set (i.e. variable: subset)
    start_pos = 0
    N = len(dataset)
    while True:
        if ((N - start_pos) > buffer_size):
            # read B-records from the input, where B = buffer_size
            subset = dataset[start_pos:start_pos + buffer_size] 
            # sort the subset (using quicksort defined above) and add into sorted_set
            sorted_subset = qsort(subset) 
            sorted_set.append(sorted_subset)
            start_pos += buffer_size
        else:
            # read the last B-records from the input, where B is at most buffer_size
            subset = dataset[start_pos:] 
            # sort the subset (using quicksort defined above)
            sorted_subset = qsort(subset) 
            sorted_set.append(sorted_subset)
            break
    
    # --- Merge Phase ---
    merge_buffer_size = buffer_size - 1 # use B-1 buffers for input and 1 buffer for output
    dataset = sorted_set
    while True:
        merged_set = []

        N = len(dataset)
        start_pos = 0
        while True:
            if ((N - start_pos) > merge_buffer_size): 
                # read C-record sets from the merged record sets, where C = merge_buffer_size
                subset = dataset[start_pos:start_pos + merge_buffer_size]
                merged_set.append(k_way_merge(subset)) # merge lists in subset
                start_pos += merge_buffer_size
            else:
                # read C-record sets from the merged sets, where C is less than merge_buffer_size
                subset = dataset[start_pos:]
                merged_set.append(k_way_merge(subset)) # merge lists in subset
                break

        dataset = merged_set
        if (len(dataset) <= 1): # if the size of merged record set is 1, then stop 
            result = merged_set
            break # break the loop
            
    ### END CODE HERE ###
    
    return result

In [20]:
# Include this package for parallel processing
import multiprocessing as mp

def parallel_binary_merge_sorting(dataset, n_processor, buffer_size):
    """
    Perform a parallel binary-merge sorting method

    Arguments:
    dataset -- entire record set to be sorted
    n_processor -- number of parallel processors
    buffer_size -- buffer size determining the size of each sub-record set

    Return:
    result -- the merged record set
    """
    
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return
    
    result = []

    ### START CODE HERE ### 
    # Pre-requisite: Perform data partitioning using round-robin partitioning
    subsets = rr_partition(dataset, n_processor)
    
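    # Pool: a group of worker processes, one per partition
    pool = mp.Pool(processes = n_processor)

    # ----- Sort phase -----
    # submit every partition first so the local sorts can run concurrently
    pending = [pool.apply_async(serial_sorting, [s, buffer_size]) for s in subsets]
    # serial_sorting returns a one-element list, so take the single sorted run from it
    sorted_set = [p.get()[0] for p in pending]
    pool.close() # no more tasks will be submitted
    pool.join() # wait for the workers to exit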
    
    # ---- Final merge phase ----
    print("Sorted entire set:\n" + str(sorted_set))
    dataset = sorted_set
    while True:
        merged_set = []

        N = len(dataset)
        start_pos = 0
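        # at least one worker is needed, even when a single run remains
        pool = mp.Pool(processes = max(1, N//2))
        pending = []

        while True:
            if ((N - start_pos) > 2): 
                # take the next pair of sorted runs and merge them on one worker
                subset = dataset[start_pos:start_pos + 2]
                pending.append(pool.apply_async(k_way_merge, [subset]))
                start_pos += 2
            else:
                # take the remaining one or two runs
                subset = dataset[start_pos:]
                pending.append(pool.apply_async(k_way_merge, [subset]))
                break
        
        # collect the pairwise merges of this level in submission order
        merged_set = [p.get() for p in pending]
        pool.close() # no more tasks will be submitted at this level
        pool.join() # wait for the workers to exit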
        dataset = merged_set
        
        if (len(dataset) == 1): # if the size of merged record set is 1, then stop 
            result = merged_set
            break
    ### END CODE HERE ###
    
    return result

In [21]:
# # test round robin partition
# subset = rr_partition(R, 10)
# subset

In [28]:
result = parallel_binary_merge_sorting(R, 10, 20)
print("\n" + "-"*90 + "\n")
print("Final Result:\n" + str(result))

Sorted entire set:
[[('Kelly', 6), ('Adele', 8)], [('Lim', 20), ('Bob', 22)], [('Meng', 1), ('Clement', 16)], [('Noor', 5), ('Dave', 23)], [('Ed', 11), ('Omar', 19)], [('Fung', 25)], [('Goel', 3)], [('Harry', 17)], [('Irene', 14)], [('Joanna', 2)]]

------------------------------------------------------------------------------------------

Final Result:
[[('Meng', 1), ('Joanna', 2), ('Goel', 3), ('Noor', 5), ('Kelly', 6), ('Adele', 8), ('Ed', 11), ('Irene', 14), ('Clement', 16), ('Harry', 17), ('Omar', 19), ('Lim', 20), ('Bob', 22), ('Dave', 23), ('Fung', 25)]]
