# Imports

In [None]:
import numpy as np
import pandas as pd

# TS Preprocessing

In [None]:
# Load the pre-processed table, discard the place/activity columns and index
# the rows by their "DateTime" value.
data = (
    pd.read_csv("al/preprocessed_old/times_for_static_data.csv")
    .drop(columns=["Place1", "Place2", "Activity"])
    .set_index("DateTime")
)
# The first remaining column is removed by position
# (presumably a leftover row-number column from an earlier export -- TODO confirm).
data = data.drop(columns=data.columns[0])

# One frame per person, keyed by the person's id, with the "Person" column
# itself removed from each frame.
ts_list = {}
for person in data["Person"].unique():
    ts_list[person] = data.loc[data["Person"] == person].drop(columns=["Person"])

# Dynamic Time Warping (DTW)

In [None]:
# Smoke test: run dtaidistance's fast pairwise distance-matrix routine on a
# random 4 x 100 float array (each row is treated as one series).
from dtaidistance import dtw
df_array2 = np.random.random((4, 100))
dtw.distance_matrix_fast(df_array2)

In [None]:
from dtaidistance import dtw

In [None]:
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean

# Allocate a square feature-by-feature matrix intended to hold pairwise DTW
# distances; its size comes from the columns of the frame stored under key 1.
num_features = ts_list[1].shape[1]
dist_matrix = np.zeros(shape=(num_features, num_features), dtype=np.float32)

# NOTE: the fill loop below is disabled, so dist_matrix stays all zeros.
# It also refers to `transpose`, which is not defined in the visible code.
# for i,feat1 in enumerate(ts_list[1].columns):
#     for j, feat2 in enumerate(ts_list[1].columns):
#         #distance, path = fastdtw(np.matrix(ts_list[1]["Sedentary"].values).T, ts_list[1][feat2].values, dist = euclidean)
#         distance, path = fastdtw(transpose[i,:].reshape(-1,1), transpose[j,:].reshape(-1,1), dist = euclidean)
#         dist_matrix[i,j] = distance

# Granger Causality

In [None]:
def create_internal_granger_matrix(ts: np.ndarray | pd.DataFrame, maxlag=3) -> np.ndarray | pd.DataFrame:
    from statsmodels.tsa.stattools import grangercausalitytests
    """
    Create a Granger causality matrix for all feature pairs within a single time series dataset.
    Each cell (i, j) in the matrix represents the Granger causality test result for feature i causing feature j.
    Reference: https://www.statsmodels.org/stable/generated/statsmodels.tsa.stattools.grangercausalitytests.html
    """
    if isinstance(ts, pd.DataFrame):
        columns = ts.columns
        ts = ts.to_numpy()
    else:
        columns = None

    num_features = ts.shape[1]
    causality_matrix = np.zeros((num_features, num_features), dtype=np.float32)

    for i in range(num_features):
        for j in range(num_features):
            if i != j:
                combined_data = np.column_stack((ts[:, i], ts[:, j]))
                result = grangercausalitytests(combined_data, maxlag=maxlag, verbose=False)
                # We are interested in any causality, so we take the minimum p-value over all lags up to maxlag
                # Options of 'ssr_chi2test' and 'params_ftest' are available. I'm not sure which is better.
                p_values = [result[lag][0]['ssr_chi2test'][1] for lag in range(1, maxlag + 1)]
                causality_matrix[i, j] = np.min(p_values)  # Choose the minimum p-value
    if columns is not None:
        return pd.DataFrame(causality_matrix, columns=columns, index=columns)
    else:
        return causality_matrix

In [None]:
def compare_time_by_granger_causality(df1: pd.DataFrame, df2: pd.DataFrame) -> float:
    """Compare two datasets through their internal Granger-causality matrices.

    A Granger matrix is built for `df1` and for `df2` (in that order) and the two
    matrices are reduced to a single number with `kld(matrix1, matrix2)`.

    KLD is only a placeholder summary metric here; there is likely a better one,
    and this function is the place to experiment with alternatives.
    """
    granger_matrices = [create_internal_granger_matrix(frame) for frame in (df1, df2)]
    return kld(*granger_matrices)

In [None]:
# Toy input: column "x" is the constant 1 in every row, column "y" holds 20 random draws.
# NOTE(review): presumably a probe of how the Granger test copes with a constant series -- confirm.
toy_ts = pd.DataFrame({"x": 1, "y": np.random.randn(20)})
create_internal_granger_matrix(toy_ts)

In [None]:
# Toy input: two columns of 20 random draws each, run through the Granger matrix builder.
toy_ts = pd.DataFrame({"x": np.random.randn(20), "y": np.random.randn(20)})
create_internal_granger_matrix(toy_ts)

In [None]:
# Experiment: take the first frame in ts_list and keep only the columns whose
# number of distinct values is not exactly one (i.e. discard constant columns).
ts1 = [*ts_list.values()][0]
df = ts1.loc[:, ts1.nunique().ne(1)]
#create_internal_granger_matrix(df)

In [None]:
# Columns fed into the Granger comparison; listed once so both sides of every
# comparison are guaranteed to use the same features.
gc_features = ["SecondsCos", "SecondsSin", "DoyCos", "DoySin", "Sedentary"]

n = len(ts_list)
gc_matrix = np.zeros((n, n), dtype=np.float32)

# Fill entry (i, j) with the Granger-based comparison score of person i vs person j.
for i, person1 in enumerate(ts_list):
    # The row's frame does not depend on j, so select it once per row.
    df1 = ts_list[person1][gc_features]
    for j, person2 in enumerate(ts_list):
        df2 = ts_list[person2][gc_features]
        gc_matrix[i, j] = compare_time_by_granger_causality(df1, df2)


# KLD Comparison

In [None]:
def kld(p: pd.DataFrame, q: pd.DataFrame, bins=35) -> float:
    """
    Compute the Kullback-Leibler Divergence KLD(p||q), in bits, using scipy's entropy function.

    This is an asymmetric measure. KLD(p||q) can be understood as the amount of information lost
    when q is used to approximate p, so a lower result implies q is a better approximation of p.

    All values of each input are pooled (every column together) into one histogram. Both
    histograms share the same bin edges, spanning the combined range of p and q, so that bin k
    refers to the same value interval in both distributions.

    Possible future refinement: partition the PDFs by sine/cosine pairs and by each categorical
    feature instead of pooling everything; both approaches have pros and cons.

    Args:
        p: Reference data (DataFrame or any array-like of numbers).
        q: Data used to approximate p (DataFrame or any array-like of numbers).
        bins: Number of histogram bins (anything numpy.histogram_bin_edges accepts).

    Returns:
        The divergence as a float; 0 when both histograms coincide.
    """
    from scipy.stats import entropy

    p_values = np.asarray(p, dtype=float).ravel()
    q_values = np.asarray(q, dtype=float).ravel()

    # Estimate both PDFs on a common grid. Histogramming each input over its own min/max would
    # compare unrelated intervals bin by bin (a shifted copy of p would score as identical).
    edges = np.histogram_bin_edges(np.concatenate((p_values, q_values)), bins=bins)
    p_hist = np.histogram(p_values, bins=edges, density=True)[0]
    q_hist = np.histogram(q_values, bins=edges, density=True)[0]

    # Avoid division by zero / log of zero in empty bins
    p_hist[p_hist == 0] = np.finfo(float).eps
    q_hist[q_hist == 0] = np.finfo(float).eps

    return entropy(p_hist, q_hist, base=2)

In [None]:
# Square matrix with one row and one column per entry of ts_list.
n = len(ts_list)
kld_matrix = np.zeros(shape=(n, n), dtype=np.float32)

# Entry (i, j) is kld(frame of entry i, frame of entry j) for every ordered pair.
for i, person1 in enumerate(ts_list):
    for j, person2 in enumerate(ts_list):
        kld_matrix[i, j] = kld(ts_list[person1], ts_list[person2])

# Save results to file
#np.savetxt('results/Matrices/kld_matrix.csv', kld_matrix, delimiter=',')

# Mutual Information Score


In [None]:
from sklearn import metrics
# Square matrix with one row and one column per entry of ts_list.
n = len(ts_list)
mis_matrix = np.zeros(shape=(n, n), dtype=np.float32)

# Entry (i, j) is meant to hold the mutual information score between the frames
# of entries i and j.
# NOTE: mutual_info_score is handed whole frames here although it works on 1-D
# inputs, so this loop is not expected to run as written.
for i, person1 in enumerate(ts_list):
    for j, person2 in enumerate(ts_list):
        mis_matrix[i, j] = metrics.mutual_info_score(ts_list[person1], ts_list[person2])

# Only allows for 1D inputs rather than full df; create internal matrix then summarize?

In [None]:
# Save results to file
#np.savetxt('results/Matrices/mis_matrix.csv', mis_matrix, delimiter=',')

# Earth Mover's Distance

In [None]:
# Import the submodule explicitly: a bare `import scipy` does not guarantee that
# `scipy.stats` is loaded on every SciPy release.
import scipy.stats

# Square matrix with one row and one column per entry of ts_list.
n = len(ts_list)
emd_matrix = np.zeros((n, n), dtype=np.float32)

# Entry (i, j) is meant to hold the Earth Mover's (Wasserstein) distance between
# the frames of entries i and j.
# NOTE: wasserstein_distance is handed whole frames here; it does not accept a
# full multi-column frame, so this loop is not expected to run as written.
for i, person1 in enumerate(ts_list):
    for j, person2 in enumerate(ts_list):
        emd_matrix[i, j] = scipy.stats.wasserstein_distance(ts_list[person1], ts_list[person2])

# Only allows for 1D or 2D inputs rather than full df; create internal matrix then summarize?

# Cosine Similarity

In [None]:
# Square matrix with one row and one column per entry of ts_list.
n = len(ts_list)
cosSim_matrix = np.zeros(shape=(n, n), dtype=np.float32)

# Entry (i, j) is meant to hold the cosine similarity between the frames of
# entries i and j.
# NOTE(review): cosine_similarity on two frames presumably yields a whole
# row-by-row similarity matrix rather than one number, which would not fit into
# a single cell -- confirm and add a summarising step if so.
for i, person1 in enumerate(ts_list):
    for j, person2 in enumerate(ts_list):
        cosSim_matrix[i, j] = metrics.pairwise.cosine_similarity(ts_list[person1], ts_list[person2], dense_output=False)

# Save results to file
#np.savetxt('results/Matrices/kld_matrix.csv', kld_matrix, delimiter=',')