# FIT5148 - Distributed Databases and Big Data

# Assignment 1 - Solution Workbook


**Instructions:**
- You will be using Python 3.
- Read the assignment instruction carefully and implement the algorithms in this workbook. 
- You can use the datasets fireData and climateData (provided below) if you are aiming for Credit Task.
- For Distinction and High Distinction tasks, you are required to read the files FireData.csv and ClimateData.CSV provided with the assignment programmatically and prepare the data in the correct format so that it can be used in your algorithm. 
- You can introduce new cells as necessary.

**Your details**
- Name: Boyu Zhang
- Student ID:28491300 

- Name: Yining Ye
- Student ID:27917517

Let's get started!

## Pre-processing
### Load dataset

In [0]:
import csv
from datetime import datetime
import multiprocessing as mp

In [0]:
firePath = './data/FireData.csv'
climatePath = './data/ClimateData.csv'

In [0]:
def read_to_list(path):
    with open(path,'r') as f:
        reader = csv.reader(f)
        return list(reader)

In [0]:
climateData = read_to_list(climatePath)[1:]
fireData = read_to_list(firePath)[1:]

In [0]:
# a quick look at the first climate record and the first fire record
climateData[0],fireData[0]

(['948700',
  '2016-12-31',
  '19',
  '56.8',
  '7.9',
  '11.1',
  '   72.0*',
  '  61.9*',
  ' 0.00I'],
 ['-37.966',
  '145.051',
  '341.8',
  '2017-12-27T04:16:51',
  '26.7',
  '78',
  '2017-12-27',
  '68'])

## Task 1 Parallel Search
### 1. 
Write an algorithm to search climate data for the records on 15th December 2017. 
Justify your choice of the data partition technique and search technique you have used.


In [0]:
# check if climate data is sorted by date
climateData == sorted(climateData,key=lambda x: x[1])

True

**Justification**:

Binary search + round-robin partition

The check above shows that the records in climateData are already sorted by the date column, which is exactly our search key.

Given this, we can pick the simplest partition method, round-robin, which splits the dataset evenly and keeps the load balanced without compromising efficiency. Because round-robin takes every n-th record of a sorted list, each partition is itself still sorted by date.

Binary search is therefore the natural choice within each partition: on sorted data it needs O(log n) comparisons instead of the O(n) of a linear scan.
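
The argument can be checked on a small example. The sketch below uses made-up dates rather than the real climate data; the helper names `round_robin` and `find_date` are illustrative only, and `bisect_left` from the standard library stands in for a hand-written binary search.

```python
from bisect import bisect_left

def round_robin(records, n):
    """Deal records into n partitions in turn (record i goes to partition i % n)."""
    return [records[i::n] for i in range(n)]

def find_date(sorted_dates, key):
    """Binary search: return the position of key, or -1 if it is absent."""
    pos = bisect_left(sorted_dates, key)
    if pos < len(sorted_dates) and sorted_dates[pos] == key:
        return pos
    return -1

# Hypothetical, already sorted dates (ISO strings sort chronologically)
dates = ['2017-12-%02d' % day for day in range(10, 20)]
partitions = round_robin(dates, 3)

# Every partition is still sorted, so binary search is safe on each of them
all_sorted = all(p == sorted(p) for p in partitions)

# Only the partition holding the date reports a position; the others report -1
hits = [find_date(p, '2017-12-15') for p in partitions]
print(all_sorted, hits)
```

In this toy run only the third partition contains the date, which mirrors the `None` entries returned by the other processors in the real search.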

In [0]:
#here starts the first half of task1
#first pick a partition method:
def rr_partition(data,n):
    """
    Perform a simple round robin partition on the given data set
    
    Parameters:
    data: the dataset to be partitioned, which is a list
    n: the number of groups that the dataset will be divided into
    
    Return:
    result: the partitioned subset of the dataset 
    """
    result = []
    for i in  range(n):
        result.append([])
    for index,element in enumerate(data):
        index_bin = index%n
        result[index_bin].append(element)
    return result
    
#then pick a search method:
def binary_search(data,key):
    """
    Perform binary search for a given key on the date column (index 1)
    
    Parameters:
    data: the input dataset, a list of records sorted by date
    key: the date to search for, as a 'YYYY-MM-DD' string
    
    Return:
    found: the matched record, or None if not found 
    """
    found = None
    upper = len(data) - 1
    lower = 0
    
    while lower <= upper and found is None:
        mid = (upper + lower)//2
        if data[mid][1] == key:
            found = data[mid]
        elif data[mid][1] < key:
            lower = mid + 1
        else:
            upper = mid - 1     
    return found

#the complete parallel search:
from multiprocessing import Pool
def parallel_search_date(data,query,n_processor):
    """
    A method doing parallel search on a given dataset
    for the record that matches a single date
    
    Parameters:
    data: the dataset to be searched, which is a list sorted by date
    query: the date to search for, as a 'YYYY-MM-DD' string
    n_processor: the number of processors to parallelize the search job
    
    Return:
    results: the header row followed by the search result of each processor
    """
    results = [read_to_list(climatePath)[0]]
    datasets = rr_partition(data, n_processor)
    with Pool(processes=n_processor) as pool:
        # submit every partition first so that the searches can run concurrently
        pending = [pool.apply_async(binary_search, args=(partition,query)) for partition in datasets]
        # then collect the outputs in partition order
        for result in pending:
            results.append(result.get())
    return results

In [0]:
#test the output
parallel_search_date(climateData,'2017-12-15',6)

[['Station',
  ' Date',
  '   Air Temperature(Celcius)',
  '  Relative Humidity',
  '  WindSpeed  (knots)',
  ' Max Wind Speed',
  '   MAX  ',
  '  MIN  ',
  'Precipitation '],
 ['948702',
  '2017-12-15',
  '18',
  '52',
  '7.1',
  '14',
  '   74.5*',
  '53.1',
  ' 0.00I'],
 None,
 None,
 None,
 None,
 None]

### 2.
Write an algorithm to find the ``latitude``, ``longitude`` and ``confidence`` when the surface
temperature (°C) was between 65 °C and 100 °C. Justify your choice of the data partition
technique and search technique you have used.


**Justification**:

Round-robin partition + linear search

The earlier look at the data shows that the fire records are not sorted by surface temperature. Since the query is a range this time, range partitioning is the first candidate. However, if the partition ranges line up with the query range, only one partition holds matching records and searching the others is pointless, so the search is no longer really parallel. If the ranges do not line up, range partitioning brings no benefit.

None of the remaining partition methods improves the performance of the parallel search either, so we again pick the simplest one, round-robin, which is the cheapest to compute and keeps the load balanced.

As for the search method, binary search is a poor fit: the data is not sorted on surface temperature and the query is a range rather than a single key, so it would not reduce the search time and could even give confusing output. We therefore use linear search.
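
A minimal sketch of this choice, using made-up temperatures rather than values from FireData.csv: the data is unsorted, so every partition has to be scanned in full, and round-robin spreads that scan evenly. `scan_range` is a hypothetical helper, and both bounds are assumed to be inclusive.

```python
def scan_range(partition, low, high):
    """Linear scan: keep every value with low <= value <= high."""
    return [value for value in partition if low <= value <= high]

# Hypothetical unsorted surface temperatures in °C
temps = [68, 54, 33, 75, 60, 100, 38, 44, 86, 65]

# Round-robin over 2 partitions: even positions, then odd positions
partitions = [temps[0::2], temps[1::2]]
sizes = [len(p) for p in partitions]

# Each partition is scanned independently; the partial results are concatenated
matches = []
for partition in partitions:
    matches += scan_range(partition, 65, 100)
print(sizes, matches)
```

Both partitions have the same size, so each worker does the same amount of scanning no matter where the matching values happen to sit.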

In [0]:
#here starts task1 part2
def linear_seach(data,key):
    """
    Perform linear search on given dataset
    
    Parameters:
    data: the dataset to be searched
    key: the inclusive range [low, high] of surface temperature to match
    
    Return:
    result: a list of [latitude, longitude, confidence] for every matched record
    """
    result = []
    for record in data:
        # surface temperature (°C) is the last column; both bounds are inclusive
        if key[0] <= int(record[-1]) <= key[1]:
            # keep latitude, longitude and confidence
            result.append(record[:2] + [record[-3]])
    return result

def parallel_search_temperature(data,query,n_processor):
    """
    A method doing parallel search on a given dataset
    for the records whose surface temperature falls in a range
    
    Parameters:
    data: the dataset to be searched, which is a list
    query: the range [low, high] of surface temperature
    n_processor: the number of processors to parallelize the search job
    
    Return:
    results: the header row followed by the search results of all processors
    """
    results = [['Latitude','Longitude','Confidence']]
    datasets = rr_partition(data, n_processor)
    with Pool(processes=n_processor) as pool:
        # submit every partition first so that the searches can run concurrently
        pending = [pool.apply_async(linear_seach, args=(partition,query)) for partition in datasets]
        # then collect the outputs in partition order
        for result in pending:
            results += result.get()
    return results

In [0]:
#test function
parallel_search_temperature(fireData,[65,100],2)[:10]

[['Latitude', 'Longitude', 'Confidence'],
 ['-37.966', '145.051', '78'],
 ['-37.875', '142.51', '93'],
 ['-37.613', '149.305', '95'],
 ['-37.624', '149.314', '90'],
 ['-37.95', '142.366', '92'],
 ['-37.634', '149.237', '100'],
 ['-37.6', '149.325', '99'],
 ['-37.609', '149.32', '99'],
 ['-37.862', '144.175', '87']]

## Task 2: Parallel Join
### 1.
Write an algorithm to find `surface temperature (°C)`, `air temperature (°C)`, `relative
humidity` and `maximum wind speed`. Justify your choice of the data partition technique
and join technique you have used.

**Justification**:

Round-Robin Data Partitioning + Divide and Broadcast-Based Parallel Join Algorithm (Hash-Based Join Algorithm).

The number of fire records differs a lot from month to month. Of the 2668 records in total, most were collected in April (1434 records) and May (673 records). Because of this skew, we use round-robin data partitioning, which distributes the records evenly across the processors before the parallel join.

A divide and broadcast-based parallel join suits this task: the smaller table (climateData) is broadcast to every processor, and the larger one (fireData) is divided evenly by round-robin into several small parts. On each processor the climateData table is hashed and the local part of fireData is used for probing, since with the 4 processors we assume, each part of fireData is still larger than climateData (only 366 records). We skipped the disjoint partitioning-based join algorithm because both hash and range data partitioning cause problems here. Firstly, hash data partitioning would lead to a large hash table because the hash key would be the date attribute. Secondly, range data partitioning would cause a severe skew problem no matter how we choose the ranges (by season, by month, or by a day that cuts the whole table into two relatively even parts).
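
The divide, broadcast and local-join steps can be sketched on a toy example. The rows below are made up, the helper names `build_hash_table` and `probe` are illustrative, a Python dictionary does the hashing instead of MD5, and each "processor" is simulated by one loop iteration rather than a real worker process.

```python
from collections import defaultdict

def build_hash_table(climate_rows):
    """Hash the broadcast (smaller) table on its date column."""
    table = defaultdict(list)
    for date, air_temp in climate_rows:
        table[date].append(air_temp)
    return table

def probe(fire_part, table):
    """Probe the hash table with one partition of the larger table."""
    joined = []
    for date, surface_temp in fire_part:
        for air_temp in table.get(date, []):
            joined.append((date, surface_temp, air_temp))
    return joined

# Hypothetical rows: (date, air temperature) and (date, surface temperature)
climate = [('2017-12-15', 18), ('2017-12-16', 20)]
fire = [('2017-12-16', 75), ('2017-12-15', 54),
        ('2017-12-16', 64), ('2017-12-17', 60)]

n_processor = 2
fire_parts = [fire[i::n_processor] for i in range(n_processor)]  # divide
table = build_hash_table(climate)                                # broadcast
local_results = [probe(part, table) for part in fire_parts]      # local joins
print(local_results)
```

The fire record dated 2017-12-17 has no climate partner and is dropped, which is the expected behaviour of an inner join.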

In [0]:
import hashlib
def H(r, index):
    """
    We define a hash function 'H' that is used in the hashing process 
    by using the built-in MD5 hash in hashlib.
    
    Arguments:
    r -- a record where hashing will be applied on its attribute
    index -- the index of the attribute in the record to be hashed
    
    Return:
    result -- the hash digest (hex string) of the attribute r[index]
    """
    aString = r[index]
    hash_value = hashlib.md5(aString.encode())
    return (hash_value.hexdigest())

In [0]:
def HB_join(T1, T2):
    """
    Perform the hash-based join algorithm
    The join attribute is the date attribute in the input tables T1 & T2
    
    Arguments:
    T1 & T2 --Tables to be joined
    
    Return:
    result -- the joined table
    """
    
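    result = []
    dic = {}
    # hash the climateData table on its date attribute (index 1)
    for s in T2:
        s_key = H(s, 1) 
        if s_key in dic:
            dic[s_key].append(s)
        else:
            dic[s_key] = [s]
    # probe with the local part of fireData on its date attribute (index 6)
    for r in T1:
        r_key = H(r, 6)
        if r_key in dic:
            for element in dic[r_key]:
                # compare the actual dates to guard against hash collisions
                if r[6] == element[1]:
                    # surface temperature, air temperature, relative humidity, max wind speed
                    result.append(",".join([r[7], element[2], element[3], element[5]])) 
    return result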
  
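# Divide and Broadcast-Based Parallel Join
# Include this package for parallel processing
import multiprocessing as mp

def DDP_join(T1, T2, n_processor):
    """
    Perform a divide and broadcast-based parallel join

    Arguments:
    T1 -- the larger table (fireData), divided among the processors
    T2 -- the smaller table (climateData), broadcast to every processor
    n_processor -- number of parallel processors

    Return:
    result -- a list holding the local join result of each processor
    """
    # use round robin data partitioning to divide the larger table (fireData)
    T1_subsets = rr_partition(T1, n_processor)
    with mp.Pool(processes = n_processor) as pool:
        # starmap runs the local joins concurrently and keeps partition order
        result = pool.starmap(HB_join, [(t1, T2) for t1 in T1_subsets])
    return result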

In [0]:
# test output
n_processor = 4
# prepend a header row for a clearer output
result = ['surf temp, air temp, rel humid, max windspeed'] + DDP_join(fireData, climateData, n_processor)
result

['surf temp, air temp, rel humid, max windspeed',
 ['68,28,58.3,15.9',
  '54,17,50.4,16.9',
  '33,18,53.7,13',
  '75,18,53.7,13',
  '60,18,53.7,13',
  '64,18,53.7,13',
  '38,18,52,14',
  '44,19,55.3,12',
  '54,17,53.6,15',
  '54,15,48.1,12',
  '47,26,61,15',
  '54,26,61,15',
  '33,26,61,15',
  '55,26,61,15',
  '50,26,61,15',
  '86,26,61,15',
  '63,26,61,15',
  '55,26,61,15',
  '55,28,56.7,16.9',
  '44,28,56.7,16.9',
  '60,24,56.8,9.9',
  '55,24,56.8,9.9',
  '65,28,48.3,22',
  '53,24,51.9,13',
  '53,18,53.8,14',
  '69,18,53.8,14',
  '60,18,55.6,12',
  '59,14,50.9,13',
  '41,14,50.9,13',
  '48,12,46.6,14',
  '62,13,43.3,14',
  '60,22,46.9,19',
  '48,13,50.4,14',
  '38,14,49.5,12',
  '67,12,47.3,14',
  '49,26,52.2,16.9',
  '56,26,52.2,16.9',
  '68,22,46.8,16.9',
  '87,13,50.7,13',
  '77,13,42.5,15',
  '42,17,46.4,20',
  '46,19,50.3,11.1',
  '33,13,46.2,12',
  '43,13,46.2,12',
  '49,13,46.2,12',
  '50,13,46.2,12',
  '39,13,46.2,12',
  '50,11,44.2,9.9',
  '40,11,44.2,9.9',
  '53,12,47.9,11

### 2.
Write an algorithm to find `datetime`, `air temperature (°C)`, `surface temperature (°C)` and `confidence` when the `confidence` is between 80 and 100. Justify your choice of the data
partition technique and join technique you have used.

**Justification**:

Round-Robin Data Partitioning + Divide and Broadcast-Based Parallel Join Algorithm (Hash-Based Join Algorithm)

Our justification for the data partitioning and the join algorithm is the same as for Task 2.1. For the search part, we simply run a serial linear search over the join result. One reason is that the join result is not sorted, so binary search is not applicable. Another is that a parallel search would require partitioning the data again: partitioning the join result and then searching the parts in parallel adds more overhead than it saves compared with a single serial linear scan.
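
The serial selection step amounts to a single pass over the join result. The sketch below uses made-up joined rows kept as tuples; `select_confident` is a hypothetical helper and both bounds are assumed to be inclusive.

```python
def select_confident(joined_rows, low=80, high=100):
    """Serial linear scan over the join result, bounds inclusive."""
    return [row for row in joined_rows if low <= row[3] <= high]

# Hypothetical join output: (datetime, air temp, surface temp, confidence)
joined = [
    ('2017-01-01T00:00:00', 17, 54, 80),
    ('2017-01-02T00:00:00', 19, 41, 62),
    ('2017-01-03T00:00:00', 18, 75, 95),
    ('2017-01-04T00:00:00', 21, 47, 79),
]
selected = select_confident(joined)
print(selected)
```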

In [0]:
def HB_join2(T1, T2):
    """
    Perform the hash-based join algorithm
    The join attribute is the date attribute in the input tables T1 & T2
    
    Arguments:
    T1 & T2 --Tables to be joined
    
    Return:
    result -- the joined table
    """
    
    result = []
    dic = {}
    # hash the climateData table
    for s in T2:
        s_key = H(s, 1) 
        if s_key in dic:
            dic[s_key].append(s)
        else:
            dic[s_key] = [s]
            
    # probe with the local part of fireData
    for r in T1:
        r_key = H(r, 6)
        if r_key in dic:
            d = dic[r_key]
            for index, element in enumerate(d):
                if r[6] == element[1]:
                    result.append(",".join([r[3], element[2], r[7], r[5]])) 
    return result
  
# Divide and Broadcast-Based Parallel Join
# Include this package for parallel processing
import multiprocessing as mp

def DDP_join2(T1, T2, n_processor):
    """
    Perform a divide and broadcast-based parallel join,
    then keep the joined rows whose confidence is in [80, 100]
    """
    T1_subsets = rr_partition(T1, n_processor)
    # Apply the local join on each processor concurrently
    with mp.Pool(processes = n_processor) as pool:
        result = pool.starmap(HB_join2, [(t1, T2) for t1 in T1_subsets])
    # Select records based on the condition where confidence is in range [80, 100]
    final_result = []
    for local_result in result:
        for row in local_result:
            # the confidence is the last comma-separated column of the joined row
            confidence = int(row.split(',')[3].strip())
            if 80 <= confidence <= 100:
                final_result.append(row)
    return final_result

In [0]:
n_processor = 4
result = ['datetime, air temp, surf temp, confidence'] + DDP_join2(fireData, climateData, n_processor)
result

['datetime, air temp, surf temp, confidence',
 '2017-12-25T04:29:08,17,54,80',
 '2017-12-16T04:34:58,18,75,95',
 '2017-12-16T00:20:53,18,64,82',
 '2017-12-09T00:14:34,17,54,81',
 '2017-12-08T13:11:44,15,54,100',
 '2017-11-30T15:38:35,26,54,100',
 '2017-11-30T12:22:15,26,55,100',
 '2017-11-30T04:34:57,26,86,99',
 '2017-11-30T00:20:53,26,63,87',
 '2017-11-30T00:20:53,26,55,81',
 '2017-11-29T13:17:24,28,55,100',
 '2017-11-23T04:28:59,24,60,84',
 '2017-11-14T04:35:04,28,65,87',
 '2017-11-13T03:52:14,24,53,81',
 '2017-11-12T00:33:15,18,69,89',
 '2017-11-11T04:04:25,18,60,86',
 '2017-11-09T04:16:48,14,59,85',
 '2017-11-05T04:41:19,13,62,89',
 '2017-10-27T04:47:31,22,60,86',
 '2017-10-26T13:30:31,13,48,100',
 '2017-10-21T01:09:54,12,67,92',
 '2017-10-18T00:39:02,26,56,82',
 '2017-10-17T04:10:45,22,68,91',
 '2017-10-15T04:23:01,13,87,100',
 '2017-10-10T04:04:19,13,77,96',
 '2017-10-07T12:59:33,17,42,91',
 '2017-10-04T04:41:02,19,46,80',
 '2017-10-03T03:58:15,13,49,82',
 '2017-10-02T04:53:10,11

## Task 3  Parallel Sort
Write an algorithm to sort fire data based on `surface temperature(°C)` in ascending order. Justify your choice of the data partition technique and sorting technique you have used.


**Justification**:

Round-robin partition + merge sort (internal) + serial sort (based on sort-merge) + parallel merge-all

The records in fireData are all similar in size, so the processing time per record can be regarded as the same. To make the best use of all processors we therefore use round-robin as the data partition method: it gives the best load balance and has the lowest cost in both time and space.

We pick `merge sort` as the internal sorting algorithm. Both `merge sort` and quick sort are comparison-based divide-and-conquer algorithms with an average time complexity of O(n log n), but merge sort keeps O(n log n) in the worst case, whereas quick sort can degrade to O(n^2).

For the parallel sort we use merge sort as the internal sorting method and `merge-all` to merge the sorted lists of all processors. Since the dataset is relatively small, the burden on the single processor performing the final merge is not too large.
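
The overall shape of the parallel merge-all sort can be sketched in a few lines. This is only an illustration under stated assumptions: the temperatures are made up, threads from `concurrent.futures` stand in for worker processes, `sorted()` stands in for the hand-written merge sort, and `heapq.merge` stands in for the k-way merge.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor

def local_sort(partition):
    """Sort one partition; sorted() stands in for the hand-written merge sort."""
    return sorted(partition)

# Hypothetical surface temperatures in °C
temps = [68, 54, 33, 75, 60, 64, 38, 44]
n_processor = 3
partitions = [temps[i::n_processor] for i in range(n_processor)]

# Sort phase: every worker sorts its own partition
with ThreadPoolExecutor(max_workers=n_processor) as pool:
    sorted_runs = list(pool.map(local_sort, partitions))

# Final merge phase: a single worker merges all sorted runs (k-way merge)
result = list(heapq.merge(*sorted_runs))
print(sorted_runs, result)
```

The final merge is the serial bottleneck of this scheme, which is acceptable here only because the dataset is small.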


In [0]:
#here is the internal sorting method --'mergesort'
def merge(a,b):
    """ Merge two lists that are already sorted by surface temperature
    (the last column) into a single sorted list
    """
    c = []
    i = j = 0
    while i < len(a) and j < len(b):
        if int(a[i][-1]) < int(b[j][-1]):
            c.append(a[i])
            i += 1
        else:
            c.append(b[j])
            j += 1
    # one of the lists is exhausted; append the remainder of the other
    c += a[i:]
    c += b[j:]
    return c

def mergesort(x):
    """ Function to sort an array using merge sort algorithm """
    if len(x) == 0 or len(x) == 1:
        return x
    else:
        middle = len(x)//2
        a = mergesort(x[:middle])
        b = mergesort(x[middle:])
        return merge(a,b)

In [0]:
# Let's first look at 'k-way merging algorithm' that will be used 
# to merge sub-record sets in our external sorting algorithm.
import sys

# Find the smallest record
def find_min(records):    
    """ 
    Find the smallest record
    
    Arguments:
    records -- the input record set

    Return:
    result -- the smallest record's index
    """
    # every entry is either a real record or the [sys.maxsize] sentinel,
    # so the last element can always be converted to int
    m = int(records[0][-1])
    index = 0
    for i in range(len(records)):
        if int(records[i][-1]) < m:  
            index = i
            m = int(records[i][-1])
    return index

def k_way_merge(record_sets):#records_sets = [[[r1],[r2]],[[r3],[r4]],[[r5],[r6]]]
    """ 
    K-way merging algorithm
    
    Arguments:
    record_sets -- the set of multiple sorted sub-record sets

    Return:
    result -- the sorted and merged record set
    """
    
    # indexes will keep the indexes of sorted records in the given buffers
    indexes = []
    for x in record_sets:
        indexes.append(0) # initialisation with 0

    # final result will be stored in this variable
    result = []  
    
    # the merging unit (i.e. # of the given buffers)
    sub = []
    
    while(True):
        sub = [] # initialise the merging unit
        
        # This loop gets the current position of every buffer
        for i in range(len(record_sets)):
            if(indexes[i] >= len(record_sets[i])):
                sub.append([sys.maxsize])
            else:
                sub.append(record_sets[i][indexes[i]])  
                
        # find the smallest record 
        smallest = find_min(sub)
        # if we only have sys.maxsize on the tuple, we reached the end of every record set
        if(sub[smallest] == [sys.maxsize]):
            break

        # This record is the next on the merged list
        result.append(record_sets[smallest][indexes[smallest]])
        indexes[smallest] +=1
   
    return result

In [0]:
# The serial sorting method
def serial_sorting(dataset, buffer_size):
    """
    Perform a serial external sorting method based on sort-merge
    The buffer size determines the size of each sub-record set

    Arguments:
    dataset -- the entire record set to be sorted
    buffer_size -- the buffer size determining the size of each sub-record set

    Return:
    result -- the sorted record set
    """
    
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return
    
    result = []

    ### START CODE HERE ### 
    
    # --- Sort Phase ---
    sorted_set = []
    
    # Read buffer_size pages at a time into memory and
    # sort them, and write out a sub-record set (i.e. variable: subset)
    start_pos = 0
    N = len(dataset)
    while True:
        if ((N - start_pos) > buffer_size):
            # read B-records from the input, where B = buffer_size
            subset = dataset[start_pos:start_pos + buffer_size] 
            # sort the subset (using mergesort defined above)
            sorted_subset = mergesort(subset) 
            sorted_set.append(sorted_subset)
            start_pos += buffer_size
        else:
            # read the last B-records from the input, where B is less than buffer_size
            subset = dataset[start_pos:] 
            # sort the subset (using mergesort defined above)
            sorted_subset = mergesort(subset) 
            sorted_set.append(sorted_subset)
            break
    
    # --- Merge Phase ---
    merge_buffer_size = buffer_size - 1
    dataset = sorted_set
    while True:
        merged_set = []

        N = len(dataset)
        start_pos = 0
        while True:
            if ((N - start_pos) > merge_buffer_size): 
                # read C-record sets from the merged record sets, where C = merge_buffer_size
                subset = dataset[start_pos:start_pos + merge_buffer_size]
                merged_set.append(k_way_merge(subset)) # merge lists in subset
                start_pos += merge_buffer_size
            else:
                # read C-record sets from the merged sets, where C is less than merge_buffer_size
                subset = dataset[start_pos:]
                merged_set.append(k_way_merge(subset)) # merge lists in subset
                break

        dataset = merged_set
        if (len(dataset) <= 1): # if the size of merged record set is 1, then stop 
            result = merged_set
            break
    ### END CODE HERE ###
    
    return result

In [0]:
def parallel_merge_all_sorting(dataset, n_processor, buffer_size):
    """
    Perform a parallel merge-all sorting method

    Arguments:
    dataset -- entire record set to be sorted
    n_processor -- number of parallel processors
    buffer_size -- buffer size determining the size of each sub-record set

    Return:
    result -- the merged record set
    """
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return
    
    result = []

    ### START CODE HERE ### 
    
    # Pre-requisite: Perform data partitioning using round-robin partitioning
    subsets = rr_partition(dataset, n_processor)
    
    # ----- Sort phase -----
    # Pool: a Python class enabling parallel processing. 
    # starmap sorts all partitions concurrently and keeps partition order
    with mp.Pool(processes = n_processor) as pool:
        local_results = pool.starmap(serial_sorting, [(s, buffer_size) for s in subsets])
    # serial_sorting returns a list holding one sorted record set
    sorted_set = [local_result[0] for local_result in local_results]
    
    # ---- Final merge phase ----
    result = k_way_merge(sorted_set)
    ### END CODE HERE ###
    
    return result

In [0]:
# test output of parallel sort
parallel_merge_all_sorting(fireData,4,4)

[['-37.886',
  '147.207',
  '302',
  '2017-07-02T04:28:42',
  '10.7',
  '50',
  '2017-07-02',
  '28'],
 ['-37.886',
  '147.207',
  '302',
  '2017-07-02T04:28:42',
  '10.7',
  '50',
  '2017-07-02',
  '28'],
 ['-36.943',
  '143.286',
  '302.7',
  '2017-11-11T15:08:00',
  '18.8',
  '51',
  '2017-11-11',
  '29'],
 ['-37.062',
  '141.373',
  '303.1',
  '2017-07-01T13:11:41',
  '16.1',
  '53',
  '2017-07-01',
  '29'],
 ['-37.466',
  '148.1',
  '302.2',
  '2017-10-02T23:44:31',
  '10.9',
  '50',
  '2017-10-02',
  '29'],
 ['-37.062',
  '141.373',
  '303.1',
  '2017-07-01T13:11:41',
  '16.1',
  '53',
  '2017-07-01',
  '29'],
 ['-37.227',
  '141.146',
  '305.1',
  '2017-10-03T01:22:44',
  '41.2',
  '54',
  '2017-10-03',
  '31'],
 ['-37.38',
  '149.334',
  '304.5',
  '2017-11-30T15:38:32',
  '14.1',
  '61',
  '2017-11-30',
  '31'],
 ['-36.779',
  '146.108',
  '305.3',
  '2017-07-01T03:46:08',
  '25.7',
  '61',
  '2017-07-01',
  '32'],
 ['-35.646',
  '142.282',
  '305.6',
  '2017-12-24T13:12:01',


## Task 4: Parallel Group-By
### 1.

Write an algorithm to get the number of fires on each day. You are required to only display the total number of fires and the date in the output. Justify your choice of the data partition technique if any.

**Justification**:

Round Robin Data Partitioning + Merge-All Group By

We use round-robin data partitioning for the same reason as in Task 2.1 and Task 2.2: good load balance (even distribution). Hash data partitioning leads to a large hash table because the hash key is the date, and range data partitioning causes a severe skew problem. 

For the group-by algorithm, we use the traditional method (merge-all group by). The group-by attribute is the date, and redistributing the local aggregation results by date would gain little. Assuming four processors, the redistribution done in parallel would take around 25% of the time one processor needs to do it serially. After that, however, each processor still has to perform its share of the global aggregation, which takes about as long again to read the redistributed data. This reasoning is based on the current data (fireData), which is small compared with real-world data sets. 
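
The two aggregation steps can be sketched with the standard library. The dates below are made up, `collections.Counter` stands in for the hand-written counting dictionary, and the "processors" are simulated by list positions.

```python
from collections import Counter

# Hypothetical fire dates (one entry per fire record)
dates = ['2017-04-13', '2017-04-18', '2017-04-13', '2017-05-04',
         '2017-04-13', '2017-04-18', '2017-05-04']

n_processor = 3
partitions = [dates[i::n_processor] for i in range(n_processor)]

# Local aggregation: each worker counts the dates in its own partition
local_counts = [Counter(part) for part in partitions]

# Global aggregation: one worker adds up the local counts
total = Counter()
for counts in local_counts:
    total.update(counts)
print(dict(total))
```

Because round-robin ignores the date, the same date shows up in several local results, so the global step has to add counts rather than simply concatenate them.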

In [0]:
def local_groupby(dataset, index_groupby):
    """
     Peform a local groupby method
     
     Arguments:
     dataset -- entire record set to be merged
     index_groupby -- the index of attribute that would be used to group by
     
     Return:
     return -- the aggregated record set according to the group_by attribute index
    """
  
    # use a name that does not shadow the built-in dict
    counts = {}
    for record in dataset:
        key = record[index_groupby]
        if key not in counts:
            counts[key] = 0
        # counts[key] is used as the counter for this group
        counts[key] += 1
    
    return counts

def parallel_merge_all_groupby(dataset):
    """
    Perform a parallel merge_all groupby method

    Arguments:
    dataset -- entire record set to be merged

    Return:
    result -- the aggregated record dictionary according to the group_by attribute index
    """
    
  
    # Use round robin data partitioning to divide the dataset 
    subsets = rr_partition(dataset, 3)
    # Define the number of parallel processors: the number of sub-datasets.
    n_processor = len(subsets)

    # ----- Local aggregation step -----
    # Pool: a Python class enabling parallel processing. 
    with mp.Pool(processes = n_processor) as pool:
        # run the local aggregation on every subset concurrently
        local_result = pool.starmap(local_groupby, [(s, 6) for s in subsets]) #[{'date1': count}, {'date2': count}, ...]

    # ---- Global aggregation step ----
    result = {} #global result {'date1': count, 'date2': count}
    for element in local_result:
        for key, val in element.items():
            if key not in result:
                result[key] = 0
            result[key] += val    
    
    return result

In [0]:
parallel_merge_all_groupby (fireData)

{'2017-03-06': 2,
 '2017-03-07': 1,
 '2017-03-08': 2,
 '2017-03-09': 3,
 '2017-03-10': 8,
 '2017-03-12': 5,
 '2017-03-13': 2,
 '2017-03-14': 10,
 '2017-03-15': 7,
 '2017-03-17': 6,
 '2017-03-18': 3,
 '2017-03-19': 21,
 '2017-03-24': 2,
 '2017-03-25': 13,
 '2017-03-26': 17,
 '2017-03-28': 54,
 '2017-03-29': 1,
 '2017-03-31': 22,
 '2017-04-01': 7,
 '2017-04-02': 5,
 '2017-04-03': 72,
 '2017-04-04': 89,
 '2017-04-05': 49,
 '2017-04-06': 118,
 '2017-04-07': 39,
 '2017-04-08': 20,
 '2017-04-11': 24,
 '2017-04-12': 69,
 '2017-04-13': 357,
 '2017-04-14': 18,
 '2017-04-15': 69,
 '2017-04-16': 18,
 '2017-04-17': 38,
 '2017-04-18': 325,
 '2017-04-19': 50,
 '2017-04-20': 31,
 '2017-04-22': 2,
 '2017-04-23': 19,
 '2017-04-24': 8,
 '2017-04-25': 3,
 '2017-04-26': 1,
 '2017-04-29': 3,
 '2017-05-01': 20,
 '2017-05-02': 10,
 '2017-05-03': 64,
 '2017-05-04': 135,
 '2017-05-05': 31,
 '2017-05-06': 17,
 '2017-05-07': 3,
 '2017-05-08': 24,
 '2017-05-09': 13,
 '2017-05-10': 114,
 '2017-05-11': 19,
 '2017-0

### 2.
Write an algorithm to find the **average** `surface temperature (°C)` for each day. You are required to only display average surface temperature (°C) and the date in the output. Justify your choice of the data partition technique if any.

**Justification**:

Round Robin Data Partitioning + Merge-All Group By

The reasons for using round-robin data partitioning and merge-all group by here are the same as in Task 4.1.
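
One detail is worth spelling out for the average: each processor has to return a sum and a count per date, not a local average, because local averages cannot be combined correctly when the partitions hold different numbers of records for a date. A small sketch with made-up values (`local_sum_count` is a hypothetical helper):

```python
def local_sum_count(partition):
    """Local aggregation: date -> [sum, count]."""
    acc = {}
    for date, temp in partition:
        entry = acc.setdefault(date, [0, 0])
        entry[0] += temp
        entry[1] += 1
    return acc

# Hypothetical (date, surface temperature) pairs held by two workers
part_a = [('2017-04-13', 60), ('2017-04-13', 70), ('2017-04-13', 80)]
part_b = [('2017-04-13', 50)]
local_results = [local_sum_count(part_a), local_sum_count(part_b)]

# Global aggregation: add sums and counts, divide once at the end
merged = {}
for acc in local_results:
    for date, (s, c) in acc.items():
        entry = merged.setdefault(date, [0, 0])
        entry[0] += s
        entry[1] += c
correct = {date: s / c for date, (s, c) in merged.items()}

# Averaging the local averages weights both workers equally, which is wrong
day = '2017-04-13'
naive = sum(acc[day][0] / acc[day][1] for acc in local_results) / len(local_results)
print(correct, naive)
```

Here the correct average is 260 / 4 = 65, while the average of the two local averages (70 and 50) is 60.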

In [0]:
def local_groupby2(dataset, index_groupby, index_target):
    """
    Peform a local groupby method
    
    Arguments:
    dataset -- entire record set to be merged
    index_groupby -- the index of attribute that would be used to group by
    index_target -- the index of attribute whose values would be used to sum
    
    Return:
    return -- the aggregated record set according to the group_by attribute index
    """
  
    # use a name that does not shadow the built-in dict
    sums = {}
    for record in dataset:
        key = record[index_groupby]
        #Get the target value from the record
        val = int(record[index_target])
        if key not in sums:
            sums[key] = [0,0]
        sums[key][0] += val
        sums[key][1] += 1
    return sums #{'date1': [sum, counter],'date2': [sum, counter]}


def parallel_merge_all_groupby2(dataset):
    """
    Perform a parallel merge_all groupby method

    Arguments:
    dataset -- entire record set to be merged

    Return:
    final_result -- the aggregated record dictionary according to the group_by attribute index
    """
    
  
    # Use round robin data partitioning to divide the dataset 
    subsets = rr_partition(dataset, 4)
    # Define the number of parallel processors: the number of sub-datasets.
    n_processor = len(subsets)

    # ----- Local aggregation step -----
    # Pool: a Python class enabling parallel processing. 
    with mp.Pool(processes = n_processor) as pool:
        # run the local aggregation on every subset concurrently
        local_result = pool.starmap(local_groupby2, [(s, 6, 7) for s in subsets]) #[{'date1': [sum, counter]}, {'date2': [sum, counter]}, ...]

    # ---- Global aggregation step ----
    result = {} #global result #[{'date1': [sum, counter]},{'date2': [sum, counter]}.....]
    for index, element in enumerate(local_result):
        for key, val in element.items():
            if key not in result:
                result[key]= [0,0]
            result[key][0] += val[0]
            result[key][1] += val[1]

    #----Final Avg Calculation-------
    final_result = {} #final avg result {'date1': avg, 'date2': avg}
    for key, val in result.items():
        final_result[key] = val[0] / val[1]
    return final_result

In [0]:
parallel_merge_all_groupby2(fireData)

{'2017-03-06': 60,
 '2017-03-07': 64,
 '2017-03-08': 51,
 '2017-03-09': 46,
 '2017-03-10': 69,
 '2017-03-12': 88,
 '2017-03-13': 38,
 '2017-03-14': 65,
 '2017-03-15': 46,
 '2017-03-17': 59,
 '2017-03-18': 79,
 '2017-03-19': 65,
 '2017-03-24': 49,
 '2017-03-25': 66,
 '2017-03-26': 56,
 '2017-03-28': 60,
 '2017-03-29': 51,
 '2017-03-31': 48,
 '2017-04-01': 46,
 '2017-04-02': 45,
 '2017-04-03': 58,
 '2017-04-04': 62,
 '2017-04-05': 53,
 '2017-04-06': 61,
 '2017-04-07': 50,
 '2017-04-08': 60,
 '2017-04-11': 46,
 '2017-04-12': 52,
 '2017-04-13': 58,
 '2017-04-14': 61,
 '2017-04-15': 59,
 '2017-04-16': 48,
 '2017-04-17': 50,
 '2017-04-18': 53,
 '2017-04-19': 54,
 '2017-04-20': 56,
 '2017-04-22': 54,
 '2017-04-23': 53,
 '2017-04-24': 59,
 '2017-04-25': 48,
 '2017-04-26': 34,
 '2017-04-29': 63,
 '2017-05-01': 68,
 '2017-05-02': 55,
 '2017-05-03': 56,
 '2017-05-04': 56,
 '2017-05-05': 51,
 '2017-05-06': 57,
 '2017-05-07': 50,
 '2017-05-08': 56,
 '2017-05-09': 42,
 '2017-05-10': 52,
 '2017-05-11

## Task 5: Parallel Group-By Join
Write an algorithm to find the average surface temperature (°C) for each weather station. You are required to display only the average surface temperature (°C) and the station in the
output. Justify your choice of the data partition and join technique.

**Justification**:

Round-Robin Data Partitioning + Divide and Broadcast-Based Parallel Join Algorithm (Hash-Based Join Algorithm) + Merge-All Group By

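Round-robin data partitioning is used here because it gives better load balance, for the same reasons explained in Task 2.
For the join, fireData (treated as the larger table in the code) is divided across the processors and climateData is broadcast to each of them, where a local hash-based join is performed.
Merge-all group by is used for the same reason given in Task 4.1.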
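
The divide and broadcast idea can be sketched on a few toy rows. This is only a minimal illustration under stated assumptions: the helper names (`round_robin`, `hash_join`) and the rows (`toy_fire`, `toy_climate`) are hypothetical and not part of this workbook, and the partitions are processed in a plain loop rather than with `mp.Pool` so that the data flow is easy to follow.

```python
from collections import defaultdict

def round_robin(records, n):
    """Deal records to n partitions in turn (record i goes to partition i % n)."""
    parts = [[] for _ in range(n)]
    for i, rec in enumerate(records):
        parts[i % n].append(rec)
    return parts

def hash_join(fire_part, climate, fire_key=0, climate_key=1):
    """Hash the broadcast table on the date, then probe it with one fire partition."""
    table = defaultdict(list)
    for c in climate:
        table[c[climate_key]].append(c)
    joined = []
    for f in fire_part:
        for c in table.get(f[fire_key], []):
            joined.append((c[0], f[1]))  # (station, surface temperature)
    return joined

# Hypothetical toy rows, not taken from the assignment files.
toy_fire = [("2017-01-01", 50), ("2017-01-02", 60), ("2017-01-01", 70)]  # (date, temperature)
toy_climate = [("S1", "2017-01-01"), ("S2", "2017-01-02")]               # (station, date)

# Divide the larger table; every partition is joined against the whole smaller table.
partial = [hash_join(p, toy_climate) for p in round_robin(toy_fire, 2)]
joined = [row for part in partial for row in part]
print(sorted(joined))
```

Because every partition sees the complete smaller table, no matching pair can be missed, whichever partition a fire record lands in.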

In [0]:
def HB_join3(T1, T2):
    """
    Perform the hash-based join algorithm.
    The join attribute is the date: index 6 in T1 and index 1 in T2.
    
    Arguments:
    T1 & T2 -- tables to be joined (T2 is hashed, T1 probes the hash table)
    
    Return:
    result -- list of "T1[7],T2[0]" strings, one per matching pair of records
    """
    
    result = []
    dic = {}
    for s in T2:
        s_key = H(s, 1) 
        if s_key in dic:
            dic[s_key].append(s)
        else:
            dic[s_key] = [s]
    for r in T1:
        r_key = H(r, 6)
        if r_key in dic:
            for element in dic[r_key]:
                # Compare the actual dates, since H may map different dates to the same key
                if r[6] == element[1]:
                    result.append(",".join([r[7], element[0]]))
    return result
  
import multiprocessing as mp
    
def DDP_join3(T1, T2, n_processor):
    
    # Use round-robin data partitioning to divide T1 (the larger table, fireData)
    T1_subsets = rr_partition(T1, n_processor)
    pool = mp.Pool(processes = n_processor)
    # Perform the hash-based join in parallel; T2 is broadcast to every worker
    async_results = [pool.apply_async(HB_join3, [t1, T2]) for t1 in T1_subsets]
    result = [r.get() for r in async_results]
    pool.close()
    pool.join()
    return result

In [0]:
def parallel_merge_all_groupby3(dataset):
    """
    Perform a parallel merge_all groupby method

    Arguments:
    dataset -- entire record set to be merged

    Return:
    final_result -- the aggregated record dictionary according to the group_by attribute index
    """
     
    # Use round-robin data partitioning to divide the dataset
    subsets = rr_partition(dataset, 4)
    # Define the number of parallel processors: the number of sub-datasets.
    n_processor = len(subsets)

    # Pool: a pool of worker processes used for parallel processing.
    pool = mp.Pool(processes = n_processor)

    # ----- Local aggregation step -----
    # Submit every subset first, then collect, so the subsets are processed concurrently
    # (apply_async returns immediately; get() waits for the result).
    async_results = [pool.apply_async(local_groupby2, [s, 1, 0]) for s in subsets]
    local_result = [r.get() for r in async_results] #[{'station1': [sum, counter]},{'station2': [sum, counter]},{'station1': [sum, counter]}.....]
    pool.close()
    pool.join()

    # ---- Global aggregation step ----
    result = {} #global result {'station1': [sum, counter], 'station2': [sum, counter]}
    for element in local_result:
        for key, val in element.items():
            if key not in result:
                result[key]= [0,0]
            result[key][0] += val[0]
            result[key][1] += val[1]

    #----Final Avg Calculation-------
    final_result = {} #final avg result {'station1': avg, 'station2':avg}
    for key, val in result.items():
        final_result[key] = val[0] / val[1]
    return final_result

In [0]:
def format_join_result(join_result):
    """
    Format the join result into a list of lists
    
    Arguments:
    join_result -- the join result to be formatted
    
    Return:
    formated_result -- the formatted result
    """
    formated_result = []
    for s in join_result:
        for r in s:
            a = [x.strip() for x in r.split(',')]
            b = int(a[0])
            formated_result.append([b, a[1]])
      
    return formated_result    

In [0]:
# Perform the join
join_result = DDP_join3(fireData, climateData, 2)

# Format the join result as a list of lists
formated_join_result = format_join_result(join_result)

# Perform the parallel merge-all group-by
parallel_merge_all_groupby3(formated_join_result)

{'948701': 56, '948702': 52}
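
The merge-all group-by used above carries `[sum, counter]` pairs from the local step to the global step instead of local averages. The sketch below illustrates why, assuming hypothetical helper names (`local_sum_count`, `merge_all`) and toy rows that are not taken from the assignment data: merging sums and counters gives the true average per group, whereas averaging the per-partition averages generally would not.

```python
def local_sum_count(rows):
    """Return {key: [sum, count]} for one partition of (key, value) rows."""
    acc = {}
    for key, value in rows:
        pair = acc.setdefault(key, [0, 0])
        pair[0] += value
        pair[1] += 1
    return acc

def merge_all(partials):
    """Merge the local {key: [sum, count]} dictionaries and compute the averages."""
    merged = {}
    for partial in partials:
        for key, (s, n) in partial.items():
            pair = merged.setdefault(key, [0, 0])
            pair[0] += s
            pair[1] += n
    return {key: s / n for key, (s, n) in merged.items()}

# Hypothetical partitions of (station, temperature) rows.
partitions = [[("S1", 50), ("S1", 70)], [("S1", 90), ("S2", 60)]]
averages = merge_all([local_sum_count(p) for p in partitions])
print(averages)

# Averaging the local averages of S1 would weight both partitions equally.
naive_s1 = ((50 + 70) / 2 + 90 / 1) / 2
print(naive_s1)
```

In this toy case the merged result for `S1` is 210 / 3 = 70.0, while the naive average of local averages is 75.0, because the second partition holds only one `S1` row.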