In [510]:
from dask import delayed

import dask.dataframe as dd
import dask.array as da
import pandas as pd
import numpy as np
import dask
import time
import itertools

In [511]:
df = pd.read_csv('zoo.csv')

x_bar = df[df.catsize == 1]
x_til = df[df.catsize != 1]

x_bar.to_csv('x_bar.csv', index_label='ORIGINAL_INDEX')
x_til.to_csv('x_til.csv', index_label='ORIGINAL_INDEX')

In [512]:
x_bar = dd.read_csv('x_bar.csv').set_index('ORIGINAL_INDEX')
x_til = dd.read_csv('x_til.csv').set_index('ORIGINAL_INDEX')

In [580]:
def get_sample(x, seed, sample_size):

    frac = sample_size / len(x)
    sample = x.sample(frac, random_state=seed)

    return sample


def dissim(A, b):
    
    return np.sum(A != b, axis=1) / len(b)

# @delayed
def build_matrices(x_bar_sample, x_til_sample, beta, sample_size):

    x_bar_arr = x_bar_sample.values.compute()
    x_til_arr = x_til_sample.values.compute()

    dissim_matrix = np.empty((sample_size, sample_size))

    for i, bar_row in enumerate(x_bar_arr):
        dissim_matrix[i, :] = dissim(x_til_arr, bar_row)
    
    adjacency_matrix = np.where(dissim_matrix <= beta, 1, 0)
    
    return adjacency_matrix, dissim_matrix


def build_dataframe(x_bar_sample, x_til_sample, sample_idx, beta, sample_size):

    adjacency_matrix, dissim_matrix = build_matrices(x_bar_sample, x_til_sample, beta, sample_size)

    row_idxs, col_idxs = np.where(adjacency_matrix == 1)
    idx_pairs = zip(row_idxs, col_idxs)
    
    dfs = []
    for idxs in idx_pairs:

        x_bar_idx = x_bar_sample.index.compute()[idxs[0]]
        x_til_idx = x_til_sample.index.compute()[idxs[1]]

        adjacency_df = pd.DataFrame({
            'beta': beta, 'sample_idx': sample_idx,
            'x_bar_idx': x_bar_idx, 'x_til_idx': x_til_idx,
            'dissim': dissim_matrix[idxs[0], idxs[1]],
            'sample_size': sample_size
        }, index=[''])

        dfs.append(adjacency_df)
    
    if len(dfs) > 0:
        df = dd.concat(dfs, axis=0, interleave_partitions=True)
        
        return df

In [603]:
def build_task(x_bar, x_til, sample_idx=0, seed=0, beta=0.5, sample_size=10):
    
    start = time.clock()

    # Take a sample from each set
    x_bar_sample = get_sample(x_bar, seed, sample_size)
    x_til_sample = get_sample(x_til, seed, sample_size)

    # Only consider the relevant columns from now on
    x_bar_sample.drop(['animal_name', 'class_type', 'catsize'], axis=1)
    x_til_sample.drop(['animal_name', 'class_type', 'catsize'], axis=1)

    # Construct the dataframe with similar pairs
    dataframe = build_dataframe(x_bar_sample, x_til_sample,
                                sample_idx, beta, sample_size)

    time_taken = time.clock() - start

    if np.any(dataframe):
        dataframe['time_taken'] = time_taken

        dataframe.to_csv(f'results/beta{beta}/size{sample_size}/seed{seed}/idx{sample_idx}/', index=False)

In [607]:
def compute_task(sample_idx=0, beta=0.5, sample_size=10, seed=0):

    result = dd.read_csv(f'results/beta{beta}/size{sample_size}/**/**/*').compute(get=dask.get)

    return result

In [605]:
for seed, idx in itertools.product(range(3), repeat=2):
    build_task(x_bar, x_til, seed=seed)

In [608]:
result = compute_task().groupby('x_bar_idx').x_til_idx.count().nlargest(5)
result

x_bar_idx
71    14
95    10
66     9
5      7
56     6
Name: x_til_idx, dtype: int64

In [484]:
sample = get_sample(x_bar, seed=0, sample_size=10)
sample2 = get_sample(x_til, seed=0, sample_size=10)

In [485]:
sample.index.compute()

Int64Index([69, 70, 71, 87, 5, 22, 18, 74, 50, 29], dtype='int64', name='ORIGINAL_INDEX')

In [486]:
arr = da.array(sample.values)

In [487]:
array = arr.compute()

In [488]:
exdf = pd.DataFrame(columns=['beta', 'dissim', 'name'], index=['name'])

dask_df = dd.from_pandas(exdf, npartitions=1)

In [489]:
dfs = []

for beta, name in enumerate(['Henry', 'Ed', 'Jack', 'Mol', 'Con', 'Jess']):
    df = pd.DataFrame({'beta': beta, 'dissim': 0.5, 'name': name}, index=[''])
    dfs.append(df)

dask_df = dd.concat(dfs, axis=0, interleave_partitions=True)

In [490]:
dask_df.compute()

Unnamed: 0,beta,dissim,name
,0,0.5,Henry
,1,0.5,Ed
,2,0.5,Jack
,3,0.5,Mol
,4,0.5,Con
,5,0.5,Jess


In [491]:
inputs = itertools.product(range(10),
                           np.round(np.linspace(0, 0.5, 6)[::-1], 2),
                           range(10, 30))

In [492]:
results = [build_task(x_bar, x_til, sample_idx, beta, sample_size) \
           for sample_idx, beta, sample_size in inputs]

In [493]:
bsam = get_sample(x_bar, 0, 10)
tsam = get_sample(x_til, 0, 10)

In [494]:
adjacency_matrix, dissim_matrix = build_matrices(bsam, tsam, 0.5, 10)

In [495]:
row_idxs, col_idxs = np.where(adjacency_matrix == 1)
idx_pairs = list(zip(row_idxs, col_idxs))
print(idx_pairs)

[(0, 2), (0, 3), (0, 8), (0, 9), (1, 2), (1, 3), (1, 9), (2, 0), (2, 1), (2, 2), (2, 4), (2, 5), (2, 8), (3, 0), (3, 1), (3, 2), (3, 5), (3, 6), (3, 7), (4, 2), (4, 3), (4, 8), (4, 9), (5, 2), (5, 3), (5, 8), (5, 9), (6, 2), (6, 4), (6, 7), (6, 8), (7, 3), (7, 7), (7, 9), (8, 2), (8, 3), (8, 8), (8, 9), (9, 2), (9, 3), (9, 9)]
