## DataSet

In [1]:
R = [('Adele',8),('Bob',22),('Clement',16),('Dave',23),('Ed',11),
     ('Fung',25),('Goel',3),('Harry',17),('Irene',14),('Joanna',2),
     ('Kelly',6),('Lim',20),('Meng',1),('Noor',5),('Omar',19)]

In [2]:
S = [('Arts',8),('Business',15),('CompSc',2),('Dance',12),
     ('Engineering',7 ), ('Finance',21),('Geology',10),
     ('Health',11),('IT',18)]

# 1. Serial Join Algorithms
## 1.1 Nested-loop join

The _**nested-loop**_ join algorithm compares every record of one table with every record of the other.  
It consists of two levels of loops: 
* outer loop (looping over the first table) 
* inner loop (looping over the second table).

**If there are _N_ records in table T1 and _M_ records in table T2**.  
### Time Complexity: 
* _**O**( N * M )_

In [3]:
def NL_join(T1, T2):
    result = []
    for tr1 in T1:
        for tr2 in T2:
            if tr1[1] == tr2[1]:
                result.append({", ".join([tr1[0],str(tr1[1]),
                                          tr2[0]])})
    return result

In [4]:
NL_join(R,S)

[{'Adele, 8, Arts'}, {'Ed, 11, Health'}, {'Joanna, 2, CompSc'}]
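To make the _O(N * M)_ cost concrete, the following is a minimal sketch that counts comparisons while joining. The name `nl_join_counted` and the tuple-shaped result are assumptions made for illustration and are not part of the notebook above.

```python
R = [('Adele',8),('Bob',22),('Clement',16),('Dave',23),('Ed',11),
     ('Fung',25),('Goel',3),('Harry',17),('Irene',14),('Joanna',2),
     ('Kelly',6),('Lim',20),('Meng',1),('Noor',5),('Omar',19)]
S = [('Arts',8),('Business',15),('CompSc',2),('Dance',12),
     ('Engineering',7),('Finance',21),('Geology',10),
     ('Health',11),('IT',18)]

def nl_join_counted(T1, T2):
    """Nested-loop join that also reports how many comparisons it made."""
    result = []
    comparisons = 0
    for tr1 in T1:              # outer loop: first table
        for tr2 in T2:          # inner loop: second table
            comparisons += 1
            if tr1[1] == tr2[1]:
                result.append((tr1[0], tr1[1], tr2[0]))
    return result, comparisons

matches, comparisons = nl_join_counted(R, S)
print(matches)
print(comparisons)  # 15 * 9 = 135
```

Every pair of records is compared exactly once, so the count is always `len(T1) * len(T2)` regardless of how many records match.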

## 1.2 Sort-merge join

**Sort-merge join** is based on sorting and merging operations:  
both tables are first sorted on the join attribute and then merged in a single pass.

### Time Complexity:
 * **Sorting**: _**O**( N logN )_ and _**O**( M logM )_
 * **Merging**: _**O**( N + M )_ _(which is linear)_ 

Better than nested-loop join ( _**O**( N * M )_ ),  
especially if N and M are **very large**.

In [5]:
from random import randrange
pattern = [str(x) for x in range(10)]
values = [randrange(30) for _ in range(10)]
sample = [[p, v] for p, v in zip(pattern, values)]
print(sample)
# sorted() arguments:
# 1. an iterable object
# 2. key = a function that customizes the sort order
# 3. reverse = True for descending order (False by default)
result = sorted(sample, key=lambda each: each[1])
print(result)

[['0', 23], ['1', 6], ['2', 9], ['3', 15], ['4', 27], ['5', 20], ['6', 13], ['7', 14], ['8', 11], ['9', 8]]
[['1', 6], ['9', 8], ['2', 9], ['8', 11], ['6', 13], ['7', 14], ['3', 15], ['5', 20], ['0', 23], ['4', 27]]


In [6]:
def SM_join(T1, T2):
    result = []
    # sort T1 on the join attribute (sorted() returns a new list)
    s_T1 = sorted(T1, key=lambda each: each[1])
    s_T2 = sorted(T2, key=lambda each: each[1])
    i = 0
    j = 0
    # merge step; assumes join-attribute values are unique in each table
    while i < len(s_T1) and j < len(s_T2):
        r = s_T1[i][1] 
        s = s_T2[j][1]
        if r == s:
            result.append([s_T1[i][0], r, s_T2[j][0]])
            i+=1
            j+=1
        elif r < s:
            i += 1
        else:
            j += 1
    return result

SM_join(R, S)

[['Joanna', 2, 'CompSc'], ['Adele', 8, 'Arts'], ['Ed', 11, 'Health']]
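The merge loop in `SM_join` advances both pointers after a match, which is sufficient when the join attribute is unique in each table, as it is in `R` and `S`. With duplicate values some matching pairs would be skipped. The following is a minimal sketch of one way to handle duplicates, by joining whole runs of equal keys; the name `sm_join_dups` and the small `left`/`right` tables are hypothetical.

```python
def sm_join_dups(T1, T2):
    """Sort-merge join that pairs every record in a run of equal keys."""
    s1 = sorted(T1, key=lambda t: t[1])
    s2 = sorted(T2, key=lambda t: t[1])
    result = []
    i = j = 0
    while i < len(s1) and j < len(s2):
        r, s = s1[i][1], s2[j][1]
        if r < s:
            i += 1
        elif r > s:
            j += 1
        else:
            # find the end of the run of equal keys in each table
            i_end = i
            while i_end < len(s1) and s1[i_end][1] == r:
                i_end += 1
            j_end = j
            while j_end < len(s2) and s2[j_end][1] == r:
                j_end += 1
            # emit the cross product of the two runs
            for a in s1[i:i_end]:
                for b in s2[j:j_end]:
                    result.append([a[0], r, b[0]])
            i, j = i_end, j_end
    return result

left = [('Ann', 1), ('Ben', 2), ('Cat', 2)]
right = [('X', 2), ('Y', 2), ('Z', 3)]
joined = sm_join_dups(left, right)
print(joined)
```

Here the key `2` appears twice on each side, so four pairs are produced. When many records share a key, the merge step is no longer linear, because the cross product of the runs has to be emitted.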

## 1.3 Hash-based join
Made up of two processes: **Hashing** and **Probing**  
* **Hashing**: hash all records of the first table using a hash function   
* **Probing**: 
   * Hash again: hash the records of the second table one by one, using the same hash function.
   * Search & match: if a record hashes to a nonempty index entry, examine each record in that entry for a matching join attribute.
   
### Time Complexity:
_**O**( N + M )_

In [7]:
def H(r):
    digits = [int(d) for d in str(r[1])]
    #convert input into string then back to integer
    return sum(digits) #Sum up

In [8]:
def HB_join(T1,T2):
    result = []
    dic = {}
    for s in T2:
        # First, hash the smaller table
        s_key = H(s)
        # Group records with the same hash value
        if s_key in dic:
            dic[s_key].add(s)
        else:
            dic[s_key] = {s}
            # A set is used for each entry to handle collisions
            # (multiple records with the same hash table index)
    # Then probe with the other table
    for r in T1:
        r_key = H(r)
        if r_key in dic:
            # using 'in' will directly check on the dict's keys
            # Then, load dataset with the same hash value
            dataset = dic[r_key]
            for data in dataset:
                if data[1] == r[1]:
                    result.append({", ".join([r[0],str(r[1]),
                                             data[0]])})
    return result

In [9]:
HB_join(R,S)

[{'Adele, 8, Arts'}, {'Ed, 11, Health'}, {'Joanna, 2, CompSc'}]
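The digit-sum hash `H` maps different join values to the same entry (for example 8 and 17 both hash to 8), which is why the probe step still compares the actual join attribute. The following is a minimal sketch that builds the hash table for `S` with `collections.defaultdict` and counts how many candidate records the probe examines; the names `buckets` and `examined` are introduced here for illustration only.

```python
from collections import defaultdict

R = [('Adele',8),('Bob',22),('Clement',16),('Dave',23),('Ed',11),
     ('Fung',25),('Goel',3),('Harry',17),('Irene',14),('Joanna',2),
     ('Kelly',6),('Lim',20),('Meng',1),('Noor',5),('Omar',19)]
S = [('Arts',8),('Business',15),('CompSc',2),('Dance',12),
     ('Engineering',7),('Finance',21),('Geology',10),
     ('Health',11),('IT',18)]

def H(r):
    # same digit-sum hash as in the notebook
    return sum(int(d) for d in str(r[1]))

# Hashing phase: group the records of S by hash value
buckets = defaultdict(list)
for s in S:
    buckets[H(s)].append(s)

# Probing phase: examine only the records in the matching entry
examined = 0
matches = []
for r in R:
    for s in buckets.get(H(r), []):
        examined += 1
        if s[1] == r[1]:        # collisions are filtered out here
            matches.append((r[0], r[1], s[0]))

print(dict(buckets))
print(examined)   # 14 candidate comparisons
print(matches)
```

Only 14 candidate records are examined during probing, and three of them are real matches; the rest are collisions that the equality check rejects.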

## Comparison:
The cost of a join algorithm normally depends on the number of times that a **disk scan** (_**the most expensive operation**_) needs to be performed.  
> **Minimizing disk scans** is the ultimate objective not only in join algorithms, but in any query processing algorithm.

* **Nested-loop join** algorithm:
> _**O(** N \* M **)**_  
> **==>** Quadratic (when N and M are of similar size)
* **Sort-merge join** algorithm:
> _**O(** N logN + M logM + N + M **)**_  
> **==>** better than quadratic but not as good as linear
* **Hash-based join** algorithm:  
> _**O(** N + M **)**_  
> **==>** Linear (the **most efficient** of the three join algorithms)  
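The growth rates listed above can be compared numerically. The following is a rough sketch that evaluates the three expressions for given table sizes; it ignores constant factors and disk access, so the numbers only illustrate how the terms scale. The function name `estimated_costs` is an assumption made here.

```python
import math

def estimated_costs(n, m):
    """Evaluate the three complexity expressions for table sizes n and m."""
    nested_loop = n * m
    sort_merge = n * math.log2(n) + m * math.log2(m) + n + m
    hash_based = n + m
    return nested_loop, sort_merge, hash_based

# sizes of R and S in this notebook
small = estimated_costs(15, 9)
print(small)

# the gap widens quickly as the tables grow
large = estimated_costs(1_000_000, 1_000_000)
print(large)
```

For the 15-record and 9-record tables used here the three estimates are close together, whereas for two tables of one million records the nested-loop term is several orders of magnitude larger than the other two.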

# 2. Parallel Join Algorithms
### * The join operation is one of the most expensive operations in relational query processing, and hence parallelizing the join operation brings significant benefits.
## 2.1 Divide and Broadcast-based Parallel Join 
Composed of TWO stages:
* Data partitioning by using **_Divide and broadcast method_**
* **_Local join_**

> About **Divide and broadcast**\*:  
> \> **Divide**: divide one table into equal parts ( _**Round-robin**_ )  
> \> **Broadcast**: **replicate** the content of the second table to all processors.  
> \* _the **smaller** table is broadcast, the **larger** table is divided_.  
> \* **Replication is needed only** in shared-nothing architecture; there is **no replication** in shared-memory architecture.
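The two stages described above can be sketched without any real processes, which makes the data flow easier to follow. In this minimal sketch each "processor" is just a list element; the names `divide_round_robin`, `local_nl_join` and `divide_and_broadcast` are hypothetical, and the local join is a plain nested loop.

```python
R = [('Adele',8),('Bob',22),('Clement',16),('Dave',23),('Ed',11),
     ('Fung',25),('Goel',3),('Harry',17),('Irene',14),('Joanna',2),
     ('Kelly',6),('Lim',20),('Meng',1),('Noor',5),('Omar',19)]
S = [('Arts',8),('Business',15),('CompSc',2),('Dance',12),
     ('Engineering',7),('Finance',21),('Geology',10),
     ('Health',11),('IT',18)]

def divide_round_robin(table, n):
    # record i goes to part i mod n
    return [table[i::n] for i in range(n)]

def local_nl_join(T1, T2):
    return [(a, k1, b) for a, k1 in T1 for b, k2 in T2 if k1 == k2]

def divide_and_broadcast(large, small, n):
    parts = divide_round_robin(large, n)      # divide the larger table
    # broadcast: every part is joined against the whole smaller table
    return [local_nl_join(part, small) for part in parts]

per_processor = divide_and_broadcast(R, S, 3)
print(per_processor)
```

Because every part sees the complete smaller table, no match can be lost, whichever part a record of the larger table lands in.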


### **The divide and broadcast-based** parallel join algorithm
> Based on **_nested-loop join_**  
>     ==> also called **Parallel nested-loop join algorithm**

## For shared-memory architecture:
### The hash partitioning-based parallel join algorithm
> Based on **_hash join_**  
>     ==> also called **Parallel hash join algorithm**  
> \* hash partition + local join (hash join is fast)

### The range partitioning-based parallel join algorithm
> Based on **_sort-merge join_**  
>     ==> also called **Parallel sort-merge join algorithm**  
> \* sorting + division + local join (sort-merge join)

In [10]:
# Round-robin partitioning (equal division)
def rr_partition(data, n):
    # one empty bin per processor
    result = [[] for _ in range(n)]
    for index, element in enumerate(data):
        # record i goes to bin i mod n
        index_bin = index % n
        #print(":".join([str(index_bin),str(element)]))#For checking
        result[index_bin].append(element)
    return result

rr_partition(R, 3)

[[('Adele', 8), ('Dave', 23), ('Goel', 3), ('Joanna', 2), ('Meng', 1)],
 [('Bob', 22), ('Ed', 11), ('Harry', 17), ('Kelly', 6), ('Noor', 5)],
 [('Clement', 16), ('Fung', 25), ('Irene', 14), ('Lim', 20), ('Omar', 19)]]

In [11]:
import multiprocessing as mp
# import Python multiprocessing module

def DPP_join(T1, T2, n_processor):
    # T1 is equally divided by using round-robin partitioning
    T1_subsets = rr_partition(T1, n_processor)
    # T2 is broadcast: every worker receives the whole table.
    # pool.apply would block on each call and run the joins one
    # after another; starmap runs them in parallel and returns the
    # results in partition order. The with-block shuts the pool down.
    with mp.Pool(processes=n_processor) as pool:
        result = pool.starmap(HB_join, [(t1, T2) for t1 in T1_subsets])
    return result

In [12]:
DPP_join(R,S,3)
# Each list is the result produced by one processor 

[[{'Adele, 8, Arts'}, {'Joanna, 2, CompSc'}], [{'Ed, 11, Health'}], []]

## 2.2 Disjoint Partitioning-based Parallel Join
Composed of TWO stages:
* Data partitioning by using **_Disjoint partitioning_**
* **_Local join_**
> About disjoint partitioning:  
> each record is assigned to exactly one processor, for example by **range partitioning** or **hash partitioning**
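Hash partitioning is mentioned here but not shown elsewhere in this notebook, so the following is a minimal sketch of it. It assumes a simple modulo hash on the join attribute (not the digit-sum `H` used earlier); the names `hash_partition` and `local_nl_join` are hypothetical.

```python
R = [('Adele',8),('Bob',22),('Clement',16),('Dave',23),('Ed',11),
     ('Fung',25),('Goel',3),('Harry',17),('Irene',14),('Joanna',2),
     ('Kelly',6),('Lim',20),('Meng',1),('Noor',5),('Omar',19)]
S = [('Arts',8),('Business',15),('CompSc',2),('Dance',12),
     ('Engineering',7),('Finance',21),('Geology',10),
     ('Health',11),('IT',18)]

def hash_partition(data, n):
    """Assign each record to partition (join attribute mod n)."""
    result = [[] for _ in range(n)]
    for record in data:
        result[record[1] % n].append(record)
    return result

def local_nl_join(T1, T2):
    return [(a, k1, b) for a, k1 in T1 for b, k2 in T2 if k1 == k2]

# Both tables are partitioned with the same function, so records with
# equal join values always end up in the same partition.
parts_R = hash_partition(R, 3)
parts_S = hash_partition(S, 3)
per_processor = [local_nl_join(r, s) for r, s in zip(parts_R, parts_S)]
print(per_processor)
```

With this data all three matches fall into the last partition, which illustrates that disjoint partitioning can leave the work unevenly distributed across processors.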

In [13]:
def range_partition_v2(data, range_indices):
    result = []
    sorted_data = sorted(data, key=lambda each:each[1])
    n_bin = len(range_indices)
    for i in range(n_bin):
        s = [x for x in sorted_data if x[1] < range_indices[i]]
        result.append(s)
        sorted_data = sorted_data[len(s):]
    result.append([x for x in sorted_data if x[1] >= range_indices[-1]])
    return result


In [14]:
range_partition_v2(R,[10,20])

[[('Meng', 1),
  ('Joanna', 2),
  ('Goel', 3),
  ('Noor', 5),
  ('Kelly', 6),
  ('Adele', 8)],
 [('Ed', 11), ('Irene', 14), ('Clement', 16), ('Harry', 17), ('Omar', 19)],
 [('Lim', 20), ('Bob', 22), ('Dave', 23), ('Fung', 25)]]
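The boundaries `[10, 20]` above are chosen by hand. The following is a minimal sketch of deriving equal-width boundaries from the data for any number of partitions, assuming non-negative integer join values; the names `equal_width_boundaries` and `partition_sizes` are hypothetical, and `bisect_right` is used so that a value equal to a boundary goes to the higher partition, matching `range_partition_v2`.

```python
from bisect import bisect_right

R = [('Adele',8),('Bob',22),('Clement',16),('Dave',23),('Ed',11),
     ('Fung',25),('Goel',3),('Harry',17),('Irene',14),('Joanna',2),
     ('Kelly',6),('Lim',20),('Meng',1),('Noor',5),('Omar',19)]

def equal_width_boundaries(data, n):
    """Return n-1 boundaries that split [0, max key] into equal-width ranges."""
    max_key = max(key for _, key in data)
    width = max(1, max_key // n)   # guard against a zero width
    return [width * k for k in range(1, n)]

def partition_sizes(data, boundaries):
    sizes = [0] * (len(boundaries) + 1)
    for _, key in data:
        # index of the range that contains key
        sizes[bisect_right(boundaries, key)] += 1
    return sizes

boundaries = equal_width_boundaries(R, 3)
sizes = partition_sizes(R, boundaries)
print(boundaries)
print(sizes)
```

Equal-width ranges do not guarantee equal-sized partitions: the partition sizes depend on how the join values are distributed.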

In [15]:
def DPBP_join(T1, T2, n_processor, local_join=HB_join):
    # Range boundaries are hard-coded for three processors.
    # They could instead be derived from the data, e.g.:
    #   each_range = max(t[1] for t in T1) // n_processor
    #   range_indices = [each_range, 2*each_range]
    range_indices = [10,20]
    # Both T1 and T2 are partitioned with the same boundaries
    # (range_partition_v2 sorts its input itself)
    new_T1 = range_partition_v2(T1, range_indices)
    new_T2 = range_partition_v2(T2, range_indices)
    # Run one local join per pair of partitions in parallel;
    # the with-block shuts the pool down afterwards
    with mp.Pool(processes=n_processor) as pool:
        result = pool.starmap(local_join, list(zip(new_T1, new_T2)))
    return result

In [16]:
DPBP_join(R,S,3,NL_join)

[[{'Joanna, 2, CompSc'}, {'Adele, 8, Arts'}], [{'Ed, 11, Health'}], []]

In [17]:
DPBP_join(R,S,3,SM_join)

[[['Joanna', 2, 'CompSc'], ['Adele', 8, 'Arts']], [['Ed', 11, 'Health']], []]

In [18]:
DPBP_join(R,S,3)

[[{'Joanna, 2, CompSc'}, {'Adele, 8, Arts'}], [{'Ed, 11, Health'}], []]
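Each call above returns one list per processor. A final consolidation step merges them into a single result; the following is a minimal sketch using `itertools.chain`, with the `SM_join` output shown earlier pasted in as the input.

```python
from itertools import chain

# per-processor output of DPBP_join(R, S, 3, SM_join), copied from above
per_processor = [[['Joanna', 2, 'CompSc'], ['Adele', 8, 'Arts']],
                 [['Ed', 11, 'Health']],
                 []]

# concatenate the local results in processor order
combined = list(chain.from_iterable(per_processor))
print(combined)
```

Because the partitions are disjoint, simple concatenation is enough and no duplicate elimination is needed.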