# Distributed Databases and Big Data

# Solution Workbook



**About Jupyter Notebook**

*Server Information:*

    The version of the notebook server is 4.2.3 and is running on:
        Python 3.5.2 


*Current Kernel Information:*
    
    Python 3.5.2 
    IPython 5.1.0 

*Run the .ipynb file in place from the folder extracted from the zip archive, or open it in Jupyter from that folder; the data files are read with relative paths.*

# Table of Contents
  
- [Parallel Group By Join](#5)
    

## Libraries

***`If a library isn't installed already, uncomment and run the code below.`***


In [1]:
#!pip install pandas
# multiprocessing is part of the Python standard library, so it needs no installation

In [3]:
import pandas as pd
from multiprocessing import Pool

## Data collection

In [4]:
# CLIMATE DATA

# read the CSV file into a dataframe
cdf = pd.read_csv('ClimateData.csv')

# convert the dataframe into a list of records (one list per row)
cd = [list(row) for row in cdf.itertuples(index=False, name=None)]

In [5]:
# FIRE DATA

# read the CSV file into a dataframe
fdf = pd.read_csv('FireData.csv')

# convert the dataframe into a list of records (one list per row)
fd = [list(row) for row in fdf.itertuples(index=False, name=None)]


***

# Parallel Group By Join

<a id='5'></a>

1. Write an algorithm to find the average surface temperature (°C) for each weather station.
You are required to display only the average surface temperature (°C) and the station in the
output. Justify your choice of the data partition and join technique.
Hint: You need to join using the date and group by station.

**Solution:** The Group-By Partitioning algorithm is used, with range partitioning on the group-by attribute.

Because the join attribute (date) differs from the group-by attribute (station), one of the parallel group-by **`after`** join algorithms must be used. Here the Group-By Partitioning algorithm is chosen over the Join Partitioning algorithm because the former partitions the data in a single phase, whereas the latter needs two phases.

The group-by attribute is the station. Since there are only three unique stations, the data is range partitioned across 3 processors, with each processor receiving one station. This may cause some skew if the stations have different numbers of records, but it is still preferable to the Join Partitioning algorithm for two reasons:

* The aggregate function is an average, so during the redistribution phase on the group-by attribute the Join Partitioning algorithm would also have to pass along extra information, such as the count of values for each station. This adds complexity to the algorithm.

* Join Partitioning is a two-phase method: it adds an extra layer of processing in each processor and transfers data between processors twice, once on the join attribute and once on the group-by attribute.
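
The first point can be illustrated with a small sketch. The temperatures below are made up, not taken from the data set, and the helper name `merge_partial_avgs` is hypothetical. It shows that averaging per-processor averages gives the wrong answer when processors hold different numbers of records, so each processor would have to ship a (sum, count) pair instead.

```python
def merge_partial_avgs(partials):
    """Combine per-processor (sum, count) pairs into one global average."""
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count

# Hypothetical temperatures for one station, split over two processors
p1 = [50.0, 60.0, 70.0]
p2 = [90.0]

# Averaging the two local averages ignores how many records each one covers
naive = (sum(p1) / len(p1) + sum(p2) / len(p2)) / 2

# Shipping (sum, count) pairs lets the final step weight them correctly
correct = merge_partial_avgs([(sum(p1), len(p1)), (sum(p2), len(p2))])

print(naive, correct)  # 75.0 67.5
```

With Group-By Partitioning this bookkeeping is unnecessary, because all records of a station end up in the same processor.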

For the local join in each processor, a hash join is used. For an equality join it typically needs a single pass over each table, whereas a nested-loop join compares every pair of records and a sort-merge join must sort both inputs first. Because grouping and aggregation are done locally in each processor, no parallel aggregation algorithm is needed.

When applying the Group-By Partitioning algorithm, the climate data is hashed and the fire data is probed. The climate data is much smaller than the fire data, so this results in a smaller hash table.

### Range partitioning function used to partition the table on the group-by attribute

In [115]:
# Range data partitioning function
def range_partition(data, range_indices):
    
    """
    Perform range data partitioning on data
    *Based on partitioning the climate data on station
    Arguments:
    data -- an input dataset which is a list of records (station in column 0)
    range_indices -- the sorted list of range boundaries to split on
    Return:
    result -- the partitioned subsets of data (len(range_indices) + 1 lists)
    """
    
    result = []
    
    # First, we sort the dataset according to the partitioning attribute
    new_data = sorted(data, key=lambda x: x[0])
    
    # Number of boundaries, i.e. the number of bins - 1
    n_bin = len(range_indices)
    
    # For each boundary, perform the following
    for i in range(n_bin):
        
        # Find the elements below the current boundary
        s = [x for x in new_data if x[0] < range_indices[i]]
        
        # Add the partitioned list to the result (it may be empty)
        result.append(s)
        
        # The data is sorted, so s is a prefix of new_data: drop it by length.
        # This also works when s is empty or contains duplicate records.
        new_data = new_data[len(s):]
    
    # Whatever remains is at or above the last boundary
    result.append(new_data)
    
    return result

In [116]:
#testing range partition

a = [948702, '2016-12-31', 19, 56.799999999999997, 7.9000000000000004, 11.1, '   72.0*', '  61.9*', ' 0.00I']
b = [948700, '2016-12-31', 19, 56.799999999999997, 7.9000000000000004, 11.1, '   72.0*', '  61.9*', ' 0.00I']
c = [948701, '2016-12-31', 19, 56.799999999999997, 7.9000000000000004, 11.1, '   72.0*', '  61.9*', ' 0.00I']

l = [a, b, c]

out = range_partition(l, [948701, 948702])
print(out[0])
print(out[1])
print(out[2])

[[948700, '2016-12-31', 19, 56.8, 7.9, 11.1, '   72.0*', '  61.9*', ' 0.00I']]
[[948701, '2016-12-31', 19, 56.8, 7.9, 11.1, '   72.0*', '  61.9*', ' 0.00I']]
[[948702, '2016-12-31', 19, 56.8, 7.9, 11.1, '   72.0*', '  61.9*', ' 0.00I']]
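
As an alternative sketch (not the approach used in this workbook), each record can be placed directly into its partition with the standard-library `bisect_right`, which avoids sorting the whole data set first. The function name `range_partition_bisect` and the short two-column records are hypothetical; the boundaries are assumed to be sorted, and partition `i` holds keys `k` with `boundaries[i-1] <= k < boundaries[i]`, the same convention as `range_partition` above.

```python
from bisect import bisect_right

def range_partition_bisect(data, boundaries, key=lambda rec: rec[0]):
    """Place each record in the partition whose range contains its key."""
    parts = [[] for _ in range(len(boundaries) + 1)]
    for rec in data:
        # bisect_right counts how many boundaries are <= the key,
        # which is exactly the index of the partition the record belongs to
        parts[bisect_right(boundaries, key(rec))].append(rec)
    return parts

# Hypothetical shortened records: [station, label]
stations = [[948702, 'c'], [948700, 'a'], [948701, 'b'], [948700, 'd']]
parts = range_partition_bisect(stations, [948701, 948702])
print(parts)
# [[[948700, 'a'], [948700, 'd']], [[948701, 'b']], [[948702, 'c']]]
```

Records keep their input order inside each partition, and an empty partition is simply returned as an empty list.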


### Hash Join function for local join in each processor

##### hash key function

In [117]:
# Define a simple hash function.
def s_hash(x, n):
    """
    Define a simple hash function for demonstration
    Arguments:
    x -- an input date as a 'YYYY-MM-DD' string
    n -- the number of hash buckets (HB_join passes the number of processors)
    Return:
    result -- the hash value of x (day of the month modulo n)
    """
    ### START CODE HERE ###
    date_parse = x.split('-')
    result = int(date_parse[2])%n #hash key by days
    
    ### END CODE HERE ###
    return result

In [118]:
#test hash
s_hash('2016-12-31',3)

1
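
To get a feel for how evenly this hash spreads dates, the sketch below counts the bucket sizes for every day of a 31-day month. `day_bucket` is a hypothetical stand-alone copy of the rule in `s_hash` (day of the month modulo `n`), written here only so the example runs on its own.

```python
from collections import Counter

def day_bucket(date_str, n):
    """Same rule as s_hash: day of the month modulo n."""
    return int(date_str.split('-')[2]) % n

# All dates of a 31-day month (December 2016)
dates = ['2016-12-%02d' % d for d in range(1, 32)]

# Count how many dates fall into each of the 3 buckets
counts = Counter(day_bucket(d, 3) for d in dates)
print(sorted(counts.items()))  # [(0, 10), (1, 11), (2, 10)]
```

Dates from different months with the same day land in the same bucket, which is harmless for correctness as long as the join still compares the full date values after the bucket lookup.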

##### Hash based join function

In [119]:
def HB_join(T1, T2, n_processor):
    
    """
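    Perform the hash-based join algorithm.
    The join attribute is the date: column 6 of T1 and column 1 of T2.
    Arguments:
    T1 -- the table to be probed (fire data)
    T2 -- the table to be hashed (climate data)
    n_processor -- number of processors, used as the number of hash buckets
    Return:
    result -- the joined table as [T2 column 0 (station), last column of T1] pairs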
    """
    
    result = []
    
    ### START CODE HERE ###
    
    dic = {} # We will use a dictionary
    
    # For each record in table T2
    for rec in T2:
        
        # Hash the record based on join attribute value using hash function H into hash table
        s_key = s_hash(rec[1], n_processor)
        
        dic.setdefault(s_key, []) #add the key if it doesn't exist; otherwise leave it unchanged
        dic[s_key].append(rec[0:2]) #add the record (station and date only)
    
    # For each record in table T1 (probing)
    for rec in T1:
        # Hash the record based on join attribute value using H
        r_key = s_hash(rec[6], n_processor)
        
        if r_key in dic: # If an index entry is found Then            
            for value in dic[r_key]:   # Compare each record on this index entry with the record of table T1         
                if value[1] == rec[6]:
                    result.append([value[0]] + [rec[-1]])      
    
    
    ### END CODE HERE ###
    return result

In [120]:
#test hb_join
#HB_join(fd, cd, 3) #uncomment to see result

### Parallel Group By Partitioning Join

##### local group by function to be used to calculate average for each station

In [121]:
#local group by function for avg
def local_group_by_avg(data):
    
    #create list to store grouped data
    grouped_data = []
    
    #if data is empty return empty list
    if len(data) == 0:
        return grouped_data  
    
    #create dictionary to store values for each group by attribute
    dic = {}
    
    #for each record put in its value under the group by attribute in the dictionary
    for rec in data:
        dic.setdefault(rec[0], [])
        dic[rec[0]].append(rec[1])
        
    #for each key in dictionary calculate the sum and count of the values and then obtain the average
    for key in dic.keys():
        station_sum = sum(dic[key])
        station_count = len(dic[key])
        station_avg = station_sum/station_count
        grouped_data.append([key, station_avg]) #append the grouped by data in the output list
            
    return grouped_data

##### Parallel group by partition function

In [122]:
# Parallel group-by partitioning join
# range partition on the group-by attribute, then local hash join and local aggregation

def parallel_Group_by_Partition_Join_avg(T1, T2, n_processor, partition_range):
    
    """
    Perform parallel group by partition join and aggregates by avg

    Arguments:
    T1 & T2 -- the input datasets which need to be joined
    T1: fire data
    T2: climate data
    n_processor -- the number of parallel processors
    partition_range -- boundaries used for range partitioning T2 on station (e.g. [948701, 948702])
    
    Return:
    results -- joined and aggregated result 
    """
    
    processes = [] #list to store all active processes
    output = [] #list to store the output of all processes

    pool = Pool(processes = n_processor) #one worker process per partition; the main python process is not part of the pool
       
    #Range partition climate data on station using the supplied boundaries
    T2_r_part_data = range_partition(T2, partition_range)
    
    #in each processor perform local join of the ranged subset and the entire broadcasted T1
    for T2_r_part in T2_r_part_data:
        process = pool.apply_async(HB_join, args=(T1, T2_r_part, n_processor,))
        processes.append(process)
    
    #obtain the joined data partitions in each process
    joined_data_parts = [p.get() for p in processes]
    processes = [] #empty active processes list
    
    #perform aggregate group by in each processor
    for joined_data_part in joined_data_parts:
        process = pool.apply_async(local_group_by_avg, args=(joined_data_part,))
        processes.append(process)

    #get output from each processor
    joined_grouped_parts = [p.get() for p in processes]
    processes = [] #empty active processors list
    
    #close the pool and wait for the worker processes to exit
    pool.close()
    pool.join()
    
    #to obtain the final output just merge the data in all processors
    for processor_data in joined_grouped_parts:
        output = output + processor_data    
    
    return output

### Solution

In [130]:
#using 3 processors, range partitioned so that each processor gets the climate data for one station
parallel_Group_by_Partition_Join_avg(fd, cd, 3, [948701,948702])

[[948701, 56.069386038687973], [948702, 52.148275862068964]]
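
One way to sanity-check a result like this is to compute the same join and aggregation serially with pandas. The sketch below uses two tiny made-up tables; the column names `station`, `date`, and `surface_temp_c` are assumptions for illustration and are not the headers of `ClimateData.csv` or `FireData.csv`.

```python
import pandas as pd

# Hypothetical miniature climate table: one row per station and date
climate = pd.DataFrame({
    'station': [948701, 948702, 948702],
    'date': ['2016-12-30', '2016-12-31', '2016-12-29'],
})

# Hypothetical miniature fire table: one row per fire observation
fire = pd.DataFrame({
    'date': ['2016-12-30', '2016-12-31', '2016-12-31', '2016-12-01'],
    'surface_temp_c': [50.0, 40.0, 60.0, 99.0],
})

# Join on the date, then group by station and average the temperature
joined = climate.merge(fire, on='date', how='inner')
avg = joined.groupby('station')['surface_temp_c'].mean()
print(avg.to_dict())  # {948701: 50.0, 948702: 50.0}
```

Because this is an inner join, a station whose dates match no fire record does not appear in the grouped output at all.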


--------------------------------------------------------------------------------------------------------------------------