Example based on https://github.com/CybJaz/dyrect

In [5]:
from plyfile import PlyData
import numpy as np

def load_ply_model(filename):
    plydata = PlyData.read(filename)
    coords = np.array([list(x) for x in plydata.elements[0].data])
    return coords

lms = load_ply_model('lorenz_model_20.ply')
lms.shape

(348, 3)

In [9]:
def lorentz_eq(_, args):
    l_sigma = 10.
    l_beta = 8. / 3
    l_rho = 28.
    return [l_sigma * (args[1] - args[0]),
            args[0] * (l_rho - args[2]) - args[1],
            args[0] * args[1] - l_beta * args[2]]


def lorenz_attractor(npoints, step=0.02, adaptive_step=False, starting_point=None, skip=100):
    if starting_point is None:
        starting_point = [1., 1., 1.]
    points = np.empty((npoints, 3))
    # starting_point = [1., 1., 1.]
    integrator = RK45(lorentz_eq, 0, starting_point, 10000,
                      first_step=step, max_step=(4 * step if adaptive_step else step))

    # get closer to the attractor first
    for _ in range(skip):
        integrator.step()

    for i in range(npoints):
        points[i] = integrator.y
        if integrator.status == 'running':
            integrator.step()
        else:
            print('reloading integrator at ' + str(i))
            integrator = RK45(lorentz_eq, 0, points[i], 10000,
                              first_step=step, max_step=(4 * step if adaptive_step else step))
    return points

In [20]:
from dataclasses import dataclass

def symbolization(X, lms, eps=0, simplified=False):
    """ symbolization of a time series, a sequence of points is transformed into a list of integers (elements of
            the lms cover)
        X - a time series of a shape [n, dim]
        lms - landmarks - an array of shape [l, dim]
        eps - epsilon (from epsilon net)
        TODO: simplified - reduce repeating consecutive symbols
    """
    distances = cdist(X, lms, 'euclidean')  # distances to landmarks
    symbols = np.array([np.argmin(point_to_lms) for point_to_lms in distances])

    # check if points in X are covered
    if eps > 0:
        symbols = np.array([(l if distances[i, l] < eps else -1) for i, l in enumerate(symbols)])
    return symbols

def symb2string(symbols, codesize=-1):
    """
    Transforms a list (symbols) of positive integers into a string
    :param symbols: a list of integers
    :param codesize:
    :return:
    """
    m = np.max(symbols)
    if codesize < 1:
        codesize = len(str(m))
    nums = [''.join([str(0) for _ in range(codesize - len(str(x)))]) + str(x) for x in symbols]
    return '-'.join(nums)

@dataclass
class Future:
    sequence: tuple
    counter: int
    occurences: list

@dataclass
class Prediction:
    past: tuple
    futures: list

class Seer:
    """
    A class for making predictions from a time series based on an epsilon-net
    """

    def __init__(self, history, cover, eps=-1, simplified=False):
        """
        :param history: a time series used to create the database for predictions
        :param cover: a set of landmarks defining the set of symbols
        :param eps: if eps is -1 we are looking just for the closest landmark otherwise a distance less then epsilon is
            required, if distance is larger than eps we put extra symbol '-1'
        :param TODO: simplified - reduce repeating consecutive symbols
        """
        # cover variable could be more abstract, for now it's just a collection of landmarks
        self._history = history
        self._cover = cover
        self._eps = eps
        codes = symbolization(history, cover, eps)
        self._codesize = len(str(max(codes)))
        self._history_book = symb2string(codes)
        self._dimension = len(history[0])
        assert len(cover[0]) == self._dimension

        # it's state-machine like variables
        self._recent_reg = None
        self._recent_query = None
        self._recent_story = None
        self._recent_futures = None

        self._recent_prediction = None

        # TODO:
        # class Query:

    def predict(self, past, f, p=-1):
        dim = len(past[0])
        assert len(past[0]) == self._dimension

        # (p,f) - p-past steps, f-future steps predictions
        # TODO: p is not used
        if p == -1:
            p = len(past)
        else:
            p = min(len(past), p)

        self._recent_query = past[-p:]
        self._recent_story = symbolization(self._recent_query, self._cover, self._eps)
        if min(self._recent_story) < 0:
            # the negative symbol means an unknown symbol
            print("this past has never happened before")
            return None
        self._recent_reg = symb2string(self._recent_story, codesize=self._codesize) + '.{' + str(f * (self._codesize+1)) + '}'
        futures = [(event.group(0), event.span(0)) for event in re.finditer(self._recent_reg, self._history_book)]
        futures = [(tuple([int(k) for k in f[:].split('-')]), idxs) for (f, idxs) in futures]
        # self._recent_unique_futures = Counter([future[0][-f * (self._codesize+1):] for future in futures])
        futures_dict = dict()
        for f, idxs in futures:
            (b,e) = (int(idxs[0] / (self._codesize+1.)), int((idxs[1] + 1) / (self._codesize+1.)))
            if f in futures_dict:
                futures_dict[f].counter += 1
                futures_dict[f].occurences.append((b,e))
            else:
                futures_dict[f] = Future(f, 1, [(b,e)])

        self._recent_futures = [f for f in futures_dict.values()]
        self._recent_futures.sort(key=(lambda f: f.counter), reverse=True)

        self._recent_prediction = Prediction(self._recent_story, self._recent_futures)
        return self._recent_prediction
    
    def paths(self):
        paths = []
        for cpath in prediction.futures:
            paths = paths + [[self._cover[k] for k in cpath.sequence[(len(prediction.past)):]]]
        return np.array(paths)

In [8]:
from src import Complex
rips_lms = Complex(lms[:150], 5)
rips_lms.draw_complex()
rips_lms.count_simplexes()

(150, 394, 348)

In [33]:
from scipy.integrate import RK45
from scipy.spatial.distance import cdist
import re

train_points = lorenz_attractor(200000, step=0.01, adaptive_step=False, starting_point=[1.,1.,1.], skip=2000)
test_points = lorenz_attractor(5000, step=0.01, adaptive_step=False, starting_point=[-1.,-1.,1.], skip=2000)
eps = 4.
symbs = symbolization(test_points, lms, eps)
seer = Seer(train_points, lms, eps)

t0 = 1580
tpredict = 80
t1 = t0 + 5

query = test_points[t0:t1]
prediction = seer.predict(query, tpredict)

In [34]:
from src import HierarchicalClustering
hc = HierarchicalClustering(rips_lms)
print(hc.fit_predict(seer.paths()))
hc.draw_predict(seer.paths(), on_complex=True)

[2 2 3 2 1 3 2 3 3 3 3 2 2 3 2 3 2 2 2 2 2 4 2 2 3 4 4 2 2 2 3 2 3 2 2 1 3
 2 2 4 2 2 2 3 3 2 3 3 2 3 2 3 2 3 2 3 1 2 3 5 2 2 3 2 2]


In [35]:
hc.draw_predict(seer.paths(), on_complex=False)