In [14]:
import pandas as pd
import numpy as np
import multiprocessing as mp
import datetime
from multiprocessing import Pool

In [158]:
# Round-robin data partitionining function
def rr_partition(data, n, need_index = None):
    """
    Perform data partitioning on data

    Arguments:
    data -- an input dataset which is a list
    n -- the number of processors

    Return:
    result -- the paritioned subsets of D
    """
    result = []
    for i in range(n):
        result.append([])
    
    ### START CODE HERE ### 
    
    # Calculate the number of the elements to be allocated to each bin
    n_bin = len(data)/n
    
    # For each bin, perform the following
    for index, element in enumerate(data): 
        # Calculate the index of the bin that the current data point will be assigned
        index_bin = (int) (index % n)
        if need_index:
            result[index_bin].append((index,element)) #need index
        else:
            result[index_bin].append(element) 
            
    ### END CODE HERE ###
    
    return result

In [157]:
climate_data = pd.read_csv('ClimateData.csv')
climate_data.columns = ['Station','Date','Air Temperature(Celcius)','Relative Humidity','WindSpeed (knots)','Max Wind Speed','MAX','MIN','Precipitation']

In [150]:
parallel_merge_all_groupby(data_partition)

{948700: ['2016-12-31',
  '2017-01-04',
  '2017-01-07',
  '2017-01-10',
  '2017-01-13',
  '2017-01-16',
  '2017-01-19',
  '2017-01-22',
  '2017-01-25',
  '2017-01-28',
  '2017-01-31',
  '2017-02-03',
  '2017-02-06',
  '2017-02-09',
  '2017-02-12',
  '2017-02-15',
  '2017-02-18',
  '2017-01-02',
  '2017-01-05',
  '2017-01-08',
  '2017-01-11',
  '2017-01-14',
  '2017-01-17',
  '2017-01-20',
  '2017-01-23',
  '2017-01-26',
  '2017-01-29',
  '2017-02-01',
  '2017-02-04',
  '2017-02-07',
  '2017-02-10',
  '2017-02-13',
  '2017-02-16',
  '2017-02-19',
  '2017-01-03',
  '2017-01-06',
  '2017-01-09',
  '2017-01-12',
  '2017-01-15',
  '2017-01-18',
  '2017-01-21',
  '2017-01-24',
  '2017-01-27',
  '2017-01-30',
  '2017-02-02',
  '2017-02-05',
  '2017-02-08',
  '2017-02-11',
  '2017-02-14',
  '2017-02-17'],
 948701: ['2017-02-21',
  '2017-02-24',
  '2017-02-27',
  '2017-03-02',
  '2017-03-05',
  '2017-03-08',
  '2017-03-11',
  '2017-03-14',
  '2017-03-17',
  '2017-03-20',
  '2017-03-23',
  '2017

# -------------------------

In [25]:

# The first step in the merge-all groupby method
def local_groupby2(dataset):
    """
    Perform a local groupby method

    Arguments:
    dataset -- entire record set to be merged

    Return:
    result -- the aggregated record set according to the group_by attribute index
    """

    dict = {}
    for record in dataset:
        key = record[0]
        val = record[1]
        if key not in dict:
            dict[key] = [0,0]
        dict[key][0] += 1
        dict[key][1] += val
    return dict


# The first step in the merge-all groupby method
def local_groupby3(dataset):
    """
    Perform a local groupby method

    Arguments:
    dataset -- entire record set to be merged

    Return:
    result -- the aggregated record set according to the group_by attribute index
    """

    dict = {}
    for record in dataset:
        if isinstance(record[1], tuple) or isinstance(record[1], list):
            key = record[0]
            val = record[1]
            if key not in dict:
                dict[key] = [0,0]
            dict[key][0] += val[0]
            dict[key][1] += val[1]
        else:
            key = record[0]
            val = record[1]
            if key not in dict:
                dict[key] = [0,0]
            dict[key][0] += 1
            dict[key][1] += val
    for x in dict.items():
        
        dict[x[0]] = x[1][1]/x[1][0]
    return dict


def parallel_merge_all_groupby2(dataset):
    """
    Perform a parallel merge_all groupby method

    Arguments:
    dataset -- entire record set to be merged

    Return:
    result -- the aggregated record dictionary according to the group_by attribute index
    """
    
    result = {}

    ### START CODE HERE ### 
    
    # Define the number of parallel processors: the number of sub-datasets.
    n_processor = len(dataset)
    # Pool: a Python method enabling parallel processing. 
    pool = mp.Pool(processes = n_processor)
    # ----- Local aggregation step -----
    local_result = []
    for s in dataset:
        # call the local aggregation method
        local_result.append(pool.apply(local_groupby2, [s]))
    pool.close()
    
    new_set=[]
    for element in local_result:
        new_set.extend(element.items())

    # ---- Global aggregation step ----
    # Let's assume that the global operator is sum.
    # Implement here
    result=local_groupby3(new_set)
    ### END CODE HERE ###
    
    return result

In [28]:
fire_data = pd.read_csv('FireData.csv')
fire_selected_col = fire_data[['Date','Surface Temperature (Celcius)']]
subset =  rr_partition(fire_selected_col.values,4)
ST_data = parallel_merge_all_groupby2(subset)

In [123]:
station = climate_data[['Date','Station']]

In [124]:
average = pd.DataFrame(list(ST_data.items()),columns = ['Date', 'Average Surface Temperature (Celcius)'])
average['Date'] = pd.to_datetime(average.Date)
station['Date'] = pd.to_datetime(station.Date)


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  This is separate from the ipykernel package so we can avoid doing imports until


#  Join

In [125]:
date_divisions = ['2017/4/1','2017/7/1','2017/10/1']

In [136]:
def range_partition(data, range_indices):
    result = []
    n_bin = len(range_indices) 
    # For each bin, perform the following
    for i in range(n_bin): 
        s = data.loc[data['Date'] < range_indices[i]]
        result.append(s) 
        data = data.loc[data['Date'] >= range_indices[i]]
    result.append(data.loc[data['Date'] >= range_indices[n_bin-1]]) 
    return result
# Parallel searching algorithm for range selection
def s_hash(x, n):
    ### START CODE HERE ###
    result = x%n
    ### END CODE HERE ###
    return result

def parallel_search_range(data, query_range, n_processor):
    rr = []
    pool = Pool(processes=n_processor)
    ### START CODE HERE ###
    dic = {} # We will use a dictionary
    for i, x in enumerate(data['Confidence']): # For each data record, perform the following
        h = s_hash(x, n_processor) # Get the hash key of the input
        if (h in dic.keys()): # If the key exists
            l = dic[h]
            l.append((i,x))
            dic[h] = l # Add the new input to the value set of the key
        else: # If the key does not exist
            l = [] # Create an empty value set
            l.append((i,x))
            dic[h] = l # Add the value set to the key

    for i in range(query_range[0],query_range[1]):
        s=s_hash(i,n_processor)
        if s in dic.keys():
            for j in dic[s]:
                if j[-1]==i:
                    rr.append(j[0])
                    
    return data.loc[rr]

def H(r):
    """
    We define a hash function 'H' that is used in the hashing process works 
    by summing the first and second digits of the hashed attribute, which
    in this case is the join attribute. 
    
    Arguments:
    r -- a record where hashing will be applied on its join attribute

    Return:
    result -- the hash index of the record r
    """
    # Convert the value of the join attribute into the digits
    digits = [int(d) for d in str(r[0]) if d.isdigit()]
    
    # Calulate the sum of elemenets in the digits
    return sum(digits)
def HB_join(T1, T2):
    
    """
    Perform the hash-based join algorithm.
    The join attribute is the numeric attribute in the input tables T1 & T2

    Arguments:
    T1 & T2 -- Tables to be joined

    Return:
    result -- the joined table
    """
    
    result = []
    
    dic = {} # We will use a dictionary
    
    # For each record in table T2
    for s in T2.index:
        # Hash the record based on join attribute value using hash function H into hash table
        s_key = H(T2.loc[s])
        if s_key in dic:
            dic[s_key].append(T2.loc[s]) # If there is an entry
        else:
            dic[s_key] = [T2.loc[s]] 
        
    # For each record in table T1 (probing)
    for r in T1.index:
        # Hash the record based on join attribute value using H
        r_key = H(T1.loc[r])

        # If an index entry is found Then
        if r_key in dic:
            
            # Compare each record on this index entry with the record of table T1
            for item in dic[r_key]:

                if item[0] == T1.loc[r][0]:
                    

                    # Put the rsult
                    result.append([T1.loc[r][1],item[0],item[1]])
    
    return result

# Include this package for parallel processing
import multiprocessing as mp

def DPBP_join(T1, T2, n_processor):
    """
    Perform a disjoint partitioning-based parallel join algorithm.
    The join attribute is the numeric attribute in the input tables T1 & T2

    Arguments:
    T1 & T2 -- Tables to be joined
    n_processor -- the number of parallel processors

    Return:
    result -- the joined table
    """
    
    result = []
    
    ### START CODE HERE ### 
    
    # Partition T1 & T2 into sub-tables using rr_partition().
    # The number of the sub-tables must be the equal to the n_processor
    T1_subsets = range_partition(T1, date_divisions)
    T2_subsets = range_partition(T2, date_divisions)
    
    # Pool: a Python method enabling parallel processing. 
    pool = mp.Pool(processes = n_processor)
    for i in range(len(T1_subsets)):
        # Apply a join on each processor
        result.extend(pool.apply(HB_join, [T1_subsets[i], T2_subsets[i]]))

    ### END CODE HERE ###
    
    return result

In [137]:
#Test range_partition

date_divisions = pd.to_datetime(date_divisions)

S = range_partition(station,date_divisions)
R = range_partition(average,date_divisions)

In [139]:
n_processor = 4
result = DPBP_join(average,station , n_processor)

In [141]:
output = pd.DataFrame(result,columns = ['Average Surface Temperature (Celcius)','Date','Station'])

In [148]:
station_ST = output[['Station','Average Surface Temperature (Celcius)']]

# GroupBy

In [161]:
# The first step in the merge-all groupby method
def local_groupby2(dataset):
    """
    Perform a local groupby method

    Arguments:
    dataset -- entire record set to be merged

    Return:
    result -- the aggregated record set according to the group_by attribute index
    """

    dict = {}
    for record in dataset:
        key = record[0]
        val = record[1]
        if key not in dict:
            dict[key] = [0,0]
        dict[key][0] += 1
        dict[key][1] += val
    return dict

# The first step in the merge-all groupby method
def local_groupby3(dataset):
    """
    Perform a local groupby method

    Arguments:
    dataset -- entire record set to be merged

    Return:
    result -- the aggregated record set according to the group_by attribute index
    """

    dict = {}
    for record in dataset:
        if isinstance(record[1], tuple) or isinstance(record[1], list):
            key = record[0]
            val = record[1]
            if key not in dict:
                dict[key] = [0,0]
            dict[key][0] += val[0]
            dict[key][1] += val[1]
        else:
            key = record[0]
            val = record[1]
            if key not in dict:
                dict[key] = [0,0]
            dict[key][0] += 1
            dict[key][1] += val
    for x in dict.items():
        
        dict[x[0]] = x[1][1]/x[1][0]
    return dict

def parallel_merge_all_groupby2(dataset):
    """
    Perform a parallel merge_all groupby method

    Arguments:
    dataset -- entire record set to be merged

    Return:
    result -- the aggregated record dictionary according to the group_by attribute index
    """
    
    result = {}

    ### START CODE HERE ### 
    
    # Define the number of parallel processors: the number of sub-datasets.
    n_processor = len(dataset)
    # Pool: a Python method enabling parallel processing. 
    pool = mp.Pool(processes = n_processor)
    # ----- Local aggregation step -----
    local_result = []
    for s in dataset:
        # call the local aggregation method
        local_result.append(pool.apply(local_groupby2, [s]))
    pool.close()
    
    new_set=[]
    for element in local_result:
        new_set.extend(element.items())

    # ---- Global aggregation step ----
    # Let's assume that the global operator is sum.
    # Implement here
    result=local_groupby3(new_set)
    ### END CODE HERE ###
    
    return result

In [162]:
subset =  rr_partition(station_ST.values,4)
parallel_merge_all_groupby2(subset)

{948701.0: 53.089792535260976, 948702.0: 50.29047885609302}