In [3]:
import dask
import dask.dataframe as df
import numpy as np
import pandas as pd
from scipy import sparse as sp
from math import log
from collections import Counter
import functools 
np.random.seed(0)

In [4]:
# Toy label vectors. np.unique(..., return_inverse=True) yields the distinct
# labels plus, for every sample, the position of its label in that array.
true_classes, true_idx = np.unique([1, 2, 3, 3, 2, 1], return_inverse=True)
pred_classes, pred_idx = np.unique([4, 5, 4, 4, 5, 5], return_inverse=True)
for shown in (true_classes, true_idx, pred_classes, pred_idx):
    print(shown)
# Number of distinct true / predicted labels.
n_classes, n_preds = len(true_classes), len(pred_classes)

[1 2 3]
[0 1 2 2 1 0]
[4 5]
[0 1 0 0 1 1]


In [5]:
@dask.delayed
def partition_mutual_info_score(true: pd.Series, pred: pd.Series) -> dict:
    """Summarise one partition's labels as sparse contingency statistics.

    Both label sequences are factorised with ``np.unique`` and their
    co-occurrence counts are accumulated in a sparse COO matrix (one entry
    per sample; repeated (true, pred) pairs are summed).

    Parameters
    ----------
    true : array-like
        Reference labels of the partition.
    pred : array-like
        Predicted / cluster labels of the partition, same length as ``true``.

    Returns
    -------
    dict
        Keys, in insertion order:
        ``true_classes`` / ``pred_classes`` - distinct labels of the partition;
        ``true_idx`` / ``pred_idx`` - per-sample index into those arrays;
        ``n_classes`` / ``n_preds`` - number of distinct labels;
        ``nzx`` / ``nzy`` / ``nz_val`` - row index, column index and count of
        every non-zero contingency cell (local indices);
        ``contingency_sum`` - total of all counts;
        ``pi`` / ``pj`` - row sums and column sums of the contingency matrix.
    """
    true_classes, true_idx = np.unique(true, return_inverse=True)
    pred_classes, pred_idx = np.unique(pred, return_inverse=True)
    n_classes = true_classes.shape[0]
    n_preds = pred_classes.shape[0]

    # The `np.int` alias was removed in NumPy 1.24; use the explicit 64-bit
    # integer type that the rest of the notebook already relies on.
    contingency = sp.coo_matrix(
        (np.ones(true_idx.shape[0], dtype=np.int64), (true_idx, pred_idx)),
        shape=(n_classes, n_preds),
        dtype=np.int64,
    )
    nzx, nzy, nz_val = sp.find(contingency)

    return {
        'true_classes': true_classes,
        'true_idx': true_idx,
        'pred_classes': pred_classes,
        'pred_idx': pred_idx,
        'n_classes': n_classes,
        'n_preds': n_preds,
        'nzx': nzx,
        'nzy': nzy,
        'nz_val': nz_val,
        'contingency_sum': contingency.sum(),
        'pi': np.ravel(contingency.sum(axis=1)),
        'pj': np.ravel(contingency.sum(axis=0)),
    }

In [6]:
@dask.delayed
def gen_pi_dask(chunks_mi_delayed_list: list, true_classes_len: int, true_classes=None):
    """Merge the per-chunk true-class counts (``'pi'``) into one global vector.

    Parameters
    ----------
    chunks_mi_delayed_list : list of dict
        Chunk summaries exposing ``'true_classes'`` (labels present in the
        chunk) and ``'pi'`` (count per label, aligned with ``'true_classes'``).
    true_classes_len : int
        Length of the returned vector.
    true_classes : array-like, optional
        Global label ordering; position ``i`` of the result belongs to
        ``true_classes[i]``. When omitted, the sorted union of the chunks'
        own ``'true_classes'`` is used, so the function no longer depends on
        a notebook-level variable. Chunk labels absent from an explicitly
        supplied list are ignored.

    Returns
    -------
    numpy.ndarray
        Float vector of length ``true_classes_len`` with the summed counts.

    Raises
    ------
    IndexError
        If there are more global labels than ``true_classes_len`` slots.
    """
    if true_classes is None:
        found = [np.asarray(chunk['true_classes']) for chunk in chunks_mi_delayed_list]
        # np.concatenate rejects an empty list, hence the guard.
        global_labels = np.unique(np.concatenate(found)).tolist() if found else []
    else:
        global_labels = np.asarray(true_classes).tolist()

    if len(global_labels) > true_classes_len:
        raise IndexError(
            f"{len(global_labels)} true classes do not fit in a vector of "
            f"length {true_classes_len}"
        )

    # One dict lookup per label instead of repeated list.index() scans.
    slot_of = {label: slot for slot, label in enumerate(global_labels)}
    pi_dask = np.zeros(true_classes_len)
    for chunk in chunks_mi_delayed_list:
        chunk_labels = np.asarray(chunk['true_classes']).tolist()
        for label, count in zip(chunk_labels, chunk['pi']):
            slot = slot_of.get(label)
            if slot is not None:
                pi_dask[slot] += count
    return pi_dask

@dask.delayed
def gen_pj_dask(chunks_mi_delayed_list: list, pred_classes_len: int, pred_classes=None):
    """Merge the per-chunk predicted-class counts (``'pj'``) into one global vector.

    Parameters
    ----------
    chunks_mi_delayed_list : list of dict
        Chunk summaries exposing ``'pred_classes'`` (labels present in the
        chunk) and ``'pj'`` (count per label, aligned with ``'pred_classes'``).
    pred_classes_len : int
        Length of the returned vector.
    pred_classes : array-like, optional
        Global label ordering; position ``j`` of the result belongs to
        ``pred_classes[j]``. When omitted, the sorted union of the chunks'
        own ``'pred_classes'`` is used, so the function no longer depends on
        a notebook-level variable. Chunk labels absent from an explicitly
        supplied list are ignored.

    Returns
    -------
    numpy.ndarray
        Float vector of length ``pred_classes_len`` with the summed counts.

    Raises
    ------
    IndexError
        If there are more global labels than ``pred_classes_len`` slots.
    """
    if pred_classes is None:
        found = [np.asarray(chunk['pred_classes']) for chunk in chunks_mi_delayed_list]
        # np.concatenate rejects an empty list, hence the guard.
        global_labels = np.unique(np.concatenate(found)).tolist() if found else []
    else:
        global_labels = np.asarray(pred_classes).tolist()

    if len(global_labels) > pred_classes_len:
        raise IndexError(
            f"{len(global_labels)} predicted classes do not fit in a vector of "
            f"length {pred_classes_len}"
        )

    # One dict lookup per label instead of repeated list.index() scans.
    slot_of = {label: slot for slot, label in enumerate(global_labels)}
    pj_dask = np.zeros(pred_classes_len)
    for chunk in chunks_mi_delayed_list:
        chunk_labels = np.asarray(chunk['pred_classes']).tolist()
        for label, count in zip(chunk_labels, chunk['pj']):
            slot = slot_of.get(label)
            if slot is not None:
                pj_dask[slot] += count
    return pj_dask

In [7]:
@dask.delayed
def gen_nzx_nzy_nzval_dask(chunks_mi_delayed_list: list):
    nzx_dask, nzy_dask, nz_val_dask = np.array([], dtype=np.int64),np.array([], dtype=np.int64),np.array([], dtype=np.int64)
    cross_clusters_list = []
    for mi_delayed in chunks_mi_delayed_list:
        true_nzx_np = np.array(list(map(lambda x: mi_delayed['true_classes'][x], mi_delayed['nzx'])))
        true_nzy_np = np.array(list(map(lambda x: mi_delayed['pred_classes'][x], mi_delayed['nzy'])))
        true_nz_val = mi_delayed['nz_val']
        cross_clusters_list.append(Counter(dict(list(zip(zip(true_nzx_np,true_nzy_np),true_nz_val)))))
    cross_clusters = dict(functools.reduce(lambda a,b : a+b,cross_clusters_list))
    for key in cross_clusters.keys():
        nzx_dask = np.append(nzx_dask, true_classes.tolist().index(key[0]))
        nzy_dask = np.append(nzy_dask, pred_classes.tolist().index(key[1]))
        nz_val_dask = np.append(nz_val_dask, cross_clusters[key])
    return (nzx_dask, nzy_dask, nz_val_dask)

In [14]:
@dask.delayed
def contingency_sum_dask(chunks_mi_delayed_list: list):
    suma = 0
    for mi_delayed in chunks_mi_delayed_list:
        suma = suma + mi_delayed['contingency_sum']
    return suma

In [None]:
@dask.delayed
def get_mi(chunks_mi: list, true_len: int, pred_len: int):
    """Compute the mutual information score from per-chunk summaries.

    Parameters
    ----------
    chunks_mi : list of dict
        Chunk summaries as produced by ``partition_mutual_info_score``.
    true_len : int
        Number of distinct true labels over all chunks.
    pred_len : int
        Number of distinct predicted labels over all chunks.

    Returns
    -------
    float
        Sum over the non-zero contingency cells of
        ``p_ij * (log(n_ij) - log(N) - log(pi_i * pj_j) + log(sum(pi)) + log(sum(pj)))``,
        or ``0.0`` when there are no non-zero cells.
    """
    # The helpers are dask.delayed themselves, so calling them only builds
    # lazy objects; evaluate all four together in one pass.
    # NOTE(review): this runs a nested compute inside a delayed task, which is
    # assumed to be acceptable for the local schedulers - confirm for
    # distributed use.
    pi, pj, (nzx, nzy, nz_val), contingency_total = dask.compute(
        gen_pi_dask(chunks_mi, true_len),
        gen_pj_dask(chunks_mi, pred_len),
        gen_nzx_nzy_nzval_dask(chunks_mi),
        contingency_sum_dask(chunks_mi),
    )

    if nz_val.size == 0:
        # No samples at all: avoid log(0) and report zero shared information.
        return 0.0

    contingency_nm = nz_val / contingency_total
    log_contingency_nm = np.log(nz_val)
    # Only the marginals at the non-zero cells are needed, not the full
    # outer product of pi and pj.
    outer = pi.take(nzx).astype(np.int64) * pj.take(nzy).astype(np.int64)
    log_outer = -np.log(outer) + np.log(pi.sum()) + np.log(pj.sum())
    mi = (contingency_nm * (log_contingency_nm - np.log(contingency_total))
          + contingency_nm * log_outer)
    return float(mi.sum())
    

In [9]:
# Two toy partitions as (true labels, predicted labels) pairs; each one
# becomes a lazy per-chunk summary.
chunk_labels = [
    ([1, 2, 3], [4, 5, 4]),
    ([3, 2, 1], [4, 5, 5]),
]
delayes = [
    partition_mutual_info_score(true_part, pred_part)
    for true_part, pred_part in chunk_labels
]
datos_1, datos_2 = delayes

In [10]:
# Lazy reductions over the chunk summaries. The vector lengths come from the
# label counts computed earlier (n_classes, n_preds) rather than being
# hard-coded, so they follow the data.
pi_delayed = gen_pi_dask(delayes, n_classes)
pj_delayed = gen_pj_dask(delayes, n_preds)
nzx_nzy_nz_val_delayed = gen_nzx_nzy_nzval_dask(delayes)
contingency_sum_delayed = contingency_sum_dask(delayes)

In [11]:
# Evaluate the four reductions in a single dask.compute call so the shared
# per-chunk summaries are computed once instead of once per reduction.
pi, pj, nzx_nzy_nz_val, contingency_sum = dask.compute(
    pi_delayed,
    pj_delayed,
    nzx_nzy_nz_val_delayed,
    contingency_sum_delayed,
)
nzx, nzy, nz_val = nzx_nzy_nz_val

In [12]:
# Total of the contingency counts summed over all chunks (one count per sample).
contingency_sum

6

In [13]:
# Mutual information from the merged sparse statistics: only the non-zero
# contingency cells (nzx, nzy, nz_val) contribute to the sum.
log_contingency_nm = np.log(nz_val)
contingency_nm = nz_val / contingency_sum
print(log_contingency_nm)
print(contingency_nm)

# Marginal counts looked up at each non-zero cell; their element-wise product
# stands in for the full outer product of pi and pj.
pi_at_nz = pi.take(nzx).astype(np.int64)
pj_at_nz = pj.take(nzy).astype(np.int64)
outer = pi_at_nz * pj_at_nz
print(pi_at_nz)
print(outer)
print(pj_at_nz)

log_pi_total = log(sum(pi))
log_pj_total = log(sum(pj))
log_outer = -np.log(outer) + log_pi_total + log_pj_total
mi = (contingency_nm * (log_contingency_nm - log(contingency_sum))
      + contingency_nm * log_outer)
print(mi.sum())

[0.         0.69314718 0.69314718 0.        ]
[0.16666667 0.33333333 0.33333333 0.16666667]
[2 2 2 2]
[6 6 6 6]
[3 3 3 3]
0.4620981203732969
