# Homework 1 (part I)
## Author: Mariusz Słapek
## Case study 1
## Date: 31.03.2020

This report contains the results of an attempt to reproduce three articles about Python packages.

## Seglearn: A Python Package for Learning Sequences and Time Series

Links:
1. paper: http://www.jmlr.org/papers/volume19/18-160/18-160.pdf  
2. github: https://github.com/dmbee/seglearn  
3. webpage: https://dmbee.github.io/seglearn/  

### Information

*seglearn* is an open-source Python package for performing machine learning on time series or sequences. The implementation provides a flexible pipeline for tackling classification, regression, and forecasting problems with multivariate sequence and contextual data.


### Basic example

In [55]:
import seglearn as sgl
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler

# Load the watch dataset and split its "X" / "y" entries into train and test parts.
# NOTE(review): no random_state is passed to train_test_split, so the score
# printed below is presumably not reproducible from run to run — confirm.
data = sgl.load_watch()
X_train, X_test, y_train, y_test = train_test_split(data["X"], data["y"])

# Pipeline steps, in order: segmentation (width=100, overlap=0.5), feature
# representation, standard scaling, random-forest classifier.
clf = sgl.Pype([("seg", sgl.SegmentX(width=100, overlap=0.5)),
                ("features", sgl.FeatureRep()),
                ("scaler", StandardScaler()),
                ("rf", RandomForestClassifier())])

# Fit on the training part and print the score obtained on the held-out part.
clf.fit(X_train, y_train)
score = clf.score(X_test, y_test)
print("accuracy score:", score)

accuracy score: 0.7823878069432684


### Tests base

In [51]:
# Author: David Burns
# License: BSD

import warnings
# Suppress every warning raised from this point on.
# NOTE(review): this presumably also hides deprecation warnings emitted by the
# libraries under test — confirm that is intended for a reproduction report.
warnings.filterwarnings('ignore')


import numpy as np
import pandas as pd

from seglearn.datasets import load_watch
from seglearn.base import TS_Data

def test_ts_data():
    """Check construction and indexing of ``TS_Data``.

    Three layouts are exercised: variable-length series with 2-D context
    data, a 3-D segmented array with 2-D context data, and a 2-D segmented
    array with 1-D context data. In each case the stored parts must equal
    the inputs, and indexing / slicing must return a ``TS_Data`` whose parts
    equal the correspondingly indexed inputs.
    """
    # time series data
    # The three series have different lengths, so the container has to be an
    # explicit object array: NumPy >= 1.24 raises ValueError when asked to
    # build a ragged array implicitly.
    ts = np.array([np.random.rand(100, 10), np.random.rand(200, 10), np.random.rand(20, 10)],
                  dtype=object)
    c = np.random.rand(3, 10)
    data = TS_Data(ts, c)

    assert np.array_equal(data.context_data, c)
    assert np.all([np.array_equal(data.ts_data[i], ts[i]) for i in range(len(ts))])
    # integer indexing keeps the TS_Data wrapper
    assert isinstance(data[1], TS_Data)
    assert np.array_equal(data[1].ts_data, ts[1])
    assert np.array_equal(data[1].context_data, c[1])

    # segmented time series data

    sts = np.random.rand(100, 10, 6)
    c = np.random.rand(100, 6)

    data = TS_Data(sts, c)
    # slicing keeps the TS_Data wrapper and slices both parts alike
    assert isinstance(data[4:10], TS_Data)
    assert np.array_equal(data[4:10].ts_data, sts[4:10])
    assert np.array_equal(data[4:10].context_data, c[4:10])

    # same checks with a 2-D segmented array and 1-D context data
    sts = np.random.rand(100, 10)
    c = np.random.rand(100)

    data = TS_Data(sts, c)
    assert isinstance(data[4:10], TS_Data)
    assert np.array_equal(data[4:10].ts_data, sts[4:10])
    assert np.array_equal(data[4:10].context_data, c[4:10])

def test_watch():
    """Wrap the watch dataset's ``'X'`` and ``'side'`` entries in a ``TS_Data``."""
    watch = load_watch()
    wrapped = TS_Data(watch['X'], watch['side'])
    assert isinstance(wrapped, TS_Data)


def test_pd():
    """Check that ``TS_Data.from_df`` recovers series and context from a DataFrame.

    The frame holds the context values in its ordinary columns and the time
    series in a ``'ts_data'`` column.
    """
    # The series have different lengths, so the container has to be an
    # explicit object array: NumPy >= 1.24 raises ValueError when asked to
    # build a ragged array implicitly.
    ts = np.array([np.random.rand(100, 10), np.random.rand(200, 10), np.random.rand(20, 10)],
                  dtype=object)
    c = np.random.rand(3, 10)

    df = pd.DataFrame(c)
    df['ts_data'] = ts
    data = TS_Data.from_df(df)

    assert np.all([np.array_equal(data.ts_data[i], ts[i]) for i in range(len(ts))])
    assert np.array_equal(data.context_data, c)

    
# Call each test function directly so that a failing assertion surfaces
# when this cell is executed.
test_ts_data()
test_watch()    
test_pd()

#### Error

The requirements.txt file only states that the *scipy* package is required; it does not specify which version.

### Test feature functions

In [34]:
# Author: David Burns
# License: BSD

import numpy as np

from seglearn import feature_functions


def test_mv_feature_functions():
    ''' check that every feature function returns one entry per multivariate segment '''

    # segmented input has shape [n_segments, width, variables]
    n_segments, width, n_vars = 20, 30, 3
    segments = np.random.rand(n_segments, width, n_vars)

    # merge the four feature collections into one name -> function mapping
    collections = (
        feature_functions.all_features(),
        feature_functions.base_features(),
        feature_functions.hudgins_features(),
        feature_functions.emg_features(),
    )
    funcs = {}
    for collection in collections:
        funcs.update(collection)

    for func in funcs.values():
        computed = func(segments)
        assert len(computed) == n_segments


def test_uv_feature_functions():
    ''' check that every feature function returns one entry per univariate segment '''
    n_segments, width = 20, 30
    segments = np.random.rand(n_segments, width)

    # merge the four feature collections into one name -> function mapping
    collections = (
        feature_functions.all_features(),
        feature_functions.base_features(),
        feature_functions.hudgins_features(),
        feature_functions.emg_features(),
    )
    funcs = {}
    for collection in collections:
        funcs.update(collection)

    for func in funcs.values():
        computed = func(segments)
        assert len(computed) == n_segments
        
# Call both feature-function tests directly so that any error surfaces
# when this cell is executed.
test_mv_feature_functions()
test_uv_feature_functions()

AttributeError: module 'scipy.stats' has no attribute 'median_absolute_deviation'

### Test pipe

In [38]:
# Author: David Burns
# License: BSD

import numpy as np
import pandas as pd
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_validate

from seglearn.pipe import Pype
from seglearn.transform import FeatureRep, SegmentX, SegmentXY, SegmentXYForecast, PadTrunc
from seglearn.base import TS_Data


def yvals(y):
    """Return the sorted unique target values found in ``y``.

    When the first element of ``y`` holds more than one value, ``y`` is
    treated as a collection of target series and these are concatenated
    before the unique values are taken; otherwise ``y`` is used as is.
    """
    first = np.atleast_1d(y[0])
    values = np.concatenate(y) if len(first) > 1 else y
    return np.unique(values)


def transformation_test(clf, X, y):
    """Check that ``fit`` + ``transform`` gives the same result as ``fit_transform``.

    Also asserts that every transformed target value occurs in the original
    targets and that the transformed data and targets have equal length.
    """
    clf.fit(X, y)
    X_two_step, y_two_step = clf.transform(X, y)
    X_one_step, y_one_step = clf.fit_transform(X, y)

    assert np.all(X_two_step == X_one_step)
    assert np.all(y_two_step == y_one_step)

    # transformed targets may only contain values present in the input targets
    seen = np.unique(y_two_step)
    allowed = yvals(y)
    assert np.all(np.isin(seen, allowed))

    assert len(X_two_step) == len(y_two_step)


def test_pipe_transformation():
    """Run ``transformation_test`` on four pipelines.

    Each pipeline starts with a different first step (SegmentX, SegmentXY,
    SegmentXYForecast, PadTrunc) followed by FeatureRep and StandardScaler.
    Every pipeline is checked with a directly built ``TS_Data`` and with one
    built from a DataFrame.
    """
    lengths = (1000, 100, 500)

    def one_label_per_series():
        return [1, 2, 3]

    def one_target_per_sample():
        return [np.random.rand(n) for n in lengths]

    def run_case(first_step, make_target):
        pipe = Pype([first_step,
                     ('ftr', FeatureRep()),
                     ('scaler', StandardScaler())])
        series = [np.random.rand(n, 10) for n in lengths]
        context = np.random.rand(3, 3)
        target = make_target()

        # TS_Data built directly from series and context
        transformation_test(pipe, TS_Data(series, context), target)

        # same data routed through a DataFrame
        frame = pd.DataFrame(context)
        frame['ts_data'] = series
        transformation_test(pipe, TS_Data.from_df(frame), target)

    # SegmentX transform pipe
    run_case(('seg', SegmentX()), one_label_per_series)

    # SegmentXY transform pipe
    run_case(('seg', SegmentXY()), one_target_per_sample)

    # Forecast transform pipe
    run_case(('seg', SegmentXYForecast()), one_target_per_sample)

    # PadTrunc transform pipe
    run_case(('trunc', PadTrunc()), one_label_per_series)


def classifier_test(clf, X, y):
    """Fit ``clf`` on ``(X, y)`` and sanity-check its classification outputs.

    Checks that ``predict`` and ``transform_predict`` agree, that targets and
    predictions only contain values from ``y``, that ``predict_proba`` has
    one row per prediction and one column per target value, and that the
    score lies in [0, 1]. When the pipeline reports a segmenter, the
    per-series predictions are checked as well.
    """
    labels = yvals(y)
    clf.fit(X, y)
    direct_pred = clf.predict(X)
    y_trans, paired_pred = clf.transform_predict(X, y)

    assert np.all(np.isin(np.unique(y_trans), labels))
    assert len(y_trans) == len(paired_pred)
    assert np.all(np.isin(np.unique(paired_pred), labels))
    assert np.all(direct_pred == paired_pred)

    proba = clf.predict_proba(X)
    assert proba.shape[0] == len(direct_pred)
    assert proba.shape[1] == len(labels)

    fit_score = clf.score(X, y)
    assert 0.0 <= fit_score <= 1.0

    if not clf._get_segmenter():
        return

    # one prediction series per input series, matching its length
    series_pred = clf.predict_segmented_series(X, categorical_target=True)
    for i in np.arange(len(X)):
        assert len(X[i]) == len(series_pred[i])
        assert np.all(np.isin(np.unique(series_pred[i]), labels))


def test_pipe_classification():
    """Run ``classifier_test`` on a SegmentX -> FeatureRep -> random forest pipeline.

    Covers a bare single series, a single series with context data, several
    multivariate series and several univariate series. Every case with
    context data is run with a directly built ``TS_Data`` and with one built
    from a DataFrame.
    """
    def check_both(series, context, labels):
        # TS_Data built directly from series and context
        classifier_test(pipe, TS_Data(series, context), labels)
        # same data routed through a DataFrame
        frame = pd.DataFrame(context)
        frame['ts_data'] = series
        classifier_test(pipe, TS_Data.from_df(frame), labels)

    # no context data, single time series
    bare_series = [np.random.rand(1000, 10)]
    bare_labels = [5]

    pipe = Pype([('seg', SegmentX()),
                 ('ftr', FeatureRep()),
                 ('rf', RandomForestClassifier(n_estimators=10))])

    classifier_test(pipe, bare_series, bare_labels)

    # context data, single time series
    series = [np.random.rand(1000, 10)]
    context = [np.random.rand(3)]
    check_both(series, context, [5])

    # multiple time series
    series = [np.random.rand(1000, 10), np.random.rand(100, 10), np.random.rand(500, 10)]
    context = np.random.rand(3, 3)
    check_both(series, context, [1, 2, 3])

    # univariate data
    series = [np.random.rand(1000), np.random.rand(100), np.random.rand(500)]
    context = np.random.rand(3)
    check_both(series, context, [1, 2, 3])


def regression_test(clf, X, y):
    """Fit ``clf`` on ``(X, y)`` and sanity-check its regression outputs.

    Checks that ``predict`` and ``transform_predict`` agree, that the
    transformed targets only contain values from ``y`` and that the score
    lies in [0, 1]. When the pipeline reports a segmenter, each per-series
    prediction must match its input series in length and stay within the
    range of the segment predictions.
    """
    targets = yvals(y)
    clf.fit(X, y)
    direct_pred = clf.predict(X)
    y_trans, paired_pred = clf.transform_predict(X, y)

    assert np.all(np.isin(np.unique(y_trans), targets))
    assert len(y_trans) == len(paired_pred)
    assert np.all(direct_pred == paired_pred)

    fit_score = clf.score(X, y)
    assert 0.0 <= fit_score <= 1.0

    if not clf._get_segmenter():
        return

    series_pred = clf.predict_segmented_series(X, categorical_target=False)
    for i in np.arange(len(X)):
        assert len(X[i]) == len(series_pred[i])
        # series predictions must not leave the range of the segment predictions
        assert np.max(direct_pred) >= np.max(series_pred[i])
        assert np.min(direct_pred) <= np.min(series_pred[i])


def test_pipe_regression():
    """Run ``regression_test`` and ``cross_validate`` on a SegmentXY -> FeatureRep -> Ridge pipeline.

    Covers a bare single series, a single series with context data, several
    series with context data, and a 3-fold cross validation on five series.
    Cases with context data are run with a directly built ``TS_Data`` and
    with one built from a DataFrame.
    """
    def ts_from_frame(context, series):
        # context values as ordinary columns, series in the 'ts_data' column
        frame = pd.DataFrame(context)
        frame['ts_data'] = series
        return TS_Data.from_df(frame)

    # no context data, single time series
    bare_series = [np.random.rand(1000, 10)]
    bare_target = [np.random.rand(1000)]
    pipe = Pype([('seg', SegmentXY()),
                 ('ftr', FeatureRep()),
                 ('ridge', Ridge())])
    regression_test(pipe, bare_series, bare_target)

    # context data, single time series
    series = [np.random.rand(1000, 10)]
    context = [np.random.rand(3)]
    target = [np.random.rand(1000)]
    regression_test(pipe, TS_Data(series, context), target)
    regression_test(pipe, ts_from_frame(context, series), target)

    # multiple time series
    series = [np.random.rand(1000, 10), np.random.rand(100, 10), np.random.rand(500, 10)]
    context = np.random.rand(3, 3)
    target = [np.random.rand(1000), np.random.rand(100), np.random.rand(500)]
    regression_test(pipe, TS_Data(series, context), target)
    regression_test(pipe, ts_from_frame(context, series), target)

    # cross val
    stacked = np.array([np.random.rand(1000, 10)] * 5)
    context = np.random.rand(5, 3)
    target = np.array([np.random.rand(1000)] * 5)
    cross_validate(pipe, TS_Data(stacked, context), target, cv=3)

    # cross val again, with the series supplied through a DataFrame
    repeated = [np.random.rand(1000, 10)] * 5
    cross_validate(pipe, ts_from_frame(context, repeated), target, cv=3)


def forecast_test(clf, X, y):
    """Fit ``clf`` on ``(X, y)`` and sanity-check its forecasting outputs.

    Checks that ``predict`` and ``transform_predict`` agree, that the
    transformed targets only contain values from ``y`` and that the score
    lies in [0, 1].
    """
    targets = yvals(y)
    clf.fit(X, y)
    direct_pred = clf.predict(X)
    y_trans, paired_pred = clf.transform_predict(X, y)

    assert np.all(np.isin(np.unique(y_trans), targets))
    assert len(y_trans) == len(paired_pred)
    assert np.all(direct_pred == paired_pred)

    fit_score = clf.score(X, y)
    assert 0.0 <= fit_score <= 1.0


def test_pipe_forecast():
    """Run ``forecast_test`` and ``cross_validate`` on a SegmentXYForecast -> FeatureRep -> Ridge pipeline.

    Covers a bare single series, a single series with context data, several
    series with context data, and a 3-fold cross validation on five series.
    Cases with context data are run with a directly built ``TS_Data`` and
    with one built from a DataFrame.
    """
    def ts_from_frame(context, series):
        # context values as ordinary columns, series in the 'ts_data' column
        frame = pd.DataFrame(context)
        frame['ts_data'] = series
        return TS_Data.from_df(frame)

    # no context data, single time series
    bare_series = [np.random.rand(1000, 10)]
    bare_target = [np.random.rand(1000)]

    pipe = Pype([('seg', SegmentXYForecast()),
                 ('ftr', FeatureRep()),
                 ('ridge', Ridge())])

    forecast_test(pipe, bare_series, bare_target)

    # context data, single time series
    series = [np.random.rand(1000, 10)]
    context = [np.random.rand(3)]
    target = [np.random.rand(1000)]
    forecast_test(pipe, TS_Data(series, context), target)
    forecast_test(pipe, ts_from_frame(context, series), target)

    # multiple time series
    series = [np.random.rand(1000, 10), np.random.rand(100, 10), np.random.rand(500, 10)]
    context = np.random.rand(3, 3)
    target = [np.random.rand(1000), np.random.rand(100), np.random.rand(500)]
    forecast_test(pipe, TS_Data(series, context), target)
    forecast_test(pipe, ts_from_frame(context, series), target)

    # cross val
    stacked = np.array([np.random.rand(1000, 10)] * 5)
    context = np.random.rand(5, 3)
    target = np.array([np.random.rand(1000)] * 5)
    cross_validate(pipe, TS_Data(stacked, context), target, cv=3)

    # cross val again, with the series supplied through a DataFrame
    repeated = [np.random.rand(1000, 10)] * 5
    cross_validate(pipe, ts_from_frame(context, repeated), target, cv=3)


def test_pipe_PadTrunc():
    """Run ``classifier_test`` on a PadTrunc -> FeatureRep -> random forest pipeline.

    Covers a bare single series, a single series with context data, several
    multivariate series and several univariate series. Every case with
    context data is run twice: once with a directly built ``TS_Data`` and
    once with a ``TS_Data`` built from a DataFrame via ``TS_Data.from_df``.
    """
    # no context data, single time series
    X = [np.random.rand(1000, 10)]
    y = [5]
    pipe = Pype([('trunc', PadTrunc()),
                 ('ftr', FeatureRep()),
                 ('rf', RandomForestClassifier(n_estimators=10))])
    classifier_test(pipe, X, y)

    # context data, single time series
    Xt = [np.random.rand(1000, 10)]
    Xc = [np.random.rand(3)]
    X = TS_Data(Xt, Xc)
    y = [5]
    classifier_test(pipe, X, y)

    X = pd.DataFrame(Xc)
    X['ts_data'] = Xt
    # Convert the frame to TS_Data, as every other DataFrame case in these
    # tests does; the raw DataFrame was previously passed to the pipeline.
    X = TS_Data.from_df(X)
    classifier_test(pipe, X, y)

    # multiple time series
    Xt = [np.random.rand(1000, 10), np.random.rand(100, 10), np.random.rand(500, 10)]
    Xc = np.random.rand(3, 3)
    X = TS_Data(Xt, Xc)
    y = [1, 2, 3]
    classifier_test(pipe, X, y)

    X = pd.DataFrame(Xc)
    X['ts_data'] = Xt
    # Same conversion as above, for consistency with the sibling cases.
    X = TS_Data.from_df(X)
    classifier_test(pipe, X, y)

    # univariate data
    Xt = [np.random.rand(1000), np.random.rand(100), np.random.rand(500)]
    Xc = np.random.rand(3)
    X = TS_Data(Xt, Xc)
    y = [1, 2, 3]
    classifier_test(pipe, X, y)

    X = pd.DataFrame(Xc)
    X['ts_data'] = Xt
    X = TS_Data.from_df(X)
    classifier_test(pipe, X, y)
    
# Call each pipeline test directly so that a failing assertion surfaces
# when this cell is executed.
test_pipe_transformation()
test_pipe_classification()
test_pipe_regression()
test_pipe_forecast()
test_pipe_PadTrunc()

### Test preprocessing

In [39]:
# Author: David Burns
# License: BSD

import numpy as np
import pandas as pd

from seglearn.preprocessing import TargetRunLengthEncoder
from seglearn.base import TS_Data

from seglearn.util import get_ts_data_parts

def test_trle():
    """Check ``TargetRunLengthEncoder(min_length=5)`` on three kinds of input.

    With target runs of length 3, 26, 1 and 70 the assertions expect only the
    runs of length 26 and 70 to remain; with two runs of length 50 both are
    expected to remain and the data to be unchanged. The last part repeats
    the first scenario with univariate data, context data and sample weights,
    once with a directly built ``TS_Data`` and once with one built from a
    DataFrame.
    """
    def mixed_runs():
        # target runs of length 3, 26, 1 and 70 with the labels 1, 2, 3 and 4
        return [np.concatenate([np.full(3, 1), np.full(26, 2), np.full(1, 3), np.full(70, 4)])]

    n_samples = 100
    n_vars = 5

    # Multivariate data
    X = [np.random.rand(n_samples, n_vars)]
    y = mixed_runs()

    encoder = TargetRunLengthEncoder(min_length=5)
    encoder.fit(X)
    Xt, yt, _ = encoder.transform(X, y)

    assert len(Xt) == len(yt) and len(yt) == 2
    assert yt[0] == 2 and yt[1] == 4
    assert len(Xt[0]) == 26 and len(Xt[1]) == 70

    # Nothing excluded
    X = [np.random.rand(n_samples, n_vars)]
    y = [np.concatenate([np.full(50, 1), np.full(50, 2)])]

    encoder = TargetRunLengthEncoder(min_length=5)
    encoder.fit(X, y)
    Xt, yt, _ = encoder.transform(X, y)

    assert len(Xt) == len(yt) and len(yt) == 2
    assert np.all(np.concatenate(Xt) == X)
    assert yt[0] == 1 and yt[1] == 2
    assert len(Xt[0]) == 50 and len(Xt[1]) == 50

    # Univariate data with sample weight and context
    series = [np.random.rand(n_samples)]
    context = [5]
    y = mixed_runs()
    weights = [1]

    def check_weighted(data):
        encoder = TargetRunLengthEncoder(min_length=5)
        encoder.fit(data)
        Xt, yt, swt = encoder.transform(data, y, weights)
        Xtc = Xt.context_data

        assert len(Xt) == len(yt) and len(swt) == len(yt) and len(yt) == 2
        assert yt[0] == 2 and yt[1] == 4
        assert len(Xt[0]) == 26 and len(Xt[1]) == 70
        # weight and context of the source series are carried to both runs
        assert swt[0] == 1 and swt[1] == 1
        assert Xtc[0] == 5 and Xtc[1] == 5

    check_weighted(TS_Data(series, context))

    # same data routed through a DataFrame
    frame = pd.DataFrame(context)
    frame['ts_data'] = series
    check_weighted(TS_Data.from_df(frame))

    
# Call the test directly so that a failing assertion surfaces when this cell is executed.
test_trle()

### Test split

In [41]:
# Author: David Burns
# License: BSD

from numpy.random import rand
import numpy as np

from seglearn.split import TemporalKFold, temporal_split
from seglearn.base import TS_Data


def test_temporal_split():
    """Run ``temporal_split`` on several input layouts and verify each with ``check_split``.

    Inputs are a single series or five series, given as plain sequences or
    ``TS_Data``, with either one target per series or one target per sample.
    """
    def split_and_check(X, y):
        Xtr, Xte, ytr, yte = temporal_split(X, y)
        check_split(X, Xtr, Xte, y, ytr, yte)

    # test with length 1 series
    split_and_check([rand(100, 10)], [5])
    split_and_check([rand(100, 10)], [rand(100)])
    split_and_check(TS_Data([rand(100, 10)], [5]), [rand(100)])

    # test with lots of series
    n_series = 5
    split_and_check(np.array([rand(100, 10)] * n_series), rand(n_series))
    split_and_check(np.array([rand(100, 10)] * n_series),
                    np.array([rand(100)] * n_series))
    split_and_check(TS_Data(np.array([rand(100, 10)] * n_series), rand(n_series)),
                    np.arange(n_series))


def test_temporal_k_fold():
    """Run ``TemporalKFold().split`` on several input layouts and verify the folds with ``check_folds``.

    Inputs are a single series or five series, given as plain sequences or
    ``TS_Data``, with either one target per series or one target per sample.
    """
    def fold_and_check(splitter, X, y):
        Xs, ys, cv = splitter.split(X, y)
        check_folds(Xs, ys, cv)

    # test length 1 series
    splitter = TemporalKFold()
    fold_and_check(splitter, [rand(100, 10)], [5])
    fold_and_check(splitter, [rand(100, 10)], [rand(100)])
    fold_and_check(splitter, TS_Data([rand(100, 10)], [5]), [rand(100)])

    # test with lots of series
    splitter = TemporalKFold()
    n_series = 5
    fold_and_check(splitter, np.array([rand(100, 10)] * n_series), rand(n_series))
    fold_and_check(splitter,
                   np.array([rand(100, 10)] * n_series),
                   np.array([rand(100)] * n_series))
    fold_and_check(splitter,
                   TS_Data(np.array([rand(100, 10)] * n_series), rand(n_series)),
                   np.array([rand(100)] * n_series))
    fold_and_check(splitter,
                   TS_Data(np.array([rand(100, 10)] * n_series), rand(n_series)),
                   rand(n_series))


def check_ts_var(X, Xtr, Xte):
    """Assert that, for every series, the train part followed by the test part equals the original."""
    rejoined_ok = []
    for i in range(len(X)):
        rejoined = np.concatenate((Xtr[i], Xte[i]))
        rejoined_ok.append(np.array_equal(rejoined, X[i]))
    assert np.all(rejoined_ok)


def check_static_var(y, ytr, yte):
    """Assert that both the train and the test copy of a per-series variable equal the original."""
    expected = np.array(y)
    for part in (ytr, yte):
        assert np.array_equal(expected, np.array(part))


def check_split(X, Xtr, Xte, y, ytr, yte):
    """Verify the result of a temporal split.

    Train/test data and targets must agree in length. Time series parts must
    rejoin to the original (``check_ts_var``); context data and per-series
    targets must be identical in both parts (``check_static_var``). When
    ``X`` is a ``TS_Data``, both split parts must be ``TS_Data`` as well.
    """
    assert len(Xtr) == len(ytr)
    assert len(Xte) == len(yte)

    if not isinstance(X, TS_Data):
        check_ts_var(X, Xtr, Xte)
    else:
        assert isinstance(Xtr, TS_Data)
        assert isinstance(Xte, TS_Data)
        check_static_var(X.context_data, Xtr.context_data, Xte.context_data)
        check_ts_var(X.ts_data, Xtr.ts_data, Xte.ts_data)

    # a first target holding more than one value means one target per sample
    target_is_series = len(np.atleast_1d(y[0])) > 1
    checker = check_ts_var if target_is_series else check_static_var
    checker(y, ytr, yte)


def check_folds(Xs, ys, cv):
    """Verify a list of ``(train_indices, test_indices)`` folds.

    Within each fold, data and targets selected by the same indices must have
    equal length, and train plus test indices must cover ``0..n-1`` exactly.
    Across folds, the test indices taken together must cover ``0..m-1``
    exactly.
    """
    held_out = []
    for k in range(len(cv)):
        train_idx, test_idx = cv[k][0], cv[k][1]
        assert len(Xs[train_idx]) == len(ys[train_idx])
        assert len(Xs[test_idx]) == len(ys[test_idx])

        fold_idx = np.concatenate((train_idx, test_idx))
        # checks each value in fold
        assert np.array_equal(np.sort(fold_idx), np.arange(len(fold_idx)))
        held_out.append(test_idx)

    all_held_out = np.concatenate(held_out)
    # checks each value tested once
    assert np.array_equal(np.sort(all_held_out), np.arange(len(all_held_out)))
    
    
# Call both split tests directly so that a failing assertion surfaces
# when this cell is executed.
test_temporal_split()
test_temporal_k_fold()

### Test transform 

In [46]:
# Author: David Burns
# License: BSD

import pytest
import warnings
import pickle

import numpy as np

import seglearn.transform as transform
from seglearn.base import TS_Data
from seglearn.feature_functions import all_features, mean
from seglearn.util import get_ts_data_parts
from sklearn.utils import shuffle
from sklearn.base import BaseEstimator


def test_sliding_window():
    """Check ``transform.sliding_window`` on a 1-D series for every step from 1 to ``width``.

    Both the default memory order and order ``'C'`` are checked for
    contiguity, window width, window count and content. For step 1 and for
    step == width the original series is rebuilt from the windows.
    """
    n_samples = 1000
    width = 10
    ts = np.random.rand(n_samples)

    for step in 1 + np.arange(width):
        default_order = transform.sliding_window(ts, width, step)
        c_order = transform.sliding_window(ts, width, step, 'C')
        results = (default_order, c_order)

        assert default_order.flags.f_contiguous and c_order.flags.c_contiguous
        assert all(windows.shape[1] == width for windows in results)
        n_windows = 1 + (n_samples - width) // step
        assert all(n_windows == windows.shape[0] for windows in results)
        assert all(np.all(np.isin(windows, ts)) for windows in results)

        # reconstruct the ts
        if step == 1:
            # first column of all windows plus the remainder of the last window
            for windows in results:
                rebuilt = np.concatenate((windows[:, 0], windows[-1, 1:width]))
                assert np.array_equal(rebuilt, ts)

        if step == width:
            # non-overlapping windows: flattening gives the series back
            for windows in results:
                assert np.array_equal(windows.ravel(), ts)


def test_sliding_tensor():
    """Check ``transform.sliding_tensor`` on a 2-D series for every step from 1 to ``width``.

    Window width, variable count, window count and per-variable content are
    checked for the default order and for order ``'C'``; the ``'C'`` results
    must be C-contiguous individually and after concatenation.
    """
    n_samples, n_vars, width = 1000, 5, 10
    ts = np.random.rand(n_samples, n_vars)

    def check_windows(windows, step):
        assert windows.shape[1] == width
        assert windows.shape[2] == n_vars
        n_windows = 1 + (n_samples - width) // step
        assert n_windows == windows.shape[0]
        # every windowed value of a variable must come from that variable's column
        for j in range(n_vars):
            assert np.all(np.isin(windows[:, :, j], ts[:, j]))

    for step in 1 + np.arange(width):
        check_windows(transform.sliding_tensor(ts, width, step), step)

        # todo: reconstruct tensor ts

    c_ordered = []
    for step in 1 + np.arange(width):
        windows = transform.sliding_tensor(ts, width, step, 'C')
        c_ordered.append(windows)
        assert windows.flags.c_contiguous
        check_windows(windows, step)
    assert np.concatenate(c_ordered).flags.c_contiguous

def test_feature_rep():
    """Check the output shape of ``transform.FeatureRep`` for several inputs.

    For each input the transformed data must have one row per segment and
    one column per entry of ``f_labels``. Inputs: multivariate and univariate
    segments with all features, univariate segments with a single feature
    (exactly one output column), and ``TS_Data`` with multivariate and with
    univariate context data.
    """
    def fit_and_check(rep, X, y):
        rep.fit(X, y)
        Xt = rep.transform(X)
        assert Xt.shape[0] == len(X)
        assert len(rep.f_labels) == Xt.shape[1]
        return Xt

    # multivariate ts
    frep = transform.FeatureRep(features=all_features())
    X = np.random.rand(100, 10, 5)
    y = np.ones(100)
    fit_and_check(frep, X, y)

    # univariate ts
    X = np.random.rand(100, 10)
    y = np.ones(100)
    fit_and_check(frep, X, y)

    # single feature (same univariate data as above)
    frep = transform.FeatureRep(features={'mean': mean})
    Xt = fit_and_check(frep, X, y)
    assert Xt.shape[1] == 1

    # ts with multivariate contextual data
    frep = transform.FeatureRep(features=all_features())
    X = TS_Data(np.random.rand(100, 10, 5), np.random.rand(100, 3))
    y = np.ones(100)
    fit_and_check(frep, X, y)

    # ts with univariate contextual data
    X = TS_Data(np.random.rand(100, 10, 5), np.random.rand(100))
    y = np.ones(100)
    fit_and_check(frep, X, y)


def test_segmentx():
    """Check parameter validation, the ``_step`` property and output shapes of ``transform.SegmentX``."""
    # test illegal parameter settings
    illegal_settings = (
        {'width': 0},                      # illegal width value
        {'overlap': None, 'step': None},   # either overlap or step must be defined
        {'overlap': -1, 'step': None},     # illegal overlap value
        {'step': 0},                       # illegal step value
        {'order': None},                   # illegal order
    )
    for kwargs in illegal_settings:
        with pytest.raises(ValueError):
            transform.SegmentX(**kwargs)

    # test _step property working as expected
    assert transform.SegmentX(width=10, overlap=0.5)._step == 5

    # test precedence of step over overlap
    assert transform.SegmentX(width=10, overlap=1, step=1)._step == 1

    # illegal overlap value, but valid step value
    assert transform.SegmentX(overlap=-1, step=1)._step == 1

    # test shape of segmented data
    width = 5
    nvars = 5
    seg = transform.SegmentX(width=width)

    def segment(X, y):
        # fit, transform, and return the segmented data with the segment count
        seg.fit(X, y)
        Xs, ys, _ = seg.transform(X, y)
        return Xs, len(ys)

    # multivariate ts data without context data
    X = [np.random.rand(100, nvars) for _ in range(3)]
    y = np.random.rand(3)
    Xs, N = segment(X, y)
    assert Xs.shape == (N, width, nvars)

    # univariate ts data without context
    X = [np.random.rand(100) for _ in range(3)]
    y = np.random.rand(3)
    Xs, N = segment(X, y)
    assert Xs.shape == (N, width)

    # multivariate ts data with context data
    Xt = [np.random.rand(n, nvars) for n in (100, 200, 50)]
    Xc = np.random.rand(3, 4)
    y = np.random.rand(3)
    Xs, N = segment(TS_Data(Xt, Xc), y)
    Xst, Xsc = get_ts_data_parts(Xs)
    assert Xst.shape == (N, width, nvars)
    assert Xsc.shape == (N, 4)

    # ts data with univariate context data
    Xt = [np.random.rand(n) for n in (100, 200, 50)]
    Xc = np.random.rand(3)
    y = np.random.rand(3)
    Xs, N = segment(TS_Data(Xt, Xc), y)
    Xst, Xsc = get_ts_data_parts(Xs)
    assert Xst.shape == (N, width)
    assert Xsc.shape == (N, 1)

    # same number as context vars and time vars
    # this would cause broadcasting failure before implementation of TS_Data class
    Xt = [np.random.rand(n, nvars) for n in (100, 200, 50)]
    Xc = np.random.rand(3, nvars)
    y = np.random.rand(3)
    Xs, N = segment(TS_Data(Xt, Xc), y)
    Xst, Xsc = get_ts_data_parts(Xs)
    assert Xst.shape == (N, width, nvars)
    assert Xsc.shape == (N, 5)


def test_segmentxy():
    """Check parameter validation, the ``_step`` property and output shapes of ``transform.SegmentXY``."""
    # test illegal parameter settings
    illegal_settings = (
        {'width': 0},                      # illegal width value
        {'overlap': None, 'step': None},   # either overlap or step must be defined
        {'overlap': -1, 'step': None},     # illegal overlap value
        {'step': 0},                       # illegal step value
        {'order': None},                   # illegal order
    )
    for kwargs in illegal_settings:
        with pytest.raises(ValueError):
            transform.SegmentXY(**kwargs)

    # test _step property working as expected
    assert transform.SegmentXY(width=10, overlap=0.5)._step == 5

    # test precedence of step over overlap
    assert transform.SegmentXY(width=10, overlap=1, step=1)._step == 1

    # illegal overlap value, but valid step value
    assert transform.SegmentXY(overlap=-1, step=1)._step == 1

    # test shape of segmented data
    Nt = 100
    width = 5
    nvars = 5
    seg = transform.SegmentXY(width=width)

    def segment(X, y):
        # fit, transform, and return the segmented data with the segment count
        seg.fit(X, y)
        Xs, ys, _ = seg.transform(X, y)
        return Xs, len(ys)

    # multivariate ts data without context data
    X = [np.random.rand(Nt, nvars) for _ in range(3)]
    y = [np.random.rand(Nt) for _ in range(3)]
    Xs, N = segment(X, y)
    assert Xs.shape == (N, width, nvars)

    # univariate ts data without context data
    X = [np.random.rand(n) for n in (Nt, 2 * Nt, 3 * Nt)]
    y = [np.random.rand(n) for n in (Nt, 2 * Nt, 3 * Nt)]
    Xs, N = segment(X, y)
    assert Xs.shape == (N, width)

    # The three context cases share the same series layout and differ only in
    # the context shape:
    #   (3, 4)     multivariate ts data with context data
    #   (3,)       ts data with univariate context data
    #   (3, nvars) same number of context vars and time vars; this would cause
    #              broadcasting failure before implementation of TS_Data class
    lengths = (Nt, 2 * Nt, Nt)
    context_cases = (((3, 4), 4), ((3,), 1), ((3, nvars), 5))
    for context_shape, n_context_cols in context_cases:
        Xt = [np.random.rand(n, nvars) for n in lengths]
        Xc = np.random.rand(*context_shape)
        y = [np.random.rand(n) for n in lengths]
        Xs, N = segment(TS_Data(Xt, Xc), y)
        Xst, Xsc = get_ts_data_parts(Xs)
        assert Xst.shape == (N, width, nvars)
        assert Xsc.shape == (N, n_context_cols)


def test_segmentxyforecast():
    """Check parameter validation, the ``_step`` property and output shapes of ``transform.SegmentXYForecast``."""
    # test illegal parameter settings
    illegal_settings = (
        {'width': 0},                      # illegal width value
        {'overlap': None, 'step': None},   # either overlap or step must be defined
        {'overlap': -1, 'step': None},     # illegal overlap value
        {'step': 0},                       # illegal step value
        {'order': None},                   # illegal order
        {'forecast': 0},                   # illegal forecast value
    )
    for kwargs in illegal_settings:
        with pytest.raises(ValueError):
            transform.SegmentXYForecast(**kwargs)

    # test _step property working as expected
    assert transform.SegmentXYForecast(width=10, overlap=0.5)._step == 5

    # test precedence of step over overlap
    assert transform.SegmentXYForecast(width=10, overlap=1, step=1)._step == 1

    # illegal overlap value, but valid step value
    assert transform.SegmentXYForecast(overlap=-1, step=1)._step == 1

    # test shape of segmented data
    Nt = 100
    width = 5
    nvars = 5
    lengths = (Nt, 2 * Nt, Nt)

    # lets do a forecast test
    seg = transform.SegmentXYForecast(width=width, forecast=5)
    Xt = [np.random.rand(n, nvars) for n in lengths]
    Xc = np.random.rand(3, 4)
    y = [np.random.rand(n) for n in lengths]
    X = TS_Data(Xt, Xc)
    seg.fit(X, y)
    Xs, ys, _ = seg.transform(X, y)
    Xst, Xsc = get_ts_data_parts(Xs)
    N = len(ys)
    assert Xst.shape == (N, width, nvars)
    assert Xsc.shape == (N, 4)

    # univariate X
    seg = transform.SegmentXYForecast(width=width, forecast=5)
    X = [np.random.rand(n) for n in lengths]
    y = [np.random.rand(n) for n in lengths]
    seg.fit(X, y)
    Xs, ys, _ = seg.transform(X, y)
    Xst, Xsc = get_ts_data_parts(Xs)
    N = len(ys)
    assert Xst.shape == (N, width)


def test_pad_trunc():
    """Check ``transform.PadTrunc`` on plain lists and on ``TS_Data`` inputs.

    Every case fits a ``PadTrunc(width=5)`` transformer, transforms the same
    data, and asserts that

    * the transformed time series has shape ``(N, width[, nvars])``,
    * each transformed series equals the first ``width`` samples of the
      matching input series, and
    * context data (when present) comes back with its values unchanged.

    The first group of cases uses one target series per time series and
    expects it to be cut to ``width`` as well; the second group uses a single
    target value per series and expects it to be returned unchanged.
    """
    Nt = 100
    width = 5
    nvars = 5
    seg = transform.PadTrunc(width=width)

    # multivariate ts data without context data
    X = [np.random.rand(Nt, nvars), np.random.rand(Nt, nvars), np.random.rand(Nt, nvars)]
    y = [np.random.rand(Nt), np.random.rand(Nt), np.random.rand(Nt)]
    seg.fit(X, y)
    Xs, ys, _ = seg.transform(X, y)
    N = len(ys)
    assert Xs.shape == (N, width, nvars)
    assert np.all([np.equal(X[i][0:width], Xs[i]) for i in range(len(X))])
    assert np.all([np.equal(y[i][0:width], ys[i]) for i in range(len(y))])

    # univariate ts data without context data (series of different lengths)
    X = [np.random.rand(Nt), np.random.rand(2 * Nt), np.random.rand(3 * Nt)]
    y = [np.random.rand(Nt), np.random.rand(2 * Nt), np.random.rand(3 * Nt)]
    seg.fit(X, y)
    Xs, ys, _ = seg.transform(X, y)
    N = len(ys)
    assert Xs.shape == (N, width)
    assert np.all([np.equal(X[i][0:width], Xs[i]) for i in range(len(X))])
    assert np.all([np.equal(y[i][0:width], ys[i]) for i in range(len(y))])

    # multivariate ts data with context data
    Xt = [np.random.rand(Nt, nvars), np.random.rand(2 * Nt, nvars), np.random.rand(Nt, nvars)]
    Xc = np.random.rand(3, 4)
    y = [np.random.rand(Nt), np.random.rand(2 * Nt), np.random.rand(Nt)]
    X = TS_Data(Xt, Xc)
    seg.fit(X, y)
    Xs, ys, _ = seg.transform(X, y)
    Xst, Xsc = get_ts_data_parts(Xs)
    N = len(ys)
    assert Xst.shape == (N, width, nvars)
    assert Xsc.shape == (N, 4)
    assert np.all([np.equal(Xt[i][0:width], Xst[i]) for i in range(len(Xt))])
    assert np.all([np.equal(Xc[i], Xsc[i]) for i in range(len(Xt))])
    assert np.all([np.equal(y[i][0:width], ys[i]) for i in range(len(y))])

    # ts data with univariate context data
    Xt = [np.random.rand(Nt, nvars), np.random.rand(2 * Nt, nvars), np.random.rand(Nt, nvars)]
    Xc = np.random.rand(3)
    y = [np.random.rand(Nt), np.random.rand(2 * Nt), np.random.rand(Nt)]
    X = TS_Data(Xt, Xc)
    seg.fit(X, y)
    Xs, ys, _ = seg.transform(X, y)
    Xst, Xsc = get_ts_data_parts(Xs)
    N = len(ys)
    assert Xst.shape == (N, width, nvars)
    assert Xsc.shape == (N,)
    assert np.all([np.equal(Xt[i][0:width], Xst[i]) for i in range(len(Xt))])
    assert np.all([np.equal(Xc[i], Xsc[i]) for i in range(len(Xt))])
    assert np.all([np.equal(y[i][0:width], ys[i]) for i in range(len(y))])

    # same number as context vars and time vars
    # this would cause broadcasting failure before implementation of TS_Data class
    Xt = [np.random.rand(Nt, nvars), np.random.rand(2 * Nt, nvars), np.random.rand(Nt, nvars)]
    Xc = np.random.rand(3, nvars)
    y = [np.random.rand(Nt), np.random.rand(2 * Nt), np.random.rand(Nt)]
    X = TS_Data(Xt, Xc)
    seg.fit(X, y)
    Xs, ys, _ = seg.transform(X, y)
    Xst, Xsc = get_ts_data_parts(Xs)
    N = len(ys)
    assert Xst.shape == (N, width, nvars)
    assert Xsc.shape == (N, 5)
    assert np.all([np.equal(Xt[i][0:width], Xst[i]) for i in range(len(Xt))])
    assert np.all([np.equal(Xc[i], Xsc[i]) for i in range(len(Xt))])
    assert np.all([np.equal(y[i][0:width], ys[i]) for i in range(len(y))])

    # ---- second group: one target value per series ----
    # A fresh transformer is used so no fitted state leaks in from above.
    width = 5
    nvars = 5
    seg = transform.PadTrunc(width=width)

    # multivariate ts data without context data
    X = [np.random.rand(100, nvars), np.random.rand(100, nvars), np.random.rand(100, nvars)]
    y = np.random.rand(3)
    seg.fit(X, y)
    Xs, ys, _ = seg.transform(X, y)
    N = len(ys)
    assert Xs.shape == (N, width, nvars)
    # index over X, the list under test here, not a variable left over
    # from an earlier case
    assert np.all([np.equal(X[i][0:width], Xs[i]) for i in range(len(X))])
    assert np.all([np.equal(y[i], ys[i]) for i in range(len(y))])

    # univariate ts data without context
    X = [np.random.rand(100), np.random.rand(100), np.random.rand(100)]
    y = np.random.rand(3)
    seg.fit(X, y)
    Xs, ys, _ = seg.transform(X, y)
    N = len(ys)
    assert Xs.shape == (N, width)
    # index over X, the list under test here, not a variable left over
    # from an earlier case
    assert np.all([np.equal(X[i][0:width], Xs[i]) for i in range(len(X))])
    assert np.all([np.equal(y[i], ys[i]) for i in range(len(y))])

    # multivariate ts data with context data
    Xt = [np.random.rand(100, nvars), np.random.rand(200, nvars), np.random.rand(50, nvars)]
    Xc = np.random.rand(3, 4)
    y = np.random.rand(3)
    X = TS_Data(Xt, Xc)
    seg.fit(X, y)
    Xs, ys, _ = seg.transform(X, y)
    Xst, Xsc = get_ts_data_parts(Xs)
    N = len(ys)
    assert Xst.shape == (N, width, nvars)
    assert Xsc.shape == (N, 4)
    assert np.all([np.equal(Xt[i][0:width], Xst[i]) for i in range(len(Xt))])
    assert np.all([np.equal(Xc[i], Xsc[i]) for i in range(len(Xt))])
    assert np.all([np.equal(y[i], ys[i]) for i in range(len(y))])


def test_interp():
    """Exercise ``transform.Interp`` on single-channel, multi-channel and unsorted input.

    Each input series is a 2-D array whose first column is the sample time
    ``t`` and whose remaining columns are data channels. The assertions
    check that resampling with period ``p`` leaves ``N / p`` samples in both
    the series and the target, that a single data channel comes back
    1-dimensional, that ``D`` data channels come back as ``D`` columns, and
    that interpolated categorical targets stay within ``np.arange(6)``.
    The last case feeds duplicate timestamps with ``assume_sorted=False``
    and expects two recorded warnings, the last one a ``UserWarning``
    mentioning "duplicate".
    """
    # ---- single data channel ----
    n_samples = 100
    stamps = np.arange(n_samples) + np.random.rand(n_samples)
    series = [np.column_stack([stamps, np.random.rand(n_samples)])]
    target = [np.random.rand(n_samples)]

    resampler = transform.Interp(2)
    resampler.fit(series)
    X_out, y_out, _ = resampler.transform(series, target)

    expected_len = n_samples / 2
    assert len(X_out[0]) == expected_len
    assert len(y_out[0]) == expected_len
    assert np.ndim(X_out[0]) == 1

    # same series, now with an integer (categorical) target
    target = [np.random.randint(0, 5, n_samples)]
    resampler = transform.Interp(5, kind='cubic', categorical_target=True)
    resampler.fit(series, target)
    X_out, y_out, _ = resampler.transform(series, target)

    expected_len = n_samples / 5
    assert len(X_out[0]) == expected_len
    assert len(y_out[0]) == expected_len
    assert np.ndim(X_out[0]) == 1
    assert np.all(np.isin(y_out, np.arange(6)))

    # ---- several data channels ----
    n_samples = 100
    n_channels = 5
    stamps = np.arange(n_samples) + np.random.rand(n_samples)
    series = [np.column_stack([stamps, np.random.rand(n_samples, n_channels)])]
    target = [np.random.rand(n_samples)]

    resampler = transform.Interp(2)
    resampler.fit(series)
    X_out, y_out, _ = resampler.transform(series, target)

    expected_len = n_samples / 2
    assert len(X_out[0]) == expected_len
    assert len(y_out[0]) == expected_len
    assert X_out[0].shape[1] == n_channels

    target = [np.random.randint(0, 5, n_samples)]
    resampler = transform.Interp(5, kind='cubic', categorical_target=True)
    resampler.fit(series, target)
    X_out, y_out, _ = resampler.transform(series, target)

    expected_len = n_samples / 5
    assert len(X_out[0]) == expected_len
    assert len(y_out[0]) == expected_len
    assert X_out[0].shape[1] == n_channels
    assert np.all(np.isin(y_out, np.arange(6)))

    # ---- duplicate timestamps, sorting requested ----
    n_samples = 100
    stamps = np.arange(n_samples)
    stamps[0:3] = 0  # first three samples share one timestamp
    series = [np.column_stack([stamps, np.random.rand(n_samples)])]
    target = [np.random.rand(n_samples)]

    with warnings.catch_warnings(record=True) as caught:
        # record every warning, regardless of the filters active outside
        warnings.simplefilter("always")
        resampler = transform.Interp(sample_period=2, assume_sorted=False)
        resampler.fit(series)
        X_out, y_out, _ = resampler.transform(series, target)

        assert len(caught) == 2
        newest = caught[-1]
        assert issubclass(newest.category, UserWarning)
        assert "duplicate" in str(newest.message)

        expected_len = n_samples / 2
        assert len(X_out[0]) == expected_len
        assert len(y_out[0]) == expected_len
        assert np.ndim(X_out[0]) == 1
        assert np.count_nonzero(np.isnan(X_out)) == 0


def test_interp_long_to_wide():
    """Exercise ``transform.InterpLongToWide`` on long-format series.

    Each input row is ``[t, s, v1, v2, ...]``: a timestamp, an identifier
    column ``s`` and the value columns. The assertions check the number of
    resampled rows, that the output width equals
    ``(number of unique s) * (number of value columns)``, that no sample
    weights are returned, that a one-value-per-series target is passed
    through, and that duplicate timestamps with ``assume_sorted=False``
    produce a single recorded "duplicate" ``UserWarning`` and no NaNs.
    """
    # Test 1 - small hand-written table, used for two identical series
    long_table = np.column_stack([
        np.array([1.1, 1.2, 2.1, 3.3, 3.4, 3.5], dtype=float),  # t
        np.array([0, 1, 0, 0, 1, 1], dtype=float),              # s
        np.array([3, 4, 5, 7, 15, 25], dtype=float),            # v1
        np.array([5, 7, 6, 9, 22, 35], dtype=float),            # v2
    ])
    per_sample_target = np.array([1, 2, 2, 2, 3, 3], dtype=float)

    X = [long_table, long_table]
    y = [per_sample_target, per_sample_target]

    widener = transform.InterpLongToWide(0.5)
    widener.fit(X, y)
    X_wide, y_wide, weights = widener.transform(X, y)

    # --Checks--
    # linearly sampled time within bounds = 1.2, 1.7, 2.2, 2.7, 3.2 --> 5 rows
    assert len(X_wide[0]) == 5
    # output columns = unique(s) * no. of value columns = 2 * 2 = 4
    assert X_wide[0].shape[1] == 4
    assert weights is None

    # Test 2 - one target value per series, same transformer and data
    y = [1, 2]
    widener.fit(X, y)
    X_wide, y_wide, weights = widener.transform(X, y)
    assert np.array_equal(y_wide, y)

    # Test 3 - random data, three value columns, three identical series
    N = 100
    sample_period = 0.5
    t = np.arange(N) + np.random.rand(N)
    s = np.array([1, 2] * (N // 2))
    np.random.shuffle(s)

    value_columns = [np.arange(N) + np.random.rand(N) for _ in range(3)]
    long_table = np.column_stack([t, s] + value_columns)
    X = [long_table, long_table, long_table]
    per_sample_target = np.arange(N) + np.random.rand(N)
    y = [per_sample_target, per_sample_target, per_sample_target]

    widener = transform.InterpLongToWide(sample_period)
    widener.fit(X, y)
    X_wide, y_wide, weights = widener.transform(X, y)

    # --Checks--
    n_value_columns = X[0].shape[1] - 2  # everything except t and s
    assert X_wide[0].shape[1] == len(np.unique(s)) * n_value_columns
    assert len(X_wide[0]) <= N / sample_period

    # Test 4 - duplicate entries for t
    long_table = np.column_stack([
        np.array([1.1, 1.1, 1.2, 2.1, 3.3, 3.4, 3.5], dtype=float),  # t
        np.array([0, 0, 1, 0, 0, 1, 1], dtype=float),                # s
        np.array([3, 3, 4, 5, 7, 15, 25], dtype=float),              # v1
        np.array([5, 5, 7, 6, 9, 22, 35], dtype=float),              # v2
    ])
    per_sample_target = np.array([1, 1, 2, 2, 2, 3, 3], dtype=float)

    X = [long_table, long_table]
    y = [per_sample_target, per_sample_target]

    # NOTE(review): no warnings.simplefilter(...) is set here, so what gets
    # recorded depends on the warning filters already active when the test
    # runs (e.g. a global "ignore" would leave the record empty) - confirm
    # whether that is intended before relying on the count below.
    with warnings.catch_warnings(record=True) as caught:
        widener = transform.InterpLongToWide(0.5, assume_sorted=False)
        widener.fit(X, y)
        X_wide, y_wide, weights = widener.transform(X, y)

        assert len(caught) == 1
        newest = caught[-1]
        assert issubclass(newest.category, UserWarning)
        assert "duplicate" in str(newest.message)

        # --Checks--
        assert len(X_wide[0]) == 5
        assert X_wide[0].shape[1] == 4
        assert weights is None
        assert np.count_nonzero(np.isnan(X_wide)) == 0


def test_feature_rep_mix():
    """Check ``transform.FeatureRepMix`` with several column selectors.

    Two mixes are built, one for 4-variable series and one for univariate
    series, each combining mean-only ``FeatureRep`` transformers selected by
    int, list, slice and boolean mask. For every input the test asserts that
    the transformed output keeps one row per input sample and that the
    number of feature labels equals the number of output columns.
    """
    def mean_rep():
        # every entry gets its own FeatureRep instance
        return transform.FeatureRep(features={'mean': mean})

    def check_mix(mix, X):
        y = np.ones(100)
        mix.fit(X, y)
        features = mix.transform(X)
        assert features.shape[0] == len(X)
        assert len(mix.f_labels) == features.shape[1]

    union = transform.FeatureRepMix([
        ('a', mean_rep(), 0),
        ('b', mean_rep(), 1),
        ('c', mean_rep(), [2,3]),
        ('d', mean_rep(), slice(0,2)),
        ('e', mean_rep(), [False, False, True, True]),
    ])

    # multivariate ts
    check_mix(union, np.random.rand(100, 10, 4))

    # ts with multivariate contextual data
    check_mix(union, TS_Data(np.random.rand(100, 10, 4), np.random.rand(100, 3)))

    # ts with univariate contextual data
    check_mix(union, TS_Data(np.random.rand(100, 10, 4), np.random.rand(100)))

    # univariate ts
    uni_union = transform.FeatureRepMix([
        ('a', mean_rep(), 0),
        ('b', mean_rep(), [0]),
        ('c', mean_rep(), slice(0,1)),
        ('d', mean_rep(), [True]),
    ])
    check_mix(uni_union, np.random.rand(100, 10))


def test_function_transform():
    """Check ``transform.FunctionTransformer`` with and without a custom function.

    A default-constructed transformer is expected to return the very same
    object it was given. A transformer wrapping ``replace`` is expected to
    return time series data filled with ``constant`` while handing the
    context-data object back untouched. Finally, a function that changes
    the number of samples must make ``transform`` raise ``ValueError``.
    """
    constant = 10

    def replace(Xt, value):
        filled = np.ones(Xt.shape)
        return filled * value

    identity = transform.FunctionTransformer()
    custom = transform.FunctionTransformer(replace, func_kwargs={"value": constant})

    # plain arrays: univariate ts first, then multivariate ts
    for shape in ((100, 10), (100, 10, 4)):
        X = np.random.rand(*shape)
        y = np.ones(100)

        identity.fit(X, y)
        assert identity.transform(X) is X

        custom.fit(X, y)
        replaced = custom.transform(X)
        assert np.array_equal(replaced, np.ones(X.shape) * constant)

    # ts with univariate contextual data first, then multivariate contextual data
    for context_shape in ((100,), (100, 3)):
        Xt = np.random.rand(100, 10, 4)
        Xc = np.random.rand(*context_shape)
        X = TS_Data(Xt, Xc)
        y = np.ones(100)

        identity.fit(X, y)
        assert identity.transform(X) is X

        custom.fit(X, y)
        replaced_ts, passed_context = get_ts_data_parts(custom.transform(X))
        assert np.array_equal(replaced_ts, np.ones(Xt.shape) * constant)
        assert passed_context is Xc

    # test resampling: collapsing all samples into one row is not allowed
    def resample(Xt):
        return Xt.reshape(1, -1)

    illegal_resampler = transform.FunctionTransformer(resample)
    X = np.random.rand(100, 10)
    y = np.ones(100)
    illegal_resampler.fit(X, y)
    with pytest.raises(ValueError):
        illegal_resampler.transform(X)

# MUST be defined in the global scope for pickling to work correctly
def mock_resample(ndarray):
    """Return the first half of *ndarray* (rounded down for odd lengths)."""
    keep = len(ndarray) // 2
    return ndarray[:keep]
class MockImblearnSampler(BaseEstimator):
    """Stand-in for an imbalanced-learn sampler, used by ``test_patch_sampler``.

    It only provides what the test needs: a ``fit_resample`` method that
    halves both inputs via ``mock_resample``.
    """

    def __init__(self, mocked_param="mock"):
        # The parameter exists only so it shows up in the signature; its
        # value is not stored on the instance.
        pass

    @staticmethod
    def _check_X_y(X, y):
        """Return the inputs unchanged, followed by a constant ``True``."""
        return (X, y, True)

    def fit_resample(self, X, y):
        """Return the first halves of *X* and *y*."""
        checked_X, checked_y, _ = self._check_X_y(X, y)
        halved_X = mock_resample(checked_X)
        halved_y = mock_resample(checked_y)
        return halved_X, halved_y

def test_patch_sampler():
    """Check ``transform.patch_sampler`` on an invalid class and on the mock sampler.

    Patching a class without ``fit_resample`` must raise ``TypeError``. The
    patched mock sampler must be a different class from the unpatched one,
    survive a pickle round trip, and list ``mocked_param``, ``random_state``
    and ``shuffle`` in its ``repr``. For every input layout, ``transform``
    must hand back the very same ``X`` and ``y`` objects, while
    ``fit_transform`` must equal the halved inputs shuffled with
    ``random_state=0``.
    """
    # a class without a fit_resample function cannot be patched
    class NoResample:
        pass

    with pytest.raises(TypeError):
        transform.patch_sampler(NoResample)

    # patch the mocked imbalanced-learn Sampler class
    unpatched_sampler = MockImblearnSampler()
    patched_sampler = transform.patch_sampler(MockImblearnSampler)(shuffle=True, random_state=0)
    assert str(patched_sampler.__class__) != str(unpatched_sampler.__class__)

    # pickle round trip of the sampler instance created just above
    restored_sampler = pickle.loads(pickle.dumps(patched_sampler))
    assert str(patched_sampler.__class__) == str(restored_sampler.__class__)

    # test representation
    for param_name in ("mocked_param", "random_state", "shuffle"):
        assert param_name in repr(patched_sampler)

    def assert_passthrough(X, y):
        # transform() alone must not touch the data
        same_X, same_y, _ = patched_sampler.transform(X, y)
        assert same_X is X
        assert same_y is y

    def verify_ndarray(X):
        y = np.ones(100)
        assert_passthrough(X, y)
        X_out, y_out, _ = patched_sampler.fit_transform(X, y)
        want_X, want_y = shuffle(mock_resample(X), mock_resample(y), random_state=0)
        assert np.array_equal(X_out, want_X)
        assert np.array_equal(y_out, want_y)

    def verify_ts_data(X):
        ts_before, _ = get_ts_data_parts(X)
        y = np.ones(100)
        assert_passthrough(X, y)
        X_out, y_out, _ = patched_sampler.fit_transform(X, y)
        ts_after, _ = get_ts_data_parts(X_out)
        want_ts, want_y = shuffle(mock_resample(ts_before), mock_resample(y), random_state=0)
        assert np.array_equal(ts_after, want_ts)
        assert np.array_equal(y_out, want_y)

    # multivariate ts
    verify_ndarray(np.random.rand(100, 10, 4))

    # ts with multivariate contextual data
    verify_ts_data(TS_Data(np.random.rand(100, 10, 4), np.random.rand(100, 3)))

    # ts with univariate contextual data
    verify_ts_data(TS_Data(np.random.rand(100, 10, 4), np.random.rand(100)))

    # univariate ts
    verify_ndarray(np.random.rand(100, 10))
    
# Call each test function directly so that running this cell executes it;
# any failing assertion stops the cell with an AssertionError.
test_patch_sampler()
test_function_transform()
test_feature_rep_mix()
# test_interp_long_to_wide is left out of this run because it raises an
# AssertionError; it is executed on its own in the following cell.
#test_interp_long_to_wide()
test_interp()
test_pad_trunc()
test_segmentxyforecast()
test_segmentxy()
test_segmentx()
test_sliding_tensor()
test_sliding_window()

#### Error in one of the tests

In [48]:
test_interp_long_to_wide()

AssertionError: 

### Test util

In [50]:
# Author: David Burns
# License: BSD

import numpy as np

from seglearn.datasets import load_watch
from seglearn.base import TS_Data
from seglearn import util


def test_util():
    """Smoke-test the ``util`` helpers on the watch dataset.

    Wraps the dataset's series and its ``side`` context in ``TS_Data``,
    checks that ``util.get_ts_data_parts`` returns both parts unchanged,
    and then calls ``util.check_ts_data`` and ``util.ts_stats`` to make
    sure they run without raising.
    """
    watch = load_watch()
    series = watch['X']
    side = watch['side']

    wrapped = TS_Data(series, side)
    ts_part, context_part = util.get_ts_data_parts(wrapped)

    assert np.array_equal(context_part, side)
    assert all(np.array_equal(ts_part[i], original)
               for i, original in enumerate(series))

    # validation must accept both the wrapped and the bare time series
    util.check_ts_data(wrapped, watch['y'])
    util.check_ts_data(series, watch['y'])

    util.ts_stats(series, watch['y'], fs=1., class_labels=watch['y_labels'])


def test_to_categorical_series():
    """Check ``util.segmented_prediction_to_series`` with categorical targets.

    For each case the resulting series must have length
    ``(len(p) - 1) * step + width`` and contain only values that occur in
    the prediction ``p``. The 2-column prediction must yield 2 columns.
    """
    # (prediction, step, width, expected number of columns or None for 1-D)
    cases = (
        (np.arange(10), 2, 3, None),                              # overlapping segments
        (np.arange(10), 3, 2, None),                              # step larger than width
        (np.column_stack([np.arange(10), np.arange(10)]), 2, 3, 2),
    )

    for p, step, width, n_columns in cases:
        s = util.segmented_prediction_to_series(p, step, width, categorical_target=True)
        assert len(s) == (len(p) - 1) * step + width
        if n_columns is not None:
            assert s.shape[1] == n_columns
        assert np.all(np.isin(s, p))

def test_to_real_series():
    """Check ``util.segmented_prediction_to_series`` with real-valued targets.

    For each case the resulting series must have length
    ``(len(p) - 1) * step + width`` and every value must lie between the
    smallest and largest value of the prediction ``p``. The 2-column
    prediction must yield 2 columns.
    """
    # (prediction, step, width, expected number of columns or None for 1-D)
    cases = (
        (np.arange(20), 2, 5, None),                            # highly overlapping case
        (np.arange(10), 3, 2, None),                            # step larger than width
        (np.column_stack([np.arange(5), np.arange(5)]), 2, 5, 2),
    )

    for p, step, width, n_columns in cases:
        s = util.segmented_prediction_to_series(p, step, width, categorical_target=False)
        assert len(s) == (len(p) - 1) * step + width
        if n_columns is not None:
            assert s.shape[1] == n_columns
        assert np.all(s <= np.max(p))
        assert np.all(s >= np.min(p))
    
    
# Call each util test directly so that running this cell executes it;
# any failing assertion stops the cell with an AssertionError.
test_util()
test_to_categorical_series()
test_to_real_series()

### Summary

As we can see, the article is pretty much fully reproducible, except for some of the examples. One of them depends on other Python packages. In addition, one of the tests does not pass.
