# FIT 5148 - Distributed Databases and Big Data
## Assignment 1
_Due: Week 7 Monday 5PM_


**Your details**
- Name: Siyang Feng 

- Student ID: 28246993

- Name: Xin Wen

- Student ID: 28412702

# Dataset Analysis:
#### Data set characteristics: 
1. Climate data is sorted by date from oldest to latest: it is collected by sensors, so it is in time order.
2. Fire data is sorted by date from latest to oldest; it is also collected by sensors.

**We assume both tables are sorted by date**

#### Table size:
1. Climate - smaller -> when performing hash partitioning, we always use the climate table to build the hash table
2. Fire - larger


# Import libraries for all tasks

In [22]:
import csv
import sys
import datetime
import numpy as np
import itertools
import multiprocessing as mp
from copy import deepcopy as dc

# Import Datasets

In [23]:
# Load the climate CSV into a list of rows (each row a list of strings).
# newline='' is what the csv module documentation requires for files handed
# to csv.reader, so quoted fields with embedded newlines are parsed correctly.
with open('ClimateData.csv', 'r', newline='') as cliData:
    cli = csv.reader(cliData, delimiter=',')
    next(cli, None)  # drop the header row (no-op on an empty file)
    climate_data = list(cli)

In [24]:
# Load the fire CSV into a list of rows (each row a list of strings).
# newline='' is what the csv module documentation requires for files handed
# to csv.reader, so quoted fields with embedded newlines are parsed correctly.
with open('FireData.csv', 'r', newline='') as fireData:
    fire_list = csv.reader(fireData)
    next(fire_list, None)  # drop the header row (no-op on an empty file)
    fire_data = list(fire_list)

# Check processor number

In [25]:
# Number of CPUs reported by the operating system (multiprocessing.cpu_count()).
# Used below to derive the number of worker processes / partitions.
processor_num = mp.cpu_count()
processor_num # bare expression: shows the value as the notebook cell output

4

**The partition number should be at most `processor_num - 1`.**

Efficiency is best with 3 parallel processors; beyond 3, each additional processor gives a smaller improvement.

However, in this task we still keep the number of parallel processors flexible. We think a good algorithm should not only have good complexity but should also work under a wider range of conditions without changing its complexity.

In [26]:
# Partition number = processor number - 1, because one processor stays the
# master that runs the main (coordinating) code.
# max(1, ...) guards single-CPU machines: mp.Pool(processes=0) raises ValueError.
parallel_num = max(1, processor_num - 1)
parallel_num

3

# Global function for type converting

In [27]:
# strptime format of the 'YYYY-MM-DD' date strings (e.g. '2017-12-15'); used by convert_string_to_date
date_format = '%Y-%m-%d'

In [28]:
# function for converting string into date
def convert_string_to_date(dataset, dateIndex):
    """
    Parse the date column of every record in place.

    Arguments:
    dataset -- list of records; each record must be a mutable list
    dateIndex -- position of the date string (formatted as date_format) in each record

    Return:
    dataset -- the same list object, with the date column replaced by datetime.date values
    """
    parse = datetime.datetime.strptime
    for record in dataset:
        record[dateIndex] = parse(record[dateIndex], date_format).date()
    return dataset

In [29]:
# function for converting date into string
def convert_date_to_string(dataset, dateIndex):
    """
    Format the date column of every record back into a 'YYYY-MM-DD' string, in place.

    Arguments:
    dataset -- list of records whose dateIndex column holds date/datetime objects
    dateIndex -- position of the date in each record

    Return:
    dataset -- the same list object, with the date column replaced by strings
    """
    for row in dataset:
        as_date = row[dateIndex]
        row[dateIndex] = as_date.strftime('%Y-%m-%d')
    return dataset

In [30]:
# function for converting string into integer
def convert_string_to_int(dataset, index):
    """
    Convert one column of every record to int, in place.

    Arguments:
    dataset -- list of mutable records
    index -- position of the column to convert

    Return:
    dataset -- the same list object, with that column converted
    """
    for row in dataset:
        raw = row[index]
        row[index] = int(raw)
    return dataset

# Global function for determining range partition indices

In [31]:
def range_part_indices(dataset, n_proce, part_index):
    """
    Compute the n_proce - 1 boundaries of equal-width range partitions.

    The span between the first and last record's partition attribute is cut
    into n_proce equal steps (floor division). The dataset is expected to be
    sorted ascending on that attribute.

    Arguments:
    dataset -- input dataset (non-empty)
    n_proce -- the number of processors used for parallel
    part_index -- the position of the partition attribute in each sublist

    Return:
    result -- a list of partition boundary values
    """
    start = dataset[0][part_index]
    # floor division keeps ints as ints, and turns a date difference
    # (timedelta) into a timedelta, so the same code works for dates
    step = (dataset[-1][part_index] - start) // n_proce
    return [start + step * k for k in range(1, n_proce)]

In [32]:
def range_partition(data, part_index, n_proce, range_list=None):
    """
    Perform range data partitioning on data based on the partition attribute.

    The input must already be sorted ascending on the partition attribute.
    Records are split into contiguous runs, and each record is placed in
    exactly one partition.

    Arguments:
    data -- an input dataset (list of records), sorted ascending on part_index
    part_index -- the position of the partition attribute in each record
    n_proce -- the number of partitions; used to compute the boundaries with
               range_part_indices when range_list is not supplied
    range_list -- optional ascending list of boundary values. A record goes to
                  the first partition i with record[part_index] < range_list[i],
                  or to the last partition if no such boundary exists.

    Return:
    result -- a list of len(range_list) + 1 partitions (lists of records);
              some partitions may be empty
    """
    if range_list is None:
        range_list = range_part_indices(data, n_proce, part_index)

    result = []
    start = 0
    total = len(data)
    for boundary in range_list:
        end = start
        # advance over the run of records that fall below this boundary
        while end < total and data[end][part_index] < boundary:
            end += 1
        result.append(list(data[start:end]))
        # the next partition starts where this one stopped, so records that
        # never reach a boundary are not copied into later partitions
        start = end
    # everything at or above the last boundary forms the final partition
    result.append(list(data[start:]))
    return result

# Global function for hash

In [33]:
def hash_function(x, n):
    """
    Map a value to a hash bucket using the modulo operator.

    Arguments:
    x -- the (integer) value to hash
    n -- the number of buckets (hash base value)

    Result:
    result -- x modulo n
    """
    return x % n

In [34]:
def hash_partition(data, n, index):
    """
    Hash-partition a dataset on one attribute.

    Arguments:
    data -- an input dataset
    n -- the number of partitions (hash base)
    index -- the position of the partition attribute in each record

    Return:
    dic -- dict mapping hash value -> list of records, in input order; only
           hash values that received at least one record appear as keys
    """
    buckets = {}
    for record in data:
        bucket = hash_function(record[index], n)
        # create the bucket on first use, then append the record to it
        buckets.setdefault(bucket, []).append(record)
    return buckets

In [35]:
def hash_partition_date(data, n, dateIndex):
    """
    Hash-partition a dataset on the day-of-month of its date column.

    Arguments:
    data -- input dataset; the date column must still be a string ending in
            the two-digit day (e.g. 'YYYY-MM-DD')
    n -- number of processors used for parallel (hash base)
    dateIndex -- the index of the date in each record

    Return:
    dic -- dict mapping hash value -> list of records, in input order; only
           hash values that received at least one record appear as keys
    """
    buckets = {}
    for record in data:
        # the last two characters of the date string are the day of the month
        day_of_month = int(record[dateIndex][-2:])
        buckets.setdefault(hash_function(day_of_month, n), []).append(record)
    return buckets

# Task 1: Parallel Search
#### Options for data partitioning methods:
1. Round-robin
    - Pro: load balancing
    - Con: records are not grouped semantically, not efficient for searching, for each search, all processors need to be activated.
    
2. Hash
    - Pro: based on attribute to do partition, if searching attribute is the same with hash attribute, then the performance is very good. (only need to activate one processor)
    - Con: 
        * load imbalancing, skew
        * If searching attribute is different from hash attribute, then not efficient.
        * Not suitable for range searching, if user wants to do continuous range search, then all the processors need to be activated.
    
3. Range
    - Pro: good for range searching (only need to activate one or selected processors)
    - Con:
        * Skew
        * If searching attribute is different from range partition attribute, then not efficient.
    
4. Random-unequal
    - Used for pipelining jobs; suffers from skew

#### Searching technique Options:
1. Linear search:
    * Suitable for unordered data
2. Binary search
    * Suitable for ordered data, more efficient.


### Task 1.1
Write an algorithm to search climate data for the records on _**15th December 2017**_ . Justify your choice of the data partition technique and search technique you have used.

#### Decision in task 1.1:
We are doing an exact-match search. Climate data has one record per day, so the date is unique: once one match is found, the search is complete.

We assume the data is ordered by date (as it comes from sensors), so both range partitioning and hash partitioning are possible. The cost of these two techniques is hard to compare. However, hash partitioning must build a hash table, and if the hash table is larger than main memory, overflow handling adds extra overhead.

The disadvantage of range partitioning is possible skew, but round-robin is not a better choice: it achieves load balancing, but every search must activate all partitions.

So we use range partitioning for Task 1.1, with binary search inside the selected partition.

In [36]:
# Deep-copy the raw climate rows: the Task 1.1 search converts the date column
# in place, and the copy keeps climate_data untouched for later tasks.
climate = dc(climate_data)

#### binary search

In [37]:
# Binary search is used for this task because the data is already sorted by
# date, which makes it more efficient than a linear scan.
def binary_search_T1_1(data, key, key_index):
    """
    Binary search for the record whose key_index column equals key.

    Arguments:
    data -- input dataset, sorted ascending on key_index
    key -- target to find
    key_index -- the index of key in each item of the dataset

    Return:
    matched_record -- the matching record, or None if there is none
    """
    lo, hi = 0, len(data) - 1
    while lo <= hi:
        mid = (lo + hi) // 2
        candidate = data[mid]
        probe = candidate[key_index]
        if probe == key:
            return candidate
        if probe > key:
            hi = mid - 1   # target can only be in the lower half
        else:
            lo = mid + 1   # target can only be in the upper half
    return None

#### parallel processing

In [38]:
# multiprocessing, use range_partition and binary_search to do this task
# as there is only one record for each day in the climate data,
# so if a match is found, the search is completed
def parallel_search_exact_T1_1(data, query, date_index, n_processor):
    """
    Parallel exact-match search on a date-sorted dataset.

    The data is range-partitioned on the date column. Only the partition
    whose date range contains the query is searched (with binary search) in a
    worker process.

    Arguments:
    data -- input dataset, sorted ascending by date. Date values that are
            still strings are converted in place to datetime.date.
    query -- the datetime.date to find
    date_index -- the index of the date in each item of the dataset
    n_processor -- the number of partitions / worker processes

    Return:
    results -- [matched_record] when a partition's date range covers the
               query ([None] if it is covered but absent), or [] when no
               partition covers it
    """
    # convert string dates in place (using date_index, not a fixed column);
    # skip the conversion if it has already been done on this list
    if data and isinstance(data[0][date_index], str):
        data = convert_string_to_date(data, date_index)
    if not data:
        return []

    results = []
    partitions = range_partition(data, date_index, n_processor)

    # the partitions are contiguous and ascending, so at most one can hold the
    # query; bounds are inclusive so a query equal to a partition's first or
    # last date is found, and empty partitions are skipped
    target = next((part for part in partitions
                   if part and part[0][date_index] <= query <= part[-1][date_index]),
                  None)
    if target is None:
        return results

    pool = mp.Pool(processes=n_processor)
    try:
        results.append(pool.apply(binary_search_T1_1, [target, query, date_index]))
    finally:
        pool.close()
        pool.join()
    return results

In [39]:
# for testing: look up the climate record for 15 December 2017
q = datetime.datetime.strptime('2017-12-15',date_format).date() # the key for query, as a datetime.date
# 1 = position of the date column in each climate record
results = parallel_search_exact_T1_1(climate, q, 1, parallel_num)
results # displayed as the cell output

[['948702',
  datetime.date(2017, 12, 15),
  '18',
  '52',
  '7.1',
  '14',
  '   74.5*',
  '53.1',
  ' 0.00I']]

### Task 1.2
Write an algorithm to find the latitude, longitude and confidence when the surface temperature (°C) was between 65 °C and 100 °C . Justify your choice of the data partition technique and search technique you have used.

##### Decision in task 1.2:
We are doing a continuous range search, but the data is **not ordered on the search attribute (surface temperature)**. We therefore use hash partitioning for this task: it does not require sorting the data first and is quite efficient compared with random-equal partitioning.

For the searching technique, we decide to use linear search, as it is an unordered dataset (based on the partition attribute).


In [40]:
# Deep-copy the raw fire rows: parallel_search_range_T1_2 converts the
# surface-temperature column to int in place.
fire = dc(fire_data)

#### linear search

In [41]:
def linear_search_T1_2(data, key, index):
    """
    Linear scan returning every record whose index column equals key.

    Arguments:
    data -- input dataset (not ordered on the search attribute)
    key -- query value
    index -- the position of the query attribute in each record (here, the
             surface temperature)

    Return:
    result -- list of matching records, in input order
    """
    # the data is not ordered on this attribute, so every record has to be
    # checked and all matches kept
    return [record for record in data if record[index] == key]

#### multiprocessing function, use hash_partition and linear_search

In [42]:
def parallel_search_range_T1_2(data, query_range, n_processor, index):
    """
    Parallel range search over a hash-partitioned dataset.

    Every integer value in the inclusive range is routed to the single hash
    partition that can contain it, where it is linearly searched. All the
    searches are submitted to the pool together, so they run concurrently.

    Arguments:
    data -- input dataset; the index column is converted to int in place
    query_range -- [low, high] integer bounds, both inclusive
    n_processor -- a number of processor for parallel (also the hash base)
    index -- the position of the query attribute in each record (in this
             case, the surface temperature)

    Return:
    results -- one list of matching records per value in the range that has
               at least one match, in ascending value order
    """
    # convert the search attribute to int in place (same column that is searched)
    data = convert_string_to_int(data, index)

    # hash partition on the search attribute: value x lands in bucket x % n
    partitions = hash_partition(data, n_processor, index)

    # one task per value in the range. A bucket that received no records is
    # missing from the dict and is searched as an empty list (no KeyError).
    tasks = [(partitions.get(hash_function(value, n_processor), []), value, index)
             for value in range(query_range[0], query_range[1] + 1)]

    pool = mp.Pool(processes=n_processor)
    try:
        # starmap submits every task before waiting, so the searches run in
        # parallel; a loop of pool.apply calls would run them one at a time
        matches = pool.starmap(linear_search_T1_2, tasks)
    finally:
        pool.close()
        pool.join()

    # keep only the values that produced at least one match
    return [found for found in matches if found]

In [None]:
# For testing: surface temperatures from 65 to 100, both inclusive
q = [65,100]  # the searching query: [lowest, highest] value
# -1 = position of the surface temperature in each fire record
result = parallel_search_range_T1_2(fire, q, parallel_num, -1)
result # displayed as the cell output

# Task 2 Parallel Join
#### Join algorithm options:
1. Nested loop
    - Not efficient, records are scanned more than once
2. Sort-merge join
    - Each record needs to be scanned only once
    - Data needs to be sorted first.
3. Hash-based join
    - Each record needs to be scanned only once
    - Data needs to be hashed first(create a hash table)

#### Parallel algorithm for join:
1. Divide and broadcast
    - Divide the larger table using round-robin, then broadcast the smaller table to all processors
    - Local join can use any join algorithm
    - Pro: 
        * load balancing
    - Con: 
        * skew for result production, smaller table is duplicated
2. Disjoint data partitioning
    - Data partitioning need to use disjoint partition(range or hash)
    - Local join can use any serial algorithm
    - Pro:
        * No duplication of smaller table
        * If use range partition, table needs to be sorted first
        * If use hash partition, needs to create a hash table first
    - Con:
        * Skew

#### Decision:
Tasks 2.1 and 2.2 join the two tables on the Date attribute. Both tables are sorted by date, but their date ranges are hard to know without scanning the data first, because the two tables can cover different date ranges. This makes it complex to determine range boundaries.

For these 2 subtasks, we prefer to use hash partition and hash-based join.

* For the data partition, we use hash partition based on join attribute(date) and 
* For the local join, we use hash-based join.


### Task 2.1
Write an algorithm to find 1)surface temperature (°C), 2)air temperature (°C), 3)relative humidity and 4)maximum wind speed. Justify your choice of the data partition technique and join technique you have used.

In [44]:
# Fresh deep copies of the raw rows. Earlier cells converted columns in place,
# while the hash partitioning below needs the dates as 'YYYY-MM-DD' strings.
climate = dc(climate_data)
fire = dc(fire_data)

#### hash based join function

In [45]:
def HB_join_T2_1(T1, T2):
    """
    Perform the hash-based join algorithm for Task 2.1.

    The join attribute is the date: r[-2] in T1 (fire) and s[1] in T2
    (climate). A hash table is built on the smaller table T2 and probed with
    every record of T1.

    Arguments:
    T1 -- fire records (larger table, used for probing); r[-2] is the date
          and r[-1] the surface temperature
    T2 -- climate records (smaller table, used to build the hash table);
          s[1] is the date, s[2] the air temperature, s[3] the relative
          humidity and s[5] the maximum wind speed

    Return:
    result -- one entry per matching (fire, climate) pair, in T1 order. Each
              entry is a one-element set holding the string
              "surface temp, air temp, relative humidity, max wind speed".
    """
    # build phase: the hash table is keyed on the full date value, so a probe
    # only sees records that really share the date. No bucket then has to be
    # filtered record by record.
    table = {}
    for s in T2:
        table.setdefault(s[1], []).append(s)

    # probe phase: look up each fire record's date
    result = []
    for r in T1:
        for s in table.get(r[-2], ()):
            # NOTE(review): the braces build a set containing one string; kept
            # as-is so the output format seen by callers does not change
            result.append({", ".join([r[-1], s[2], s[3], s[5]])})
    return result

#### Multiprocessing using disjoint partition-based parallel join; this function is used for Tasks 2.1 and 2.2

In [46]:
def DPBP_join_T2(T1, T2, n_processor, T1_partIndex, T2_partIndex, HB_join):
    """
    Disjoint partition-based parallel join.

    Both tables are hash-partitioned on the day of their date column, so
    records with the same date land in the same bucket index in both tables.
    Each bucket pair is then joined independently in a worker process.

    Arguments:
    T1 -- input dataset (fire) - larger table
    T2 -- input dataset (climate) - smaller table
    n_processor -- number of processor for parallel (also the number of buckets)
    T1_partIndex -- date index in each item of fire dataset
    T2_partIndex -- date index in each item of climate dataset
    HB_join -- local hash-based join function, called as HB_join(T1_part, T2_part)

    Return:
    result -- list of n_processor local join results, one per bucket index
    """
    # perform hash partition for both tables
    T1_subsets = hash_partition_date(T1, n_processor, T1_partIndex)
    T2_subsets = hash_partition_date(T2, n_processor, T2_partIndex)

    # a bucket that received no records is missing from the dict; join it as
    # an empty list instead of raising KeyError
    tasks = [(T1_subsets.get(i, []), T2_subsets.get(i, [])) for i in range(n_processor)]

    pool = mp.Pool(processes=n_processor)
    try:
        # starmap dispatches every bucket pair before waiting for any result,
        # so the local joins run concurrently (pool.apply runs them one by one)
        result = pool.starmap(HB_join, tasks)
    finally:
        pool.close()
        pool.join()
    return result

In [None]:
# For testing
# Use parallel_num (processor_num - 1), as in the other tasks, so one CPU is
# left for the master process.
result = DPBP_join_T2(fire, climate, parallel_num, -2, 1, HB_join_T2_1)
#surface temperature, air temperature, relative humidity, maximum wind speed
result

### Task 2.2
Write an algorithm to find 1)datetime, 2)air temperature (°C), 3)surface temperature (°C) and 4)confidence when the confidence is between 80 and 100. Justify your choice of the data partition technique and join technique you have used.

In [48]:
# Fresh deep copies of the raw rows, with the dates still as 'YYYY-MM-DD'
# strings as the hash partitioning below requires.
climate = dc(climate_data)
fire = dc(fire_data)

#### hash based join function

In [49]:
def HB_join_T2_2(T1, T2):
    """
    Perform the hash-based join algorithm for Task 2.2.

    The join attribute is the date: r[-2] in T1 (fire) and s[1] in T2
    (climate). Only fire records whose confidence is between 80 and 100
    (inclusive) take part in the join. The input records are not modified.

    Arguments:
    T1 -- fire records (larger table, used for probing); r[3] is the
          datetime, r[5] the confidence, r[-2] the date and r[-1] the
          surface temperature
    T2 -- climate records (smaller table, used to build the hash table);
          s[1] is the date and s[2] the air temperature

    Return:
    result -- one entry per matching pair, in T1 order. Each entry is a
              one-element set holding the string
              "datetime, air temp, surface temp, confidence".
    """
    # build phase: hash table keyed on the full date value
    table = {}
    for s in T2:
        table.setdefault(s[1], []).append(s)

    # probe phase
    result = []
    for r in T1:
        confidence = int(r[5])
        # apply the selection before probing: records outside the range
        # can never appear in the result
        if not 80 <= confidence <= 100:
            continue
        for s in table.get(r[-2], ()):
            # NOTE(review): the braces build a set containing one string; kept
            # as-is so the output format seen by callers does not change
            result.append({", ".join([r[3], s[2], r[-1], str(confidence)])})
    return result

In [None]:
# For testing: -2 = date column of fire records, 1 = date column of climate records
result = DPBP_join_T2(fire, climate, parallel_num, -2, 1, HB_join_T2_2)
#datetime, air temperature, surface temperature, confidence
result

# Task 3: Parallel Sort
#### Options:
1. Parallel merge-all sort
    - Pro: load balancing
    - Con: 
        * merge heavy in one processor(host)
        * Network contention
        * If the number of files open at the same time is limited, this method cannot be used.
    - Decision: 
        * not use this technique, because merge phase will be the bottleneck

2. Parallel binary-merge sort
    - Pro: 
        * workload of merging is spread through pipeline of processors
        * Sort phase can achieve load balancing by using round-robin
    - Con:
        * For the host, still heavy work in the final merge phase
        * Extra work and higher tree is caused by using pipeline
    - Decision: 
        * not use this method, because for the host, still heavy work, not efficient.

3. Parallel redistribution merge-all sort
    - Pro:
        * One level tree
        * Merging stage use parallelism
        * Merge work is lighter compared with no redistribution.

    - Con:
        * In the merge stage, skew may occur
        * File opening at the same time can be the bottleneck at final stage

    - Decision: 
        * this method is good, parallelism can be achieved and the tree is only one level
	
4. Parallel redistribution binary-merge sort
    - Pro:
        * Merging stage use parallelism, and parallelism is achieved at all levels of merging
        * Merge work is lighter compared with no redistribution.
    - Con:
        * The tree is high because of the pipeline
        * Skew may occur in the merging stage
        * Final merging stage will also have the file opening bottleneck

    - Decision: 
        * not use this method, because the tree is high.
	
5. Parallel Partitioned sort
    - Pro:
        * No merging actually happens, so there’s no bottleneck
        * Parallelism is achieved
        * One level tree
    - Con:
        * Skew of work
    - Decision: 
        * this method is good, the only problem is that we need to decide the range indices, so we need to scan the data first, as the data is not ordered by the sort attribute, which can be slow.

#### Summary:
After comparing these techniques and based on the dataset, we decide to use round-robin for partition(load balancing) and use parallel redistribution merge-all sort (parallelism of sorting and merging, less I/O, one-level tree).

And we choose quick sort for the local sort technique, because it is more efficient than bubble sort and insertion sort.


#### Task:
Write an algorithm to sort fire data based on surface temperature (°C) in ascending order. Justify your choice of the data partition technique and sorting technique you have used.


__Parallel redistribution merge-all sort:__

* _First distribute:_ Round-Robin
* _Range distribute:_ compare the first and last items of every sorted partition to get the global minimum and maximum, and define equal-width ranges from them. Then redistribute the records of each partition into those ranges.
* _Merge-all sort:_ in each range, merge the sorted pieces received from all partitions with a k-way merge
* _Merge as range_

In [51]:
# Fresh deep copy of the raw fire rows for the sort task
fire = dc(fire_data)

In [52]:
# Round-robin data partitioning function
def rr_partition_T3(data, n):
    """
    Deal the records into n bins in round-robin order.

    Arguments:
    data -- an input dataset which is a list
    n -- the number of processors (bins)

    Return:
    result -- list of n lists; record k goes to bin k % n
    """
    bins = [[] for _ in range(n)]
    for position, record in enumerate(data):
        bins[position % n].append(record)
    return bins

In [53]:
def qsort_T3(arr):
    """
    Quicksort a list of records ascending on int(record[-1]) (surface temperature).

    Three-way partitioning (less / equal / greater) handles all records equal
    to the pivot in one step, so runs of duplicate temperatures no longer add
    one recursion level per duplicate, which could exceed Python's recursion
    limit. The middle element is the pivot, so already-sorted input splits
    evenly. Each partition keeps input order, which makes the sort stable.
    The output is therefore the same as the earlier first-element-pivot
    version.

    Arguments:
    arr -- the input list of records; record[-1] must be int()-convertible

    Return:
    result -- the sorted list (arr itself when it has 0 or 1 elements)
    """
    if len(arr) <= 1:
        return arr
    pivot = int(arr[len(arr) // 2][-1])
    less, equal, greater = [], [], []
    for record in arr:
        key = int(record[-1])
        if key < pivot:
            less.append(record)
        elif key == pivot:
            equal.append(record)
        else:
            greater.append(record)
    return qsort_T3(less) + equal + qsort_T3(greater)
            

In [54]:
def distribute_range_T3(list_sum, n_proc):
    """
    Compute the range-partition boundaries for the redistribution step.

    Every subset is sorted ascending on int(x[-1]), so its first record holds
    the subset minimum and its last record the maximum. The global minimum
    and maximum are split into n_proc equal-width ranges.

    Arguments:
    list_sum -- a list of sorted subsets; empty subsets are skipped (they
                occur when there are fewer records than processors)
    n_proc -- the number of processes used in parallel processing

    Return:
    range_list -- the n_proc - 1 boundary values for range_partition_T3

    Raises:
    ValueError -- if every subset is empty
    """
    non_empty = [subset for subset in list_sum if subset]
    if not non_empty:
        raise ValueError("distribute_range_T3: all subsets are empty")

    # -1 is the index of surface temperature
    min_v = min(int(subset[0][-1]) for subset in non_empty)
    max_v = max(int(subset[-1][-1]) for subset in non_empty)

    # equal-width ranges between the global min and max
    range_v = (max_v - min_v) // n_proc
    return [min_v + i * range_v for i in range(1, n_proc)]
    

In [55]:
def range_partition_T3(dataset, range_list):
    """
    Split a list sorted ascending on int(x[-1]) into contiguous ranges.

    A record goes to the first partition i with int(record[-1]) <=
    range_list[i], so a value equal to a boundary stays in the lower
    partition. It goes to the last partition when it exceeds every boundary.
    Each record appears in exactly one partition, even when the whole list
    lies below some boundaries.

    Arguments:
    dataset -- sorted data list
    range_list -- ascending list of boundary values

    Return:
    result -- list of len(range_list) + 1 partitions (possibly empty lists)
    """
    result = []
    start = 0
    total = len(dataset)
    for boundary in range_list:
        end = start
        while end < total and int(dataset[end][-1]) <= boundary:
            end += 1
        result.append(dataset[start:end])
        # continue from where this range stopped, so no record is repeated
        start = end
    result.append(dataset[start:])
    return result

In [56]:
# find the smallest record
def find_min_T3(records):
    """
    Return the position of the record with the smallest int(record[-1]).

    Arguments:
    records -- non-empty list of records

    Return:
    index -- position of the first record holding the minimum value
    """
    best_pos, best_val = 0, int(records[0][-1])
    for pos, rec in enumerate(records[1:], start=1):
        val = int(rec[-1])
        # strict '<' keeps the earliest position on ties
        if val < best_val:
            best_pos, best_val = pos, val
    return best_pos

In [57]:
def k_way_merge_T3(record_sets, buffer_size):
    """
    K-way merging algorithm.

    Merges several lists, each sorted ascending on int(x[-1]), into one
    sorted list. heapq.merge keeps a heap of the k current heads, so each
    output record costs O(log k) rather than a scan of all k heads. An empty
    record_sets simply gives []. When heads tie, the record from the earlier
    list comes first.

    Arguments:
    record_sets -- the set of multiple sorted sub-record sets
    buffer_size -- must be greater than 2; it is only validated here

    Return:
    result -- the sorted and merged record set, or None (after printing an
              error) when buffer_size <= 2
    """
    import heapq

    ##check buffer_size first
    if buffer_size <= 2:
        print("Error: buffer size should be greater than 2")
        return  # end of method.

    return list(heapq.merge(*record_sets, key=lambda record: int(record[-1])))
    

In [58]:
def range_merge_T3(sorted_lists):
    """
    Concatenate range-partitioned, individually sorted sublists in ascending order.

    The sublists cover disjoint value ranges, so ordering them by their first
    (smallest) key and concatenating them gives the global order. Empty
    sublists, from a range that received no records, are skipped instead of
    raising IndexError. The input list is not modified.

    Arguments:
    sorted_lists -- a list of sorted sublists (records keyed on int(x[-1]))

    Return:
    result -- a list of all records in ascending order
    """
    non_empty = [part for part in sorted_lists if part]
    non_empty.sort(key=lambda part: int(part[0][-1]))
    return list(itertools.chain.from_iterable(non_empty))

In [82]:
def parallel_merge_sort_T3(dataset, n_processor, buffer_size):
    """
    Parallel redistribution merge-all sort on int(record[-1]) (surface temperature).

    Steps:
    1. round-robin the records into n_processor partitions
    2. quicksort every partition (in parallel)
    3. compute equal-width value boundaries from the sorted partitions
    4. range-split every sorted partition on those boundaries (in parallel)
       and gather the pieces that belong to the same range
    5. k-way merge the pieces of each range (in parallel)
    6. concatenate the merged ranges in ascending order

    Arguments:
    dataset -- entire record set to be sorted
    n_processor -- the number of processors
    buffer_size -- passed to k_way_merge_T3; must be greater than 2

    Return:
    result -- the sorted result, or None (after printing an error) when
              buffer_size <= 2
    """
    ##check buffer_size first
    if buffer_size <= 2:
        print("Error: buffer size should be greater than 2")
        return  # end of method.

    # partition dataset through round-robin
    rr_list = rr_partition_T3(dataset, n_processor)

    pool = mp.Pool(processes=n_processor)
    try:
        # map/starmap submit all partitions before waiting, so the work really
        # runs in parallel; a loop of pool.apply calls runs one task at a time
        qsort_list = pool.map(qsort_T3, rr_list)

        # boundaries of the value ranges used for redistribution
        range_list = distribute_range_T3(qsort_list, n_processor)

        # split every sorted partition into len(range_list) + 1 == n_processor ranges
        pieces = pool.starmap(range_partition_T3,
                              [(part, range_list) for part in qsort_list])

        # gather the pieces of the same range coming from every partition
        partition_list = [[] for _ in range(n_processor)]
        for record in pieces:
            for j, item in enumerate(record):
                partition_list[j].append(item)

        # local k-way merge of each range
        merged_list = pool.starmap(k_way_merge_T3,
                                   [(part, buffer_size) for part in partition_list])
    finally:
        pool.close()
        pool.join()

    # merge sublists by ordering them on their first item
    return range_merge_T3(merged_list)
    

In [None]:
# For Testing: sort fire records by surface temperature; 3 is the buffer size
# (the smallest value accepted, since it must be greater than 2)
parallel_merge_sort_T3(fire, parallel_num, 3)

# Task 4 Parallel Groupby
##### Technique comparison:
1. Merge-all groupby
    - Pro:
        * Suitable when the number of nodes is small and the number of resulting records is small
    - Con:
        * If the group size is bigger, host will become the bottleneck
2. Hierarchical merging groupby
    - Pro:
        * Suitable when the number of resulting records are small
    - Con:
        * If the group size is bigger, host will still be the bottleneck

3. Two-phase method:
    - Pro:
        * Distribute local results based on the group-by attribute, this solved the bottleneck problem

4. Redistribution method
    - Pro:
        * No need to do local aggregation before distribution, more efficient
        * No bottleneck problem

##### Examine data characteristic
1. Fire data is sorted by date, so it is easy for us to do range partition (easy to determine range indices)

##### Decision:
- Based on the dataset characteristics and the pros/cons of each technique, our group decided to use the **redistribution method** (with range partitioning) for this task. It is more efficient and avoids the bottleneck problem.


### Task 4.1
Write an algorithm to get the number of fire in each day. 

You are required to only display total number of fire and the date in the output. 

Justify your choice of the data partition technique if any.

In [68]:
# Fresh deep copy of the raw fire rows (the groupby below converts the date column in place)
fire = dc(fire_data)

#### local groupby function

In [69]:
def local_groupby_count(dataset, gb_index):
    """
    Count the records of one partition for each value of the group-by attribute.

    Arguments:

    dataset -- the records of one partition
    gb_index -- the index of the group-by attribute (a date) in each record

    Return:

    result -- a dict mapping each group-by value to its number of records,
              in order of first appearance
    """

    # Convert the group-by column (dates) to strings so the dictionary keys
    # are plain date strings
    dataset = convert_date_to_string(dataset, gb_index)

    counts = {}
    for record in dataset:
        key = record[gb_index]
        counts[key] = counts.get(key, 0) + 1
    return counts

#### multiprocessing function

In [70]:
def parallel_redistribution_groupby_T4_1(dataset, date_index, n_processor):

    """ Count the records per date with the redistribution group-by method

    The records are range-partitioned on the date (the group-by attribute) and
    each partition is counted locally in its own worker process.

    Arguments:

    dataset -- entire record set to be grouped
    date_index -- the index of the date (group-by) attribute in each record
    n_processor -- the number of partitions / worker processes

    Return:

    result -- a list with one dict per partition, each mapping date -> record count
    """

    # Range partitioning compares dates, so convert the date strings first
    dataset = convert_string_to_date(dataset, date_index)

    # Redistribution step: range partition on the group-by (date) attribute.
    # NOTE(review): the per-partition dicts are not merged afterwards, which is
    # only correct if range_partition never splits one date across two
    # partitions — confirm.
    partitions = range_partition(dataset, date_index, n_processor)

    # ----- Local aggregation step ----
    # pool.apply blocks until each call returns, which processed the partitions
    # one after another; starmap hands all of them to the workers at once and
    # returns the results in partition order. The with-block releases the pool.
    with mp.Pool(processes=n_processor) as pool:
        result = pool.starmap(
            local_groupby_count,
            [(partition, date_index) for partition in partitions],
        )

    return result

In [None]:
# fire data is ordered from latest date to oldest, so we need to reverse it first
# (presumably so that range partitioning sees ascending dates). -2 is the index
# of the date column in the fire records.
result = parallel_redistribution_groupby_T4_1(list(reversed(fire)), -2, parallel_num)
# One dict per partition: date -> total number of fires on that day
result

### Task 4.2
Write an algorithm to find the average surface temperature (°C) for each day.

You are required to only display average surface temperature (°C) and the date in the output.

Justify your choice of the data partition technique if any.

In [72]:
fire = dc(fire_data)  # fresh deep copy of the raw fire records for Task 4.2

#### local groupby function

In [73]:
def local_groupby_average(dataset, gb_index, value_index=-1):

    """
    Average one numeric column of a partition for each value of the group-by attribute.

    Arguments:

    dataset -- the records of one partition; the averaged column must already be numeric
    gb_index -- the index of the group-by attribute (a date) in each record
    value_index -- the index of the column to average (default -1, the last column)

    Return:

    result -- a dict mapping each group-by value to its average, rounded to 2 decimals
    """

    # Convert the group-by column (dates) to strings so the dictionary keys
    # are plain date strings
    dataset = convert_date_to_string(dataset, gb_index)

    totals = {}  # group-by value -> [record count, running sum]
    for record in dataset:
        key = record[gb_index]
        value = record[value_index]
        if key in totals:
            totals[key][0] += 1
            totals[key][1] += value
        else:
            totals[key] = [1, value]

    return {key: round(total / count, 2) for key, (count, total) in totals.items()}

#### multiprocessing function

In [74]:
def parallel_redistribution_groupby_T4_2(dataset, date_index, n_processor):

    """ Average the surface temperature per date with the redistribution group-by method

    The records are range-partitioned on the date (the group-by attribute) and
    each partition is averaged locally in its own worker process.

    Arguments:

    dataset -- entire record set; the surface temperature is the last column (index -1)
    date_index -- the index of the date (group-by) attribute in each record
    n_processor -- the number of partitions / worker processes

    Return:

    result -- a list with one dict per partition, each mapping date -> average surface temperature
    """

    # Convert the same column that is partitioned and grouped on (date_index);
    # converting a hard-coded column here would break any other date_index.
    dataset = convert_string_to_date(dataset, date_index)
    # Convert the surface temperature column (index -1) into int so it can be summed
    dataset = convert_string_to_int(dataset, -1)

    # Redistribution step: range partition on the group-by (date) attribute.
    # NOTE(review): per-partition averages are not merged afterwards, which is
    # only correct if range_partition never splits one date across two
    # partitions — confirm.
    partitions = range_partition(dataset, date_index, n_processor)

    # ----- Local aggregation step ----
    # starmap runs the partitions concurrently (pool.apply blocked on each call)
    # and returns results in partition order; the with-block releases the pool.
    with mp.Pool(processes=n_processor) as pool:
        result = pool.starmap(
            local_groupby_average,
            [(partition, date_index) for partition in partitions],
        )

    return result

In [None]:
# fire data is ordered from latest date to oldest, so we need to reverse it first
# (presumably so that range partitioning sees ascending dates). -2 is the index
# of the date column in the fire records.
result = parallel_redistribution_groupby_T4_2(list(reversed(fire)), -2, parallel_num)
# One dict per partition: date -> average surface temperature on that day
result

# Task 5 Group-by join

Write an algorithm to find the average surface temperature (°C) for each weather station.

You are required to only display average surface temperature (°C) and the station in the output.

Justify your choice of the data partition and join technique.

Hint: You need to join using the date and group by based on station.

__Groupby attribute__ is weather station

__Join attribute__ is date
- We choose to group by after the join, because the group-by attribute is different from the join attribute.

__Textbook reference:__ 
For efficiency, when the join selectivity factor is small and the degree of skewness is low, the join partitioning scheme leads to less cost; otherwise, the GroupBy partitioning scheme is desirable. In addition, it can be observed that the partitioning with the group-by attribute scheme is insensitive to the group-by factor and thus the scheme will simplify algorithm design and implementation.

#### Determine the partition attribute:
##### Choice1: use join attribute to do partition
- Pro: 
    * no need to broadcast the fire table
    * Less cost when the join selectivity factor is small, and degree of skewness is low
- Con:
    * Algorithm is more complex
    
##### Choice2: use groupby attribute to do partition
- Pro:
    * Less cost when the join selectivity factor is large, and degree of skewness is high
    * insensitive to the group-by factor and algorithm design and implementation is simpler

##### Summary:
For this task, we choose to partition on the group-by attribute (station), because the join selectivity factor is high and the algorithm is simpler.

For the data partitioning technique, we choose hash partitioning on the station attribute.

For the local join, we choose to use hash-based join.


In [76]:
# Fresh deep copies of the raw fire and climate records for Task 5
# (parallel_groupby_T5 converts the climate station column to int)
fire = dc(fire_data)
climate = dc(climate_data)

####  hash based join for local join
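Join each climate partition with the whole fire table. Note that `multiprocessing.Pool` pickles the fire table into every worker task. Each process therefore receives its own copy of it (in effect a broadcast) rather than sharing one table in memory.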

In [77]:
def HB_join_T5(T1, T2, n_processor):
    """
    Perform a hash-based join of fire (T1) and climate (T2) records on the date.

    The hash table is built on T2 (the smaller climate table) and probed with T1.

    Arguments:

    T1 -- fire records (larger table, probing side); date at index -2,
          surface temperature at index -1
    T2 -- climate records (smaller table, build side); station at index 0,
          date at index 1
    n_processor -- unused; kept so existing callers keep working

    Return:

    result -- a list of [station (str), surface temperature] pairs, one for every
              (fire, climate) pair with equal dates, in T1 order
    """

    # Build phase: key the hash table on the full date. Hashing only the
    # day-of-month into n_processor buckets put about len(T2) / n_processor
    # climate records in every bucket, so each probe was a linear scan that
    # still had to compare full dates. Exact-date keys give the same matches
    # (in the same order) with an O(1) lookup per fire record.
    table = {}
    for climate_record in T2:
        table.setdefault(climate_record[1], []).append(climate_record)

    # Probe phase: each climate record with the same date yields one joined row
    result = []
    for fire_record in T1:
        for climate_record in table.get(fire_record[-2], ()):
            # climate_record[0] is the station number, fire_record[-1] the surface temperature
            result.append([str(climate_record[0]), fire_record[-1]])

    return result

#### multiprocessing for parallel join

In [78]:
def DPBP_join_T5(T1, T2, n_processor, T2_partIndex):

    """
    Disjoint-partition-based parallel join of the fire and climate tables.

    The climate table is hash-partitioned on the attribute at T2_partIndex (the
    station, per the caller in this notebook). Each worker then joins one
    climate partition with the complete fire table on the date, using HB_join_T5.

    Arguments:
    T1 -- input dataset (fire) - larger table; sent whole to every worker
    T2 -- input dataset (climate) - smaller table; hash-partitioned
    n_processor -- number of partitions / worker processes
    T2_partIndex -- index of the partitioning attribute in each climate record

    Return:
    result -- a list with one joined list per climate partition, in partition order
    """

    # Partition the climate table on the partitioning attribute
    T2_subsets = hash_partition(T2, n_processor, T2_partIndex)

    # One local join per partition. starmap runs them concurrently (pool.apply
    # blocked on each call in turn) and keeps partition order; the with-block
    # releases the pool.
    tasks = [(T1, T2_subsets[index], n_processor) for index in range(n_processor)]
    with mp.Pool(processes=n_processor) as pool:
        result = pool.starmap(HB_join_T5, tasks)

    return result

#### local groupby function

In [79]:
def local_groupby_T5(dataset):

    """
    Average the value of joined rows for each key (station).

    Arguments:

    dataset -- joined rows whose element 0 is the group-by key and element 1 a
               value accepted by int()

    Return:

    result -- a dict mapping each key to the average of its values, rounded to
              2 decimals, in order of first appearance
    """
    totals = {}  # key -> [row count, running sum]
    for row in dataset:
        key = row[0]
        value = int(row[1])
        if key in totals:
            totals[key][0] += 1
            totals[key][1] += value
        else:
            totals[key] = [1, value]

    return {key: round(total / count, 2) for key, (count, total) in totals.items()}

#### multiprocessing for parallel groupby

In [80]:
def parallel_groupby_T5(T1, T2, n_processor, partIndex):

    """
    Average the fire surface temperature per weather station (join, then group by).

    The climate table is partitioned on the station (the group-by attribute) and
    joined with the fire table on the date. Each station is assumed to land in
    exactly one partition, because the partitioning is a hash of its value
    (TODO confirm against hash_partition). Each partition's join output can
    then be aggregated locally without a merge step.

    Arguments:

    T1 -- fire records (date at index -2, surface temperature at index -1)
    T2 -- climate records (station at index partIndex, date at index 1)
    n_processor -- number of partitions / worker processes
    partIndex -- index of the station attribute in the climate records; used
                 both for the int conversion and for partitioning

    Return:

    result -- a list with one dict per partition, mapping station (str) ->
              average surface temperature
    """
    # convert station number into int (climate table)
    T2 = convert_string_to_int(T2, partIndex)

    # join the two tables first, partitioning the climate table on the station
    dataset = DPBP_join_T5(T1, T2, n_processor, partIndex)

    # ----- Local aggregation step ----
    # map runs one local group-by per partition concurrently (pool.apply blocked
    # on each call) and returns results in partition order.
    with mp.Pool(processes=n_processor) as pool:
        result = pool.map(local_groupby_T5, dataset)

    return result

In [81]:
# For testing: 0 is the index of the station number in the climate records
result = parallel_groupby_T5(fire, climate, parallel_num, 0)
# One dict per partition: station number -> average surface temperature (°C)
result

[{'948702': 52.15}, {}, {'948701': 56.07}]