# FIT5148 - Distributed Databases and Big Data

# Assignment 1 - Solution Workbook


**Instructions:**
- You will be using Python 3.
- Read the assignment instructions carefully and implement the algorithms in this workbook.
- You can use the datasets fireData and climateData (provided below) if you are aiming for the Credit Task.
- For Distinction and High Distinction tasks, you are required to read the files FireData.csv and ClimateData.CSV provided with the assignment programmatically and prepare the data in the correct format so that it can be used in your algorithm.
- You can introduce new cells as necessary.

**Your details**
- Name: Moon Beyong Kim
- Student ID: 26389126

- Name: Hun Tae Lee
- Student ID: 22568735

Let's get started!

The concepts and definitions used in this assignment come from our tutorial activities and the following reference textbook:

- High-Performance Parallel Database Processing and Grid Databases (Wiley, 2008) by Taniar, Leung, Rahayu, and Goel.


## Import Libraries and Read Files

In [1]:
# Use the pandas library to format the output
import pandas as pd
# Use the csv library to read the CSV files
import csv
# Use Pool for multiprocessing
from multiprocessing import Pool
# Read the 'ClimateData.csv' file
with open('ClimateData.csv','r') as f:
    reader = csv.reader(f)
    climateData = list(reader)
# Delete the first row of the file
del climateData[0]
# Read the 'FireData.csv' file
with open('FireData.csv','r') as f:
    reader = csv.reader(f)
    fireData = list(reader)
# Delete the first row of the file
del fireData[0]

## Task 1: Parallel Search 

### Question 1
*** Write an algorithm to search climate data for the records on 15th December 2017. Justify your choice of the data partition technique and search technique you have used. ***
- Before designing a parallel search, it is important to understand how the search is performed serially, because the serial search algorithm and the data partitioning method are the building blocks of a parallel search algorithm.
- First, we partitioned the climate data so that it is distributed over a number of processing elements. We use round-robin (RR) data partitioning, in which each record is assigned in turn to the next processing element. We chose RR partitioning because it distributes the data evenly. The aim of parallel processing, and of parallel database processing in particular, is to balance the load so that the elapsed time of a job is reduced, and RR partitioning supports this goal.
- Second, we search each partition of the climate data with binary search. Binary search requires the list to be sorted on the search key. It starts by comparing the key with the middle entry of the sorted list. If they match, it returns the index of that entry. Otherwise, the search continues in the lower or upper half of the list, depending on the key value, so a single comparison eliminates half of the remaining entries. We chose binary search for three reasons. Firstly, it is much faster than linear search, which checks each element from the first onwards. Linear search takes about N/2 comparisons on average (where N is the number of elements) and N comparisons in the worst case, whereas binary search takes about log2(N) comparisons in both the average and the worst case. For a million elements, linear search therefore averages about 500,000 comparisons, while binary search needs about 20. Secondly, it is a fairly simple algorithm. Thirdly, the given climate data is a static list that is already sorted, and RR partitioning preserves that order within each partition.
- Finally, we use "Parallel Search Exact" as the parallel search algorithm. It uses the binary search function above as the local search method. For the local comparison, we assume for brevity that the search stops as soon as a match is found. The data partitioning method is the RR partitioning described above.
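
As a quick check of the comparison counts for a list of one million elements, the following minimal sketch computes them directly. The function names are ours and are not part of the assignment code, and the linear search average uses the usual N/2 approximation for a successful search.

```python
import math

def linear_search_comparisons(n):
    """Return the (approximate average, worst-case) comparisons for linear search."""
    return n / 2, n

def binary_search_worst_comparisons(n):
    """Return the worst-case number of comparisons steps for binary search."""
    # Each step halves the remaining range, so floor(log2(n)) + 1 steps suffice.
    return math.floor(math.log2(n)) + 1

n = 1_000_000
average_linear, worst_linear = linear_search_comparisons(n)
print("Linear search: about", int(average_linear), "on average,", worst_linear, "at worst")
print("Binary search: at most", binary_search_worst_comparisons(n))
```

The gap grows with the size of the list, which is why the sorted order of the data is worth exploiting.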

In [2]:
# Print_result_format_for_climate function
def print_result_format_for_climate(data):
    """
    Print the first matched climate record in a readable format
    
    Arguments:
    data -- the search results, a list of (position, record) pairs
    
    Return:
    None -- the fields of the first result are printed
    """
    
    print("Key: " + str(data[0][0]))
    print("Station: " + data[0][1][0])
    print("Data: " + data[0][1][1])
    print("Air Temp: " + data[0][1][2] + " Celcius")
    print("Relative Humidity: " + data[0][1][3] + "%")
    print("Wind Speed: "  + data[0][1][4] + " knots")
    print("Max wind Speed: " + data[0][1][5]+ " knots")
    print("MAX: " + data[0][1][6])
    print("MIN: " + data[0][1][7])
    print("Precipitation: " + data[0][1][8])
    
    

In [3]:
#### Task 1 - Question 1 ####

# Round-robin data partitioning function
# Need round robin partitioning
def rr_partition_for_climate_by_date(data, n):
    """
    Perform data partitioning on data
    
    Arguments:
    data -- an input dataset which is a list
    n -- the number of processors
    
    Return:
    result -- the partitioned subset of D
    """
    
    # Create one empty bin for each processor
    result = []
    for i in range(n):
        result.append([])
    
    # Assign the records to the bins in turn
    for index, element in enumerate(data):
        # Calculate the index of the bin that the current record will be assigned to
        index_bin = index % n
        result[index_bin].append(element)
    
    return result

# Binary search function
# Need binary search
def binary_search_for_climate_by_date(data, key):
    """
    Perform binary search on data for the given key
    
    Arguments:
    data -- an input dataset which is a list sorted on the search attribute
    key -- a query value (a date string)
    
    Return:
    result -- the position of the matched record and the record itself
    """
    
    matched_record = None
    position = -1 #not found position
    
    lower = 0
    middle = 0
    upper = len(data)-1
    
    while lower <= upper:
        # Calculate middle: the midpoint of lower and upper
        middle = (lower + upper) // 2
        
        if data[middle][1] > key: # The key can only be in the lower half
            upper = middle - 1
        elif data[middle][1] < key: # The key can only be in the upper half
            lower = middle + 1
        else: # The key is found
            matched_record = data[middle]
            position = middle
            break
            
    return position, matched_record

# Parallel searching algorithm for exact match
def parallel_search_exact(data, query, n_processor, m_partition, m_search):
    """
    Perform parallel search for exact match on data for the given key.
    For Task 1 Question 1, the partitioning method is round-robin.
    
    Arguments:
    data -- an input dataset which is a list
    query -- a query record
    n_processor -- the number of parallel processors
    m_partition -- a data partitioning method
    m_search -- a search method
    
    Return:
    results -- the matched record information
    """
    
    results = []
    
    # For the round-robin partitioning method
    # Perform data partitioning first
    DD = m_partition(data, n_processor)
    
    # Pool: a Python class enabling parallel processing.
    # We set the number of processes to n_processor, which means that
    # the Pool runs at most 'n_processor' processes at the same time.
    # Pool.apply blocks until each call has finished, so starmap is used
    # to search all data partitions in parallel.
    with Pool(processes=n_processor) as pool:
        partition_results = pool.starmap(m_search, [(d, query) for d in DD])
    
    for result in partition_results:
        # Only append the results that were found
        if result[0] != -1:
            results.append(result)
    
    return results

# Output of using the parallel_search_exact function, round-robin partition, binary_search
ans = parallel_search_exact(climateData, '2017-12-15',10,rr_partition_for_climate_by_date, binary_search_for_climate_by_date)
# Display the result of task1_1
print_result_format_for_climate(ans)



Key: 34
Station: 948702
Data: 2017-12-15
Air Temp: 18 Celcius
Relative Humidity: 52%
Wind Speed: 7.1 knots
Max wind Speed: 14 knots
MAX:    74.5*
MIN: 53.1
Precipitation:  0.00I


### Question 2
*** Write an algorithm to find the latitude, longitude and confidence when the surface temperature (Celsius) was between 65 (Celsius) and 100 (Celsius). Justify your choice of the data partition technique and search technique you have used. ***
- Before designing a parallel search, it is important to understand how the search is performed serially, because the serial search algorithm and the data partitioning method are the building blocks of a parallel search algorithm.
- First, we partitioned the fire data so that it is distributed over a number of processing elements. We use hash data partitioning. To make partitions more meaningful, records with the same semantics should be grouped together, which means that the partitioning must be based on a specific attribute. Hash partitioning is one type of attribute-based partitioning: a hash function is applied to the partitioning attribute, and its result determines which processor the record is placed on. As a result, all records within a partition have the same hash value. Hash partitioning is best suited to exact match searches on the partitioning attribute, because the hash of the search value gives direct access to the processor that holds the candidate records. Only that processing element is activated, so the total cost is reduced, and the processing elements that are idle during the operation can be used for other tasks. Moreover, with a suitable hash function, the partitions are about the same size. This method is also easy to use here because the data to be split is not really historical.
- We use linear search as the search algorithm. Linear search is a sequential search that steps through the array in a loop: it begins with the first element, compares each element with the value being searched for, and stops when the value is found or the end of the array is reached. We chose linear search because it is the simplest and most straightforward search method, and it is easy to understand and implement. Also, because the parallel algorithm is a parallel range search built on linear search, the data does not need to be stored in any particular order.
- We use a parallel range search algorithm for the range selection. It is built from the linear search algorithm and the hash partitioning method discussed above: each temperature value in the query range is hashed to find its partition, and that partition is searched for the value.
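
To see which partitions a range query touches under a simple modulo hash, here is a minimal sketch. The helper `partitions_for_range` is hypothetical and is not part of the assignment code; it assumes integer temperature values and a hash of the form `x % n`.

```python
def partitions_for_range(start, end, n):
    """Return the sorted partition ids that the values in [start, end) hash to."""
    # Every integer value in the range is hashed with the modulo hash x % n.
    return sorted({value % n for value in range(start, end)})

# A narrow range only activates a few of the 10 partitions.
print(partitions_for_range(65, 68, 10))
# A range at least as wide as the number of partitions activates all of them.
print(partitions_for_range(65, 101, 10))
```

With 10 partitions, a range of 65 to 100 is wide enough to reach every partition, so the benefit of activating only selected processing elements applies mainly to exact match and narrow range queries.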

In [4]:
#### Task 1 - Question 2 ####

# Define a simple hash function
def s_hash(x, n):
    """
    Define a simple hash function for demonstration
    
    Arguments:
    x -- an input record
    n -- the number of processors
    
    Return:
    result -- the hash value of x
    """

    result = x%n
    
    return result

# Hash data partitioning function
# We will use the "s_hash" function defined above to realise this partitioning
# Need hash partitioning 
def h_partition_for_fire_by_temp(data, n):
    """
    Perform hash data partitioning on data
    
    Arguments:
    data -- an input dataset which is a list
    n -- the number of processors
    
    Return:
    result -- the partitioned subsets of fireData
    """
    
    dic = {} # We will use a dictionary
    for x in data: # For each data record, perform the following
        h = s_hash(int(x[7]), n) # Get the hash key of the input
        # Note: the value sets drop exact duplicate records
        if h in dic: # If the key exists
            dic[h].add(tuple(x)) # Add the new input to the value set of the key
        else: # If the key does not exist
            dic[h] = {tuple(x)} # Create a value set that holds the input
    return dic

# Linear search function
# Need linear search
def linear_search_for_fire_by_temp(data, key):
    """
    Perform linear search on data for the given key
    
    Arguments:
    data -- an input dataset which is a list
    key -- a query value (surface temperature in Celsius)
    
    Return:
    result -- a list of [position, record] pairs for all matched records
    """
    
    matched_list = []
    
    for position, x in enumerate(data):
        if int(x[7]) == key: # If the value of int(x[7]) matches the key
            # Append the position and the record to the matched_list
            matched_list.append([position, x])
    return matched_list

# Parallel searching algorithm for range selection
# Finally do parallel_search_range
def parallel_search_range(data, query_range, n_processor):
    """
    Perform parallel search for range selection on data for the given key
    
    Arguments:
    data -- the input dataset which is a list
    query_range -- a query record in the form of a range (e.g. [65, 101])
    n_processor -- the number of parallel processors
    
    Return:
    results -- the matched record information
    """
    
    results = []
    
    query_s = query_range[0] # start of range (inclusive)
    query_e = query_range[1] # end of range (exclusive)
    
    # Partition the input data with the same number of partitions that is
    # used to hash the query values below, so that the two always agree.
    DD = h_partition_for_fire_by_temp(data, n_processor)
    
    # Each query value only needs the partition that its hash value points to.
    tasks = []
    for query in range(query_s, query_e):
        query_hash = s_hash(query, n_processor)
        tasks.append((list(DD.get(query_hash, [])), query))
    
    # Pool: a Python class enabling parallel processing.
    # Pool.apply blocks until each call has finished, so starmap is used
    # to let up to 'n_processor' searches run at the same time.
    with Pool(processes = n_processor) as pool:
        for result in pool.starmap(linear_search_for_fire_by_temp, tasks):
            results.extend(result)
                
    return results

# Get the records from fireData with a surface temperature from 65 to 100 (inclusive)
temp_result = parallel_search_range(fireData,[65,101],10)
final_result = []
# Keep only the necessary fields from each element
for x in temp_result: 
    record = x[1]
    # Latitude, longitude, confidence and surface temperature
    final_result.append([record[0], record[1], record[5], record[7]])

#For displaying, label column for output
label = ['Latitude','Longitude','Confidence','Temperature (Celcius)']
df = pd.DataFrame.from_records(final_result,columns=label)
df

|  | Latitude | Longitude | Confidence | Temperature (Celcius) |
|---|---|---|---|---|
| 0 | -37.642 | 149.263 | 100 | 65 |
| 1 | -37.4449 | 147.6594 | 89 | 65 |
| 2 | -37.46 | 148.102 | 88 | 65 |
| 3 | -34.3654 | 141.543 | 89 | 65 |
| 4 | -36.1441 | 145.2221 | 89 | 65 |
| 5 | -37.876 | 143.7804 | 89 | 65 |
| 6 | -37.0959 | 143.8206 | 89 | 65 |
| 7 | -37.5816 | 148.5862 | 84 | 65 |
| 8 | -34.9023 | 142.0557 | 89 | 65 |
| 9 | -36.4143 | 143.272 | 89 | 65 |


## Task 2: Parallel Join

### Question 1
*** Write an algorithm to find surface temperature (Celsius), air temperature (Celsius), relative humidity and maximum wind speed. Justify your choice of the data partition technique and join technique you have used. ***
- We use the Divide and Broadcast-Based Parallel (DDP) join algorithm. This algorithm consists of two stages: (1) data partitioning using the divide and broadcast method, and (2) a local join. For the local join we use the hash-based join algorithm, and for the partitioning we reuse the round-robin data partitioning function designed for the "Parallel Search" activity.
- The DDP join is easy to implement and use compared with other parallel join methods: one table is divided into equal-sized partitions, one per processor, and the other table is made available to every processor. Once the divide and broadcast partitioning is done, any serial join method can be used for the local join. We chose the hash-based join to improve efficiency compared with other serial join methods.
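
The two stages can be illustrated on a toy example. This is only a sketch under our own assumptions: the rows and the helper names (`round_robin`, `hash_join`, `divide_and_broadcast_join`) are invented for illustration, and the local joins run one after another here instead of on separate processors.

```python
def round_robin(rows, n):
    """Divide rows into n partitions by assigning them in turn."""
    parts = [[] for _ in range(n)]
    for i, row in enumerate(rows):
        parts[i % n].append(row)
    return parts

def hash_join(left, right, left_key, right_key):
    """Serial hash join: build a hash table on left, then probe it with right."""
    table = {}
    for row in left:
        table.setdefault(row[left_key], []).append(row)
    return [(l, r) for r in right for l in table.get(r[right_key], [])]

def divide_and_broadcast_join(left, right, n, left_key, right_key):
    """Divide left into n partitions and join each one with the whole of right."""
    joined = []
    for part in round_robin(left, n):  # each part would go to one processor
        # right plays the role of the broadcast table
        joined.extend(hash_join(part, right, left_key, right_key))
    return joined

# Invented toy rows: (station, date, air temperature) and (latitude, date, surface temperature)
climate = [("s1", "2017-12-15", "18"), ("s1", "2017-12-27", "28")]
fire = [("-37.6", "2017-12-15", "42"), ("-37.4", "2017-12-27", "68"), ("-36.1", "2017-12-15", "36")]
pairs = divide_and_broadcast_join(climate, fire, 2, 1, 1)
print(len(pairs))
```

Because every partition of the divided table sees the whole broadcast table, no matching pair can be missed, whichever partition a record lands in.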

In [5]:
#### ------- Task 2 - Question 1 ------- ####

from datetime import datetime
import multiprocessing as mp

def rr_partition(data, n):
    """
    Perform data partitioning on data
    
    Arguments:
    data -- an input dataset which is a list
    n -- the number of processors
    
    Return:
    result -- the partitioned subset of D
    """
    
    # Create one empty bin for each processor
    result = []
    for i in range(n):
        result.append([])
    
    # Assign the records to the bins in turn
    for index, element in enumerate(data):
        # Calculate the index of the bin that the current record will be assigned to
        index_bin = index % n
        result[index_bin].append(element)
    
    return result

def date_hash_for_climate(x): #will hash the date in climateData
    """
    Hash function 'date_hash_for_climate' used in the hashing process.
    It converts the string date value of the hashed attribute, which in this
    case is the join attribute, to an integer value.
    
    Arguments:
    x -- a record where hashing will be applied on its join attribute

    Return:
    result -- the hash index of the record x
    """
    
    # The position of the value to be hashed
    old = x[1]
    # Convert date format
    datetimeobject = datetime.strptime(old,'%Y-%m-%d')
    new = datetimeobject.strftime('%Y%m%d')
    # Convert to integer value
    return int(new)

def date_hash_for_fire(x): # will hash the date in fireData
    """
    Hash function 'date_hash_for_fire' used in the hashing process.
    It converts the string date value of the hashed attribute, which in this
    case is the join attribute, to an integer value.
    
    Arguments:
    x -- a record where hashing will be applied on its join attribute

    Return:
    result -- the hash index of the record x
    """
        
    # The position of the value to be hashed
    old = x[6]
    # Convert date format
    datetimeobject = datetime.strptime(old,'%Y-%m-%d')
    new = datetimeobject.strftime('%Y%m%d')
    # Convert to integer value
    return int(new)

def HB_join(T1,T2): #T1-climateData, T2-fireData
    """
    Perform the hash-based join algorithm.
    The join attribute is the date attribute in the input tables T1 & T2

    Arguments:
    T1 & T2 -- Tables to be joined

    Return:
    result -- the joined table
    """
    
    result = []
    
    # We will use a dictionary
    dic = {}
    for t1 in T1:
        # Hash the record based on join attribute value using hash function date_hash_for_climate into hash table
        t1_key = date_hash_for_climate(t1)
        if t1_key in dic:
            dic[t1_key].add(tuple(t1)) # If there is an entry
        else:
            dic[t1_key] = {tuple(t1)}
    
    # For each record in table T2 (probing)
    for t2 in T2:
        # Hash the record based on join attribute value using date_hash_for_fire
        t2_key = date_hash_for_fire(t2)
        # If an index entry is found
        if t2_key in dic:
            # Compare each record on this index entry with the record of table T2
            for value in dic[t2_key]:
                # If the join attribute (date) is the same, add the joined record to the result
                if t2[6] == value[1]:
                    result.append((t2[7], value[2], value[3], value[5], value[1]))
    
    return result

def DDP_join(T1, T2, n_processor):
    """
    Perform a divide and broadcast-based parallel join algorithm.
    The join attribute is the date attribute in the input tables T1 & T2

    Arguments:
    T1 & T2 -- Tables to be joined
    n_processor -- the number of parallel processors

    Return:
    result -- the joined table
    """
        
    # Partition T1 into sub-tables using rr_partition().
    # The number of sub-tables must be equal to n_processor
    T1_subset = rr_partition(T1,n_processor)
    
    # Pool: a Python class enabling parallel processing.
    # Pool.apply blocks until each call has finished, so starmap is used
    # to apply the join on all sub-tables in parallel.
    # Note that as we assume a shared-memory architecture, no replication
    # of the broadcast table (in this case table T2, the smaller table) occurs.
    with mp.Pool(processes = n_processor) as pool:
        result = pool.starmap(HB_join, [(t1, T2) for t1 in T1_subset])
        
    return result

n_processor = 4
result_data = DDP_join(climateData,fireData,n_processor)

# For display output
temp_data = []
for x in result_data:
    for y in x:
        temp_data.append(y)

# Label columns for output data
label = ['Surface Temperature (Celcius)','Air temperature (Celcius)', 'Relative Humidity (%)', 'Max Wind Speed (knot)', 'Date']

# Use the pandas library to display the output
df = pd.DataFrame(temp_data,columns=label)
# Display an output
df

|  | Surface Temperature (Celcius) | Air temperature (Celcius) | Relative Humidity (%) | Max Wind Speed (knot) | Date |
|---|---|---|---|---|---|
| 0 | 68 | 28 | 58.3 | 15.9 | 2017-12-27 |
| 1 | 63 | 28 | 58.3 | 15.9 | 2017-12-27 |
| 2 | 53 | 28 | 58.3 | 15.9 | 2017-12-27 |
| 3 | 67 | 28 | 58.3 | 15.9 | 2017-12-27 |
| 4 | 42 | 18 | 52 | 14 | 2017-12-15 |
| 5 | 36 | 18 | 52 | 14 | 2017-12-15 |
| 6 | 38 | 18 | 52 | 14 | 2017-12-15 |
| 7 | 40 | 18 | 52 | 14 | 2017-12-15 |
| 8 | 38 | 28 | 56.7 | 16.9 | 2017-11-29 |
| 9 | 55 | 28 | 56.7 | 16.9 | 2017-11-29 |


### Question 2
*** Write an algorithm to find datetime, air temperature (Celsius), surface temperature (Celsius) and confidence when the confidence is between 80 and 100. Justify your choice of the data partition technique and join technique you have used. ***
- The justification is the same as in Task 2 Question 1: we use the DDP join with RR partitioning and a slightly modified hash-based join. The only difference is that, when a joined record is added to the result, an if statement keeps only the records whose confidence value is between 80 and 100.

In [6]:
"""Task 2 question 2"""
#hash function will be same as above, but HB_join will be different

def HB_join_q2(T1,T2): #T1-climateData, T2-fireData
    """
    Perform the hash-based join algorithm for question 2.
    The join attribute is the date attribute in the input tables T1 & T2

    Arguments:
    T1 & T2 -- Tables to be joined

    Return:
    result -- the joined table
    """
        
    result = []
    
    dic = {} # We will use a dictionary
    
    # For each record in table T1
    for t1 in T1:
        # Hash the record based on the join attribute value using date_hash_for_climate into the hash table
        t1_key = date_hash_for_climate(t1)
        if t1_key in dic:
            dic[t1_key].add(tuple(t1)) # If there is an entry
        else:
            dic[t1_key] = {tuple(t1)}
    
    # For each record in table T2 (probing)
    for t2 in T2:
        # Hash the record based on the join attribute value using date_hash_for_fire
        t2_key = date_hash_for_fire(t2)
        # If an index entry is found
        if t2_key in dic:
            # Compare each record on this index entry with the record of table T2
            for value in dic[t2_key]:
                # If the join attribute (date) is the same
                if t2[6] == value[1]:
                    # Keep the record only if the confidence is between 80 and 100
                    if 80 <= int(t2[5]) <= 100:
                        result.append((t2[3],value[2],t2[7],t2[5]))
    return result

def DDP_join(T1, T2, n_processor):
    """
    Perform a divide and broadcast-based parallel join algorithm.
    The join attribute is the date attribute in the input tables T1 & T2

    Arguments:
    T1 & T2 -- Tables to be joined
    n_processor -- the number of parallel processors

    Return:
    result -- the joined table
    """
    
    # Partition T1 into sub-tables using rr_partition().
    # The number of sub-tables must be equal to n_processor
    T1_subset = rr_partition(T1,n_processor)
    
    # Pool: a Python class enabling parallel processing.
    # Pool.apply blocks until each call has finished, so starmap is used
    # to apply the join on all sub-tables in parallel.
    # Note that as we assume a shared-memory architecture, no replication
    # of the broadcast table (in this case table T2, the smaller table) occurs.
    with mp.Pool(processes = n_processor) as pool:
        result = pool.starmap(HB_join_q2, [(t1, T2) for t1 in T1_subset])
        
    return result

n_processor = 4
result_data = DDP_join(climateData,fireData,n_processor)

# For display output
temp_data = []
for x in result_data:
    for y in x:
        temp_data.append(y)

# Label columns for output data
label = ['Datetime','Air temperature (Celcius)','Surface temperature (Celcius)','Confidence']
# Use the pandas library to display the output
df = pd.DataFrame(temp_data,columns=label)
# Display an output
df

|  | Datetime | Air temperature (Celcius) | Surface temperature (Celcius) | Confidence |
|---|---|---|---|---|
| 0 | 2017-12-27T00:02:15 | 28 | 63 | 82 |
| 1 | 2017-12-27T00:02:14 | 28 | 67 | 86 |
| 2 | 2017-12-15T13:17:17 | 18 | 42 | 92 |
| 3 | 2017-11-29T13:17:24 | 28 | 55 | 100 |
| 4 | 2017-11-29T03:52:10 | 28 | 87 | 100 |
| 5 | 2017-11-29T03:52:10 | 28 | 80 | 97 |
| 6 | 2017-11-29T03:52:10 | 28 | 84 | 99 |
| 7 | 2017-11-21T00:27:21 | 24 | 59 | 85 |
| 8 | 2017-11-13T03:52:15 | 24 | 61 | 87 |
| 9 | 2017-11-13T03:52:14 | 24 | 53 | 81 |


## Task 3: Parallel Sort
- Write an algorithm to sort fire data based on surface temperature (Celsius) in ascending order. Justify your choice of the data partition technique and sorting technique you have used.
- We decided to use the parallel merge-all sort for this question. The parallel merge-all sort algorithm has two steps: a local sort on each processor and a final merge.
- Parallel merge-all sort is simple to implement, and load balance is easy to achieve in the local sort phase by using RR partitioning. Because the method is straightforward, its behaviour is also easy to predict.
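
The two steps can be sketched compactly on a toy list. Note that this sketch swaps in Python's built-in `sorted` for the local sort and `heapq.merge` for the final merge, in place of the quicksort and k-way merge functions written for this task. The rows and the name `merge_all_sort` are invented for illustration, and the local sorts run serially here.

```python
import heapq

def merge_all_sort(rows, n, key):
    """Round-robin partition, sort each partition locally, then merge all of them."""
    # Round-robin partitioning: partition i receives rows i, i+n, i+2n, ...
    parts = [rows[i::n] for i in range(n)]
    # Local sort phase: each partition would be sorted by its own processor
    local_sorted = [sorted(part, key=key) for part in parts]
    # Final merge phase: a single merge of all the sorted partitions
    return list(heapq.merge(*local_sorted, key=key))

# Invented toy rows: (label, surface temperature as a string)
temps = [("a", "68"), ("b", "29"), ("c", "42"), ("d", "31"), ("e", "28")]
out = merge_all_sort(temps, 2, key=lambda row: int(row[1]))
print(out)
```

The final merge is done by a single process, which is the part of merge-all sort that does not benefit from adding more processors.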

In [7]:
""" Task 3"""

# 1. partition method
# Using the rr_partition for partitioning 
#rr_partition(fireData, 3)

# 2. sorting method
# Using the qsort for sorting 
def qsort_by_surf_temp(arr):
    """
    Quicksort a list
    
    Arguments:
    arr -- the input list to be sorted
    
    Return:
    result -- the sorted arr
    """
    if len(arr) <= 1:
        return arr
    else:
        pivot = arr[0]
        #put into left_arr if its value is smaller than current pivot
        left_arr = [x for x in arr[1:] if int(x[7]) < int(pivot[7])]
        #put into right_arr if its value is greater or equal to current pivot
        right_arr = [x for x in arr[1:] if int(x[7]) >= int(pivot[7])]
        #recursively call qsort for each left_arr and right_arr until length of arr become 1 or less.
        value = qsort_by_surf_temp(left_arr) + [pivot] + qsort_by_surf_temp(right_arr)
        return value

In [8]:
# Let's first look at 'k-way merging algorithm' that will be used 
# to merge sub-record sets in our external sorting algorithm.
import sys

# Find the smallest record
def find_min(records):    
    """ 
    Find the smallest record
    
    Arguments:
    records -- the input record set

    Return:
    result -- the smallest record's index
    """

    #first value in records
    m = int(records[0][7])
    index = 0
    for i in range(len(records)):
        #comparing current value with current smallest value. if current i value is smaller,
        if(int(records[i][7]) < m):
            #change the index of min
            index = i
            #change the min value
            m = int(records[i][7])
    return index

def k_way_merge(record_sets):
    """ 
    K-way merging algorithm
    
    Arguments:
    record_sets -- the set of multiple sorted sub-record sets

    Return:
    result -- the sorted and merged record set
    """
    
    # indexes will keep the indexes of sorted records in the given buffers
    indexes = []
    for x in record_sets:
        indexes.append(0) # initialisation with 0
    
    # final result will be stored in this variable
    result = []
    
    # Sentinel record used for exhausted record sets. Indexes 0 to 6 can hold
    # any number or string, but index 7 must be sys.maxsize.
    max_list = ['a','b','c','d','e','f','g',sys.maxsize]
    
    while(True):
        # Current head record of every record set
        # (named 'heads' so that the built-in 'tuple' is not shadowed)
        heads = []
        
        # This loop gets the current position of every buffer
        for i in range(len(record_sets)):
            if(indexes[i] >= len(record_sets[i])):
                heads.append(max_list)
            else:
                heads.append(record_sets[i][indexes[i]])

        # Find the smallest record
        smallest = find_min(heads)
        
        # If the smallest record is the sentinel, we reached the end of every record set
        if(heads[smallest][7] == sys.maxsize):
            break
            
        # This record is the next on the merged list
        result.append(record_sets[smallest][indexes[smallest]])

        indexes[smallest] += 1
    
    return result

In [9]:
# serial sorting method

def serial_sorting(dataset, buffer_size):
    """
    Perform a serial external sorting method based on sort-merge.
    The buffer size determines the size of each sub-record set

    Arguments:
    dataset -- the entire record set to be sorted
    buffer_size -- the buffer size determining the size of each sub-record set

    Return:
    result -- the sorted record set
    """
    
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return
    
    result = []
    
    
    #--- Sort Phase ---
    sorted_set = []
    
    # Read buffer_size records at a time into memory,
    # sort them, and write out a sub-record set (i.e. variable: subset)
    start_pos = 0
    N = len(dataset)
    while True:
        if((N - start_pos) > buffer_size):
            # Read from start_pos to start_pos + buffer_size
            subset = dataset[start_pos:start_pos + buffer_size]
            # Sort the subset (using quicksort)
            sorted_subset = qsort_by_surf_temp(subset)
            sorted_set.append(sorted_subset)
            # Move start_pos to start_pos + buffer_size (the end of the previous subset)
            start_pos += buffer_size
        else:
            #if remaining number of elements in dataset is smaller than buffer size.
            subset = dataset[start_pos:]
            #sort the subset
            sorted_subset = qsort_by_surf_temp(subset)
            sorted_set.append(sorted_subset)
            break
            
    # --- Merge Phase ---
    merge_buffer_size = buffer_size - 1
    dataset = sorted_set
    while True:
        merged_set = []
        
        N = len(dataset)
        start_pos = 0
        while True:
            if((N - start_pos) > merge_buffer_size):
                # Read sub-record sets from start_pos to start_pos + merge_buffer_size
                subset = dataset[start_pos:start_pos + merge_buffer_size]
                # Merge them
                merged_set.append(k_way_merge(subset))
                # Move start_pos to start_pos + merge_buffer_size (the end of the previous subset)
                start_pos += merge_buffer_size
            else:
                #when remaining data is less than sizes of buffer
                subset = dataset[start_pos:]
                merged_set.append(k_way_merge(subset))
                break
        dataset = merged_set
        if (len(dataset) <= 1): #if the size of merged record set is 1, then stop
            result = merged_set
            break
    
    return result



In [10]:
result = serial_sorting(fireData, 4)
#result

In [11]:
# Parallel sorting method

# Include this package for parallel processing
import multiprocessing as mp

def parallel_merge_all_sorting(dataset, n_processor, buffer_size):
    """
    Perform a parallel merge-all sorting method

    Arguments:
    dataset -- entire record set to be sorted
    n_processor -- number of parallel processors
    buffer_size -- buffer size determining the size of each sub-record set

    Return:
    result -- the merged record set
    """
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return
    
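    # Partition the dataset with round-robin so each processor gets an equal share
    subsets = rr_partition(dataset, n_processor)
    
    # Pool: a Python class that manages a set of worker processes.
    pool = mp.Pool(processes = n_processor)

    # ----- Sort phase -----
    # starmap hands every subset to the pool at once; pool.apply blocks until
    # each call returns, so the subsets would be sorted one after another.
    local_results = pool.starmap(serial_sorting, [(s, buffer_size) for s in subsets])
    pool.close()
    pool.join()
    # serial_sorting returns its sorted run wrapped in a list, so unwrap it here
    sorted_set = [run for local in local_results for run in local]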
    
    # ---- Final merge phase ----
    merge = k_way_merge(sorted_set)
    
    
    
    return merge
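
The final merge phase takes the locally sorted runs and combines them in a single k-way merge. As a point of comparison for `k_way_merge`, the sketch below does the same with the standard library's `heapq.merge`. It assumes every run is already sorted in ascending order on the integer value of one column. The name `merge_sorted_runs`, the `key_index` parameter and the toy records are assumptions made for illustration.

```python
import heapq

def merge_sorted_runs(runs, key_index):
    """Merge already-sorted runs into one sorted list (illustrative helper).

    Assumes each run is sorted ascending on int(record[key_index]).
    """
    return list(heapq.merge(*runs, key=lambda record: int(record[key_index])))

# Two toy runs, each sorted on column 1
run_a = [['a', '1'], ['b', '5']]
run_b = [['c', '2'], ['d', '9']]
merged = merge_sorted_runs([run_a, run_b], key_index=1)
print(merged)
```

Because this step runs on a single processor, it is the part of merge-all sorting that does not benefit from adding more processors.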

In [12]:
result = parallel_merge_all_sorting(fireData, 4, 4)

label = ['Latitude','Longitude','Surface Temperature (Kelvin)','DateTime','Power','Confidence','Date','Surface Temperature (Celcius)']
df = pd.DataFrame(result,columns=label)
df

| | Latitude | Longitude | Surface Temperature (Kelvin) | DateTime | Power | Confidence | Date | Surface Temperature (Celcius) |
|---|---|---|---|---|---|---|---|---|
| 0 | -37.886 | 147.207 | 302 | 2017-07-02T04:28:42 | 10.7 | 50 | 2017-07-02 | 28 |
| 1 | -37.886 | 147.207 | 302 | 2017-07-02T04:28:42 | 10.7 | 50 | 2017-07-02 | 28 |
| 2 | -36.943 | 143.286 | 302.7 | 2017-11-11T15:08:00 | 18.8 | 51 | 2017-11-11 | 29 |
| 3 | -37.062 | 141.373 | 303.1 | 2017-07-01T13:11:41 | 16.1 | 53 | 2017-07-01 | 29 |
| 4 | -37.466 | 148.1 | 302.2 | 2017-10-02T23:44:31 | 10.9 | 50 | 2017-10-02 | 29 |
| 5 | -37.062 | 141.373 | 303.1 | 2017-07-01T13:11:41 | 16.1 | 53 | 2017-07-01 | 29 |
| 6 | -37.227 | 141.146 | 305.1 | 2017-10-03T01:22:44 | 41.2 | 54 | 2017-10-03 | 31 |
| 7 | -37.38 | 149.334 | 304.5 | 2017-11-30T15:38:32 | 14.1 | 61 | 2017-11-30 | 31 |
| 8 | -36.779 | 146.108 | 305.3 | 2017-07-01T03:46:08 | 25.7 | 61 | 2017-07-01 | 32 |
| 9 | -35.646 | 142.282 | 305.6 | 2017-12-24T13:12:01 | 11.8 | 65 | 2017-12-24 | 32 |


## Task 4: Parallel Group-By

### Question 1
*** Write an algorithm to get the number of fire in each day. You are required to only display total number of fire and the date in the output. Justify your choice of the data partition technique if any. ***
- We use the traditional merge-all group-by algorithm for Task 4. It has two phases: a local group-by on each processor and a global merge of the local results. As a prerequisite, the dataset is partitioned with round-robin (rr) partitioning, which gives every processor an almost equal number of records regardless of how the dates are distributed.
- The merge-all approach becomes less efficient as the dataset grows, but this task's dataset is small, so the effect is minor. We therefore chose merge-all group-by because it is simple to implement and use.
- The final merge runs on a single processor and can become a bottleneck. Since the dataset is not large, the performance loss is small.
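
The two phases described above can be sketched on a toy list of dates. This is a serial simulation with no worker processes, intended only to show the data flow. `toy_rr_partition` and `merge_all_count` are hypothetical names, and `toy_rr_partition` is a stand-in for the notebook's `rr_partition`, which may differ in detail.

```python
from collections import Counter

def toy_rr_partition(records, n):
    """Round-robin: record i goes to partition i mod n (illustrative stand-in)."""
    partitions = [[] for _ in range(n)]
    for i, record in enumerate(records):
        partitions[i % n].append(record)
    return partitions

def merge_all_count(records, n):
    """Count records per key with a local group-by followed by a global merge."""
    partitions = toy_rr_partition(records, n)
    # Phase 1: local group-by, one Counter per partition
    local_counts = [Counter(partition) for partition in partitions]
    # Phase 2: global merge, adding up the local counts of each key
    total = Counter()
    for counts in local_counts:
        total.update(counts)
    return dict(total)

dates = ['2017-03-06', '2017-03-07', '2017-03-06', '2017-03-08', '2017-03-06']
counts_per_day = merge_all_count(dates, 2)
print(counts_per_day)
```

Round-robin scatters records of the same date across partitions, which is why the global merge has to add the local counts rather than simply concatenate them.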

In [13]:
from collections import Counter

# The first step in the merge-all groupby method
def local_groupby(dataset):
    """
    Perform a local groupby method

    Arguments:
    dataset -- sub-dataset (one partition) to be aggregated

    Return:
    result -- dictionary mapping each date key to its number of fire records
    """
    
    counts = {}  # named counts to avoid shadowing the built-in dict
    for record in dataset:
        # find the date key of the record
        key = date_hash_for_fire(record)
        # increment the count for this key, starting from 0 if it is new
        counts[key] = counts.get(key, 0) + 1
    return counts




def parallel_merge_all_groupby(dataset):
    """
    Perform a parallel merge-all groupby method

    Arguments:
    dataset -- list of sub-datasets (partitions), one per processor

    Return:
    result -- dictionary mapping each date key to its total number of fire records
    """
    
    # Define the number of parallel processors: the number of sub-datasets.
    n_processor = len(dataset)
    
    # Pool: a Python class that manages a set of worker processes.
    pool = mp.Pool(processes = n_processor)
    
    # ----- Local aggregation step -----
    # map sends every partition to the pool at once; pool.apply would block
    # on each call and process the partitions one after another.
    local_result = pool.map(local_groupby, dataset)
    pool.close()
    pool.join()
    
    # ---- Global aggregation step ----
    # The global operator is sum: add up the local counts of each key.
    merged_result = Counter()
    for subset in local_result:
        merged_result.update(subset)

    return dict(merged_result)


pted_data = rr_partition(fireData,4)
result = parallel_merge_all_groupby(pted_data)

# Display the output (re-importing datetime is harmless if an earlier cell already did)
from datetime import datetime

for key in result:
    # Convert the date key from YYYYMMDD to YYYY-MM-DD
    date_text = datetime.strptime(str(key), '%Y%m%d').strftime('%Y-%m-%d')
    print("Date: " + date_text + " , Total Number of Fire: " + str(result[key]))



Date: 2017-04-11 , Total Number of Fire: 24
Date: 2017-09-24 , Total Number of Fire: 28
Date: 2017-04-14 , Total Number of Fire: 18
Date: 2017-09-27 , Total Number of Fire: 7
Date: 2017-03-31 , Total Number of Fire: 22
Date: 2017-09-29 , Total Number of Fire: 2
Date: 2017-05-03 , Total Number of Fire: 64
Date: 2017-10-16 , Total Number of Fire: 1
Date: 2017-08-02 , Total Number of Fire: 2
Date: 2017-08-05 , Total Number of Fire: 1
Date: 2017-05-05 , Total Number of Fire: 31
Date: 2017-08-10 , Total Number of Fire: 1
Date: 2017-10-18 , Total Number of Fire: 6
Date: 2017-08-14 , Total Number of Fire: 5
Date: 2017-03-06 , Total Number of Fire: 2
Date: 2017-03-07 , Total Number of Fire: 1
Date: 2017-03-08 , Total Number of Fire: 2
Date: 2017-03-09 , Total Number of Fire: 3
Date: 2017-03-10 , Total Number of Fire: 8
Date: 2017-03-12 , Total Number of Fire: 5
Date: 2017-10-20 , Total Number of Fire: 3
Date: 2017-03-14 , Total Number of Fire: 10
Date: 2017-03-15 , Total Number of Fire: 7
Date

### Question 2
*** Write an algorithm to find the average surface temperature (Celcius) for each day. You are required to only display average surface temperature (Celcius) and the date in the output. Justify your choice of the data partition technique if any. ***
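- The justification is the same as for Task 4 Question 1.
- We reuse rr_partition. The local group-by differs, however: an average cannot be merged directly, so each processor keeps the sum of the surface temperatures and the record count for each date, and the average is computed only after the global merge.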
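
To see why the local step has to return a sum and a count rather than a local average, consider the small worked example below. The partition contents and helper names are made up for illustration.

```python
def local_sum_count(values):
    """Local aggregation: return (sum, count) for one partition."""
    return (sum(values), len(values))

def global_average(partials):
    """Global aggregation: add up the sums and the counts, then divide once."""
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count

# Temperatures of one day, split unevenly over two partitions
partition_a = [40, 50, 60]
partition_b = [80]

partials = [local_sum_count(partition_a), local_sum_count(partition_b)]
correct = global_average(partials)  # (150 + 80) / 4

# Averaging the local averages weights both partitions equally, which is wrong here
naive = (sum(partition_a) / len(partition_a) + sum(partition_b) / len(partition_b)) / 2

print(correct, naive)
```

The two values differ whenever the partitions hold a different number of records for the same date, which is the normal case after round-robin partitioning.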

In [14]:
"""
Task 4 question 2
We will use the rr_partition used above
"""
def local_groupby_q2(dataset):
    """
    Perform a local groupby method

    Arguments:
    dataset -- sub-dataset (one partition) to be aggregated

    Return:
    result -- dictionary mapping each date key to [sum of surface temperature, record count]
    """
    
    groups = {}  # named groups to avoid shadowing the built-in dict

    for record in dataset:
        key = date_hash_for_fire(record)
        if key not in groups:
            groups[key] = [0, 0]  # [sum of surface temperature, record count]
        groups[key][0] += int(record[7])
        groups[key][1] += 1
    return groups

def parallel_merge_all_groupby_q2(dataset):
    """
    Perform a parallel merge-all groupby method

    Arguments:
    dataset -- list of sub-datasets (partitions), one per processor

    Return:
    result -- dictionary mapping each date key to [average surface temperature, record count]
    """
    
    # Define the number of parallel processors: the number of sub-datasets.
    n_processor = len(dataset)
    
    # Pool: a Python class that manages a set of worker processes.
    pool = mp.Pool(processes = n_processor)
    
    # ----- Local aggregation step -----
    # map sends every partition to the pool at once; pool.apply would block
    # on each call and process the partitions one after another.
    local_result = pool.map(local_groupby_q2, dataset)
    pool.close()
    pool.join()
    
    # ---- Global aggregation step ----
    # The global operator is sum, applied to both the local sums and the local counts.
    merged_result = {}

    for subset in local_result:
        for key in subset:
            if key not in merged_result:
                merged_result[key] = [0, 0]
            merged_result[key][0] += subset[key][0] # add up the sums of surface temperature
            merged_result[key][1] += subset[key][1] # add up the record counts
    
    for key in merged_result:
        # Calculate the average surface temperature: total sum / total count
        merged_result[key][0] = merged_result[key][0]/merged_result[key][1]
    
    return merged_result
    
pted_data = rr_partition(fireData,4)
result = parallel_merge_all_groupby_q2(pted_data)

# Display the output (re-importing datetime is harmless if an earlier cell already did)
from datetime import datetime

for key in result:
    # Convert the date key from YYYYMMDD to YYYY-MM-DD
    date_text = datetime.strptime(str(key), '%Y%m%d').strftime('%Y-%m-%d')
    print("Date: " + date_text + " , Average Surface Temperature (Celcius): " + str(result[key][0]))


Date: 2017-10-08 , Average Surface Temperature (Celcius): 41.0
Date: 2017-10-09 , Average Surface Temperature (Celcius): 44.0
Date: 2017-10-10 , Average Surface Temperature (Celcius): 53.333333333333336
Date: 2017-04-11 , Average Surface Temperature (Celcius): 46.291666666666664
Date: 2017-05-01 , Average Surface Temperature (Celcius): 68.4
Date: 2017-05-02 , Average Surface Temperature (Celcius): 55.6
Date: 2017-10-15 , Average Surface Temperature (Celcius): 72.66666666666667
Date: 2017-05-04 , Average Surface Temperature (Celcius): 56.80740740740741
Date: 2017-10-17 , Average Surface Temperature (Celcius): 51.6
Date: 2017-05-06 , Average Surface Temperature (Celcius): 57.529411764705884
Date: 2017-05-07 , Average Surface Temperature (Celcius): 50.333333333333336
Date: 2017-05-08 , Average Surface Temperature (Celcius): 56.291666666666664
Date: 2017-10-21 , Average Surface Temperature (Celcius): 51.25
Date: 2017-05-10 , Average Surface Temperature (Celcius): 52.86842105263158
Date: 20

## Task 5: Parallel Group-By Join
*** Write an algorithm to find the average surface temperature (Celcius) for each weather station. You are required to only display average surface temperature (Celcius) and the station in the output. Justify your choice of the data partition and join technique. ***
- The algorithm has two phases: a DDP join followed by a merge-all group-by.
- We use Group-By After Join rather than Group-By Before Join because the join attribute (date) is not part of the group-by attribute (station), so the aggregation cannot be done before the join. The reasons for choosing DDP and the merge-all group-by algorithm are the same as in the previous questions.
- Within the DDP join, each processor runs a local hash join keyed on date; the group-by then uses a hash keyed on station.
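
The order of operations (join on date first, then group by station) can be sketched serially on toy data. The record layouts used here, `(station, date)` for climate rows and `(date, temperature)` for fire rows, are simplified assumptions and not the real CSV layouts; `join_then_group` is a hypothetical name.

```python
from collections import defaultdict

def join_then_group(climate, fire):
    """Hash join on date, then average the temperature per station (illustrative)."""
    # Build phase: hash the climate rows on the join attribute (date)
    stations_by_date = defaultdict(list)
    for station, date in climate:
        stations_by_date[date].append(station)

    # Probe phase: look up each fire row by date and emit (station, temperature)
    joined = [(station, temp)
              for date, temp in fire
              for station in stations_by_date.get(date, [])]

    # Group-by phase: keep [sum, count] per station, then divide once
    totals = defaultdict(lambda: [0, 0])
    for station, temp in joined:
        totals[station][0] += temp
        totals[station][1] += 1
    return {station: s / c for station, (s, c) in totals.items()}

climate = [(948701, '2017-03-06'), (948702, '2017-03-07'), (948701, '2017-03-08')]
fire = [('2017-03-06', 40), ('2017-03-06', 60), ('2017-03-07', 55), ('2017-03-09', 70)]
averages = join_then_group(climate, fire)
print(averages)
```

The station only becomes available on a fire record after the join, which is why the group-by has to come second.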

In [15]:
#Task 5 HD Task - join,group-by
#hash for climate and fire used in question 2 will be used again to do join by date
def HB_join_q5(T1,T2): #T1-climateData, T2-fireData
    """
    Perform the hash-based join algorithm.
    The join attribute is the numeric attribute in the input tables T1 & T2

    Arguments:
    T1 & T2 -- Tables to be joined

    Return:
    result -- the joined table
    """
    
    result = []
    
    dic = {} # We will use a dictionary
    
    # For each record in table T1
    for t1 in T1:
        # Hash the record based on join attribute value using hash function date_hash_for_climate into hash table
        t1_key = date_hash_for_climate(t1)
        if t1_key in dic:
            dic[t1_key].add(tuple(t1))
        else:
            dic[t1_key] = {tuple(t1)}
    
    # For each record in table T2 (probing)
    for t2 in T2:
        # Hash the record based on join attribute value using date_hash_for_fire
        t2_key = date_hash_for_fire(t2)
        # If an index entry is found Then
        if t2_key in dic:
            # Compare each record on this index entry with the record of table T2
            for value in dic[t2_key]:
                # If the key is the same then put the result
                if t2[6] == value[1]:
                    result.append([value[0], value[1], t2[7]])
    
    return result

def DDP_join(T1, T2, n_processor):
    """
    Perform a divide and broadcast-based parallel join algorithm.
    The join attribute is the date attribute in the input tables T1 & T2

    Arguments:
    T1 & T2 -- Tables to be joined
    n_processor -- the number of parallel processors

    Return:
    result -- list of joined sub-tables, one per processor
    """
    
    # Partition T1 into sub-tables using rr_partition().
    # The number of sub-tables must be equal to n_processor
    T1_subset = rr_partition(T1,n_processor)
    
    # Pool: a Python class that manages a set of worker processes.
    pool = mp.Pool(processes = n_processor)
    
    # Apply a local hash join on each processor.
    # T1 is divided and T2 is the broadcast table: every worker joins its
    # own share of T1 against all of T2. starmap dispatches all joins at once,
    # whereas pool.apply would run them one after another.
    result = pool.starmap(HB_join_q5, [(t1, T2) for t1 in T1_subset])
    pool.close()
    pool.join()
        
    return result

def date_hash_for_q5(x): #will hash the station in the joined records
    """
    Hash function used in the group-by step. It converts the string
    station value of a joined record to an integer, which is
    the group-by attribute.
    
    Arguments:
    x -- a record where hashing will be applied on its join attribute

    Return:
    result -- the hash index of the record x
    """
    
    return int(x[0])

# The first step in the merge-all groupby method
def local_groupby_q5(dataset):
    """
    Perform a local groupby method

    Arguments:
    dataset -- joined sub-table (one partition) of [station, date, surface temperature] records

    Return:
    result -- dictionary mapping each station key to [sum of surface temperature, record count]
    """
    
    groups = {}  # named groups to avoid shadowing the built-in dict

    for record in dataset:
        key = date_hash_for_q5(record)
        if key not in groups:
            groups[key] = [0, 0]  # [sum of surface temperature, record count]
        groups[key][0] += int(record[2])
        groups[key][1] += 1
    return groups


def parallel_merge_all_groupby_q5(dataset):
    """
    Perform a parallel merge-all groupby method

    Arguments:
    dataset -- list of joined sub-tables (partitions), one per processor

    Return:
    result -- dictionary mapping each station key to [average surface temperature, record count]
    """
    
    # Define the number of parallel processors: the number of sub-datasets.
    n_processor = len(dataset)
    
    # Pool: a Python class that manages a set of worker processes.
    pool = mp.Pool(processes = n_processor)
    
    # ----- Local aggregation step -----
    # map sends every partition to the pool at once; pool.apply would block
    # on each call and process the partitions one after another.
    local_result = pool.map(local_groupby_q5, dataset)
    pool.close()
    pool.join()
    
    # ---- Global aggregation step ----
    # The global operator is sum, applied to both the local sums and the local counts.
    merged_result = {}
    
    for subset in local_result:
        for key in subset:
            if key not in merged_result:
                merged_result[key] = [0, 0]  # [sum of surface temperature, record count]
            merged_result[key][0] += subset[key][0]
            merged_result[key][1] += subset[key][1]
    
    for key in merged_result:
        # Calculate the average surface temperature for each station.
        merged_result[key][0] = merged_result[key][0]/merged_result[key][1]
    
    return merged_result

temp_data = DDP_join(climateData,fireData,4)
merged_data = parallel_merge_all_groupby_q5(temp_data)

# Display an output
for x in merged_data:
    print("Station Number: " + str(x) + " , Average Surface Temperature (Celcius): " + str(merged_data[x][0]))


Station Number: 948701 , Average Surface Temperature (Celcius): 56.06938603868797
Station Number: 948702 , Average Surface Temperature (Celcius): 52.148275862068964
