# Code for Dask Implementation of the correlation functions

# Imports

In [3]:
from dask import delayed
import pandas as pd
import dask.dataframe as dd
from dask.dataframe.core import aca
import scipy.stats as ss
import numpy as np
from collections import Counter
from multiprocessing import Pool, cpu_count

## Correlation Func Primitives

In [4]:
def conditional_entropy(x, y):
    """Calculates the conditional entropy of x given y: S(x|y)
    Wikipedia: https://en.wikipedia.org/wiki/Conditional_entropy

    Parameters
    ----------
    x : array-like
        A sequence of measurements.
    y : array-like
        A sequence of measurements.

    Returns
    -------
    float
        The total entropy of x given y

    Examples
    --------
    >>> np.random.seed(1)
    >>> x = np.random.randint(0,2, size=10)
    >>> y = np.random.randint(0,2, size=10)
    >>> conditional_entropy(x,y)
    0.606842558824411

    """
    y_counter = Counter(y)
    xy_counter = Counter(list(zip(x, y)))
    total_occurrences = sum(y_counter.values())
    p_xy = np.array([val for val in xy_counter.values()])/total_occurrences
    p_y = np.array([y_counter[xy[1]] for xy in xy_counter.keys()])/total_occurrences
    entropy = np.sum((p_xy * np.log(p_y/p_xy)))
    return entropy


def cramers_v(x, y):
    """Calculates Cramer's V statistic for categorical-categorical association.
    Uses correction from Bergsma and Wicher, Journal of the Korean Statistical Society 42 (2013): 323-328.
    This is a symmetric coefficient: V(x,y) = V(y,x)
    Original function taken from: https://stackoverflow.com/a/46498792/5863503
    Wikipedia: https://en.wikipedia.org/wiki/Cram%C3%A9r%27s_V

    Parameters
    ----------
    x : array-like
        A sequence of categorical measurements.
    y : array-like
        A sequence of categorical measurements.

    Returns
    -------
    float
        Coefficient in the range [0, 1].

    Examples
    --------
    >>> np.random.seed(1)
    >>> x = np.random.randint(0, 2, size=100)
    >>> y = x
    >>> cramers_v(x, y)
    0.9795896894087645

    """

    confusion_matrix = pd.crosstab(x, y)
    chi2 = ss.chi2_contingency(confusion_matrix)[0]
    n = confusion_matrix.sum().sum()
    phi2 = chi2/n
    r, k = confusion_matrix.shape
    phi2corr = max(0, phi2-((k-1)*(r-1))/(n-1))
    rcorr = r-((r-1)**2)/(n-1)
    kcorr = k-((k-1)**2)/(n-1)
    return np.sqrt(phi2corr/min((kcorr-1), (rcorr-1)))


def theils_u(x, y):
    """Calculates Theil's U statistic (Uncertainty coefficient) for categorical-categorical association.
    This is the uncertainty of x given y: value is on the range of [0,1] - where 0 means y provides no information about
    x, and 1 means y provides full information about x.
    Given the value of x, how many possible states does y have, and how often do they occur.
    This is an asymmetric coefficient: U(x,y) != U(y,x)
    Wikipedia: https://en.wikipedia.org/wiki/Uncertainty_coefficient

    Parameters
    ----------
    x : array-like
        A sequence of categorical measurements.
    y : array-like
        A sequence of categorical measurements.

    Returns
    -------
    float
        Coefficient in the range [0, 1].

    Examples
    --------
    >>> np.random.seed(1)
    >>> x = np.random.randint(0, 2, size=100)
    >>> y = x
    >>> theils_u(x, y)
    1.0

    """
    s_xy = conditional_entropy(x, y)
    x_counter = Counter(x)
    total_occurrences = sum(x_counter.values())
    p_x = list(map(lambda n: n/total_occurrences, x_counter.values()))
    s_x = ss.entropy(p_x)
    if s_x == 0:
        return 1
    else:
        return (s_x - s_xy) / s_x


def correlation_ratio(categories, measurements):
    """Calculates the Correlation Ratio (sometimes marked by the greek letter Eta) for categorical-continuous association.
    Answers the question - given a continuous value of a measurement, is it possible to know which category is it
    associated with?
    Value is in the range [0,1], where 0 means a category cannot be determined by a continuous measurement, and 1 means
    a category can be determined with absolute certainty.
    Wikipedia: https://en.wikipedia.org/wiki/Correlation_ratio

    Parameters
    ----------
    categories : array-like
        A sequence of categorical measurements.
    measurements : array-like
        A sequence of continuous measurements.

    Returns
    -------
    float
        Coefficient in the range [0, 1].

    Examples
    --------
    >>> np.random.seed(1)
    >>> categories = np.random.randint(0,2, size=100)
    >>> measurements = np.random.rand(100)
    >>> correlation_ratio(categories, measurements)
    0.042988734885557815

    """
    fcat, _ = pd.factorize(categories)
    cat_num = np.max(fcat)+1
    y_avg_array = np.zeros(cat_num)
    n_array = np.zeros(cat_num)
    for i in range(0, cat_num):
        cat_measures = measurements[np.argwhere(fcat == i).flatten()]
        n_array[i] = len(cat_measures)
        y_avg_array[i] = np.average(cat_measures)
    y_total_avg = np.sum(np.multiply(y_avg_array, n_array))/np.sum(n_array)
    numerator = np.sum(np.multiply(n_array, np.power(np.subtract(y_avg_array, y_total_avg), 2)))
    denominator = np.sum(np.power(np.subtract(measurements, y_total_avg), 2))
    if numerator == 0:
        eta = 0.0
    else:
        eta = numerator/denominator
    return eta

## Make a symmetrical Theils U with mproc

In [5]:
def theils_u_symmetrical(x, y):
    with Pool(cpu_count()) as pool:
        return np.mean(pool.starmap(theils_u, [(x, y), (y, x)]))

## Apply-Concat-Apply function for Dask Distributed

In [18]:
def dask_correlation_aca(corr_func, *args):
    return aca(
        args, 
        chunk=corr_func, # Function to apply to each chunk 
        aggregate=np.mean, # How to reduce results of function applied to each chunk
        combine=np.mean,
        meta=pd.Series([], dtype=float)
    )

## Load Test Data

In [20]:
data_path = '../../../data/flights_data/trip_logs.parquet'

In [21]:
df = dd.read_parquet(data_path, engine='pyarrow')

In [22]:
df = df.persist()

In [23]:
df.columns

Index(['trip_log_id', 'flight_id', 'date_scheduled', 'scheduled_dep_time',
       'scheduled_arr_time', 'dep_time', 'arr_time', 'dep_delay', 'taxi_out',
       'taxi_in', 'arr_delay', 'scheduled_elapsed_time', 'air_time',
       'distance', 'carrier_delay', 'weather_delay', 'national_airspace_delay',
       'security_delay', 'late_aircraft_delay', 'canceled', 'diverted'],
      dtype='object')

In [24]:
df

Unnamed: 0_level_0,trip_log_id,flight_id,date_scheduled,scheduled_dep_time,scheduled_arr_time,dep_time,arr_time,dep_delay,taxi_out,taxi_in,arr_delay,scheduled_elapsed_time,air_time,distance,carrier_delay,weather_delay,national_airspace_delay,security_delay,late_aircraft_delay,canceled,diverted
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
,int64,object,datetime64[ns],datetime64[ns],datetime64[ns],datetime64[ns],datetime64[ns],float64,float64,float64,float64,int64,float64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


# Test all the funcs

### symmetrical theils_u

In [25]:
theils_u_symmetrical(df['canceled'].compute(), df['diverted'].compute())

0.0015895431338338387

### All the ACAs

In [26]:
dask_correlation_aca(cramers_v, df['canceled'], df['diverted']).compute()

0    0.007023
dtype: float64

In [27]:
dask_correlation_aca(theils_u, df['canceled'], df['diverted']).compute()

0    0.000576
dtype: float64

In [28]:
dask_correlation_aca(correlation_ratio, df['canceled'], df['carrier_delay']).compute()

0    0.000334
dtype: float64

In [29]:
dask_correlation_aca(theils_u_symmetrical, df['canceled'], df['diverted']).compute()

0    0.00159
dtype: float64