# Piecewise Linear Merge Tree Cophenetic Divergence


In [None]:
import os
import pickle
import numpy as np
from trajectories import generate_trajectories
from plots_and_correlates import plot_lce_estimate_and_correlation
from lca_supervised_learning import score_classification
from lca_supervised_learning import score_regression
from lca_supervised_learning import score_regression_pos
from TimeSeriesMergeTreeSimple import TimeSeriesMergeTree as TSMT
from ipyparallel import require


In [None]:
# Create the ipyparallel client and the two views used for remote execution.
# NOTE(review): assumes an ipyparallel cluster is already running and
# reachable with the default profile — confirm before running this cell.
import ipyparallel as ipp
clients = ipp.Client()
# Direct view on the engines (not referenced by the cells visible here).
dv = clients.direct_view()
# Load-balanced view; topological_divergences() maps over it with map_sync.
lbv = clients.load_balanced_view()

In [None]:
# Name lists for the systems, divergence types and result types.
# SYS_NAMES matches the keys used to index experimental_data below.
# NOTE(review): none of these three constants is referenced in the cells
# visible here — confirm they are still needed.
SYS_NAMES = ["henon", "ikeda", "logistic", "tinkerbell"]
DIV_TYPES = ["dmt", "mt", "hvg", "ph"]
RES_TYPES = ["correlations", "divergences"]

In [None]:
# Generate the trajectories for every system with fixed settings.
# The result is indexed below as experimental_data[system][field], with the
# fields "trajectories", "lces" and "sys_params".
SEED = 42  # forwarded as RANDOM_SEED
SAMPLES = 500  # forwarded as CONTROL_PARAM_SAMPLES
LENGTH = 250  # forwarded as TS_LENGTH; also used in result keys and the output file name
experimental_data = generate_trajectories(
    RANDOM_SEED=SEED, TS_LENGTH=LENGTH, CONTROL_PARAM_SAMPLES=SAMPLES
)

In [None]:
def monotonize(ts):
    """Reduce a time series to its strict local extrema.

    The first sample is always kept. Every interior sample that is strictly
    greater than, or strictly smaller than, both of its neighbours is kept as
    well. If that leaves an odd number of values, the last sample of ``ts`` is
    appended so the counts of minima and maxima are equal.

    Args:
        ts: non-empty sequence of numbers supporting slicing (list or 1-D
            numpy array).

    Returns:
        list: the retained samples, of even length, in their original order.

    Raises:
        AssertionError: if the retained values do not form pairs that all rise
            or all fall, e.g. when a plateau hides a turning point.
    """
    new_ts = [ts[0]]
    # Slide a three-sample window over the interior points.
    for left, mid, right in zip(ts[:-2], ts[1:-1], ts[2:]):
        is_local_max = left < mid and right < mid
        is_local_min = left > mid and right > mid
        if is_local_max or is_local_min:
            new_ts.append(mid)
    if len(new_ts) % 2 == 1:
        new_ts.append(ts[-1])

    # Compare elementwise on arrays: slicing the plain list and using < / >
    # would compare the two halves lexicographically and yield a single bool,
    # letting non-alternating sequences slip through the check.
    values = np.asarray(new_ts)
    starts, ends = values[::2], values[1::2]
    is_monotonic = bool(np.all(starts < ends) or np.all(starts > ends))
    assert is_monotonic, "new time series has non-critical values, somehow"
    return new_ts


In [None]:
# Logistic map: reduce each trajectory to its local extrema.
# Built as a list because map() returns a one-shot iterator in Python 3;
# re-running a cell that consumes it would otherwise see no trajectories.
logistic_trajectories = [
    monotonize(ts) for ts in experimental_data["logistic"]["trajectories"]
]
logistic_lces = experimental_data["logistic"]["lces"]
logistic_control_params = experimental_data["logistic"]["sys_params"]

In [None]:
# Hénon map: reduce each trajectory to its local extrema.
# Built as a list because map() returns a one-shot iterator in Python 3;
# re-running a cell that consumes it would otherwise see no trajectories.
henon_trajectories = [
    monotonize(ts) for ts in experimental_data["henon"]["trajectories"]
]
henon_lces = experimental_data["henon"]["lces"]
henon_control_params = experimental_data["henon"]["sys_params"]

In [None]:
# Ikeda map: reduce each trajectory to its local extrema.
# Built as a list because map() returns a one-shot iterator in Python 3;
# re-running a cell that consumes it would otherwise see no trajectories.
ikeda_trajectories = [
    monotonize(ts) for ts in experimental_data["ikeda"]["trajectories"]
]
ikeda_lces = experimental_data["ikeda"]["lces"]
ikeda_control_params = experimental_data["ikeda"]["sys_params"]

In [None]:
# Tinkerbell map: reduce each trajectory to its local extrema.
# Built as a list because map() returns a one-shot iterator in Python 3;
# re-running a cell that consumes it would otherwise see no trajectories.
tinkerbell_trajectories = [
    monotonize(ts) for ts in experimental_data["tinkerbell"]["trajectories"]
]
tinkerbell_lces = experimental_data["tinkerbell"]["lces"]
tinkerbell_control_params = experimental_data["tinkerbell"]["sys_params"]

## Build merge trees and compute divergences

In [None]:
@require(np)
def dict_of_arrays(list_of_dicts):
    """Transpose a list of same-keyed dictionaries into a dictionary of arrays.

    The keys are taken from the first dictionary; every dictionary in the list
    must provide each of them.

    Example
        Input
            [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]
        Output
            {'a': np.array([1, 3]), 'b': np.array([2, 4])}
    """
    template = list_of_dicts[0]
    columns = {}
    for name in template:
        # Collect this key's value from every dictionary, in list order.
        columns[name] = np.array([entry[name] for entry in list_of_dicts])
    return columns

In [None]:
def topological_divergences(ts_representations):
    """Collect the ``divergences`` attribute of every representation.

    The attribute is read through the load-balanced view ``lbv`` and the
    per-representation results are merged with ``dict_of_arrays``.

    Args:
        ts_representations: iterable of objects exposing ``divergences``
            (presumably a dict per object, since the results are merged by
            key — confirm against the representation class).

    Returns:
        dict mapping each divergence name to a numpy array with one entry per
        representation.
    """
    def _read_divergences(representation):
        return representation.divergences

    per_series = lbv.map_sync(_read_divergences, ts_representations)
    return dict_of_arrays(per_series)

In [None]:
def generate_plmt_estimates(
    sys_name,
    param_name,
    trajectories,
    control_params,
    actual_lces,
    show_plot=True,
):
    """Evaluate every merge-tree divergence of one system against its LCEs.

    Each trajectory is wrapped in a ``TSMT`` and the divergences are gathered
    with ``topological_divergences``. For every divergence type, the estimates
    are passed to ``plot_lce_estimate_and_correlation`` (plots are always
    saved) and then scored with the classification and regression helpers.

    Args:
        sys_name: system label, passed to the plotting helper and used in the
            result keys.
        param_name: control-parameter label, passed to the plotting helper.
        trajectories: iterable of time series, one per control-parameter value.
        control_params: control-parameter values, passed to the plotting helper.
        actual_lces: reference LCE values the estimates are compared with.
        show_plot: forwarded to the plotting helper.

    Returns:
        dict keyed by ``(estimate_name, sys_name, LENGTH)``. Each value is the
        plotting helper's result extended with the entries
        ``classification_f1``, ``regression_neg_mean_absolute`` and
        ``pos_regression_neg_mean_absolute``.
    """
    results = {}

    representations = map(TSMT, trajectories)
    per_type = topological_divergences(representations)
    for estimate_name, estimates in per_type.items():
        key = (estimate_name, sys_name, LENGTH)
        summary = plot_lce_estimate_and_correlation(
            estimate_name,
            sys_name,
            param_name,
            estimates,
            actual_lces,
            control_params,
            LENGTH,
            show_plot=show_plot,
            save_plot=True,
            twoy=True,
            plot_actual=True,
        )

        # The scoring helpers take the estimates as a single-column 2-D array.
        feature_column = estimates.reshape(-1, 1)
        summary |= {
            "classification_f1": score_classification(feature_column, actual_lces),
            "regression_neg_mean_absolute": score_regression(feature_column, actual_lces),
            "pos_regression_neg_mean_absolute": score_regression_pos(feature_column, actual_lces),
        }
        results[key] = summary

    return results

In [None]:
# Run the PLMT evaluation for each system and pool everything into one dict.
all_results_plmt = {}
systems = (
    ("Logistic", "r", logistic_trajectories, logistic_control_params, logistic_lces),
    ("Hénon", "a", henon_trajectories, henon_control_params, henon_lces),
    ("Tinkerbell", "a", tinkerbell_trajectories, tinkerbell_control_params, tinkerbell_lces),
    ("Ikeda", "a", ikeda_trajectories, ikeda_control_params, ikeda_lces),
)
for label, param, series, params, lces in systems:
    all_results_plmt.update(
        generate_plmt_estimates(label, param, series, params, lces)
    )

In [None]:
# Persist the pooled results. pickle.dump() writes to the open file handle;
# pickle.dumps() only returns the bytes and would leave the file empty.
with open(f"outputs/data/PLMT_divergences_{LENGTH}.pkl", "wb") as file:
    pickle.dump(all_results_plmt, file)


# Print every (estimate, system, length) key with its scores.
for result, scores in all_results_plmt.items():
    print(result, scores)