-
Notifications
You must be signed in to change notification settings - Fork 1
Implementing basic RQA functionality #15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
35b226f
bdd7a9a
d636806
feb3fd0
fd33591
64f7648
5703ef1
c5a843d
7cdc74b
9662c2f
1abb720
97896f3
ba8011e
fae72a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,6 +36,7 @@ classifiers = [ | |
| ] | ||
| dependencies = [ | ||
| "pandas", | ||
| "scipy", | ||
| "StrEnum; python_version < '3.11'", | ||
| ] | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -147,7 +147,11 @@ def _parse_metadata_row(self, key: str, values: list[t.Any]) -> None: | |
| The values of the metadata row. | ||
| """ | ||
| k, v = parse_metadata_row(key, values) | ||
| self._metadata[k] = v | ||
| if k not in self._metadata: | ||
| self._metadata[k] = v | ||
| else: | ||
| # Metadata entry for an existing key: append values to the list | ||
| self._metadata[k] += v | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nice catch! |
||
|
|
||
| def _extract_metadata_from_file(self, path: Path) -> None: | ||
| """Extract the metadata from a file and return it as a dict. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,12 +2,13 @@ | |
|
|
||
| import numpy as np | ||
| import pandas as pd | ||
| import scipy | ||
|
|
||
| from mopipe.core.common.util import int_or_str_slice | ||
| from mopipe.core.segments.inputs import AnySeriesInput, MultivariateSeriesInput | ||
| from mopipe.core.segments.outputs import SingleNumericValueOutput, UnivariateSeriesOutput | ||
| from mopipe.core.segments.inputs import AnySeriesInput, MultivariateSeriesInput, UnivariateSeriesInput | ||
| from mopipe.core.segments.outputs import MultivariateSeriesOutput, SingleNumericValueOutput, UnivariateSeriesOutput | ||
| from mopipe.core.segments.seg import Segment | ||
| from mopipe.core.segments.segmenttypes import SummaryType | ||
| from mopipe.core.segments.segmenttypes import AnalysisType, SummaryType, TransformType | ||
|
|
||
|
|
||
| class Mean(SummaryType, AnySeriesInput, SingleNumericValueOutput, Segment): | ||
|
|
@@ -35,3 +36,141 @@ def process( | |
| return x.select_dtypes(include="number").mean() | ||
| msg = f"Invalid col type {type(col)} provided, Must be None, int, str, or a slice." | ||
| raise ValueError(msg) | ||
|
|
||
|
|
||
class CalcShift(TransformType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment):
    """Add a lagged-difference column for each selected column of a DataFrame."""

    def process(
        self, x: pd.DataFrame, cols: t.Union[list[str], None] = None, shift: int = 1, **kwargs
    ) -> pd.DataFrame:
        """Append ``<col>_shift`` columns holding ``col[i] - col[i - shift]``.

        The first ``shift`` entries of every new column, for which no earlier
        sample exists, are filled with zeros.

        Parameters
        ----------
        x : pd.DataFrame
            The input data. It is modified in place: the new columns are
            added to this very object.
        cols : list[str] | None
            The columns to process. ``None`` selects every column of ``x``.
        shift : int
            The lag, in rows, between the two samples being subtracted.
            Must not be negative.

        Returns
        -------
        pd.DataFrame
            ``x`` itself, with one additional ``<col>_shift`` column per
            processed column.

        Raises
        ------
        ValueError
            If ``shift`` is negative.
        """
        if shift < 0:
            msg = f"Invalid shift {shift} provided, Must be a non-negative integer."
            raise ValueError(msg)
        if cols is None:
            cols = x.columns
        # Snapshot the labels so that adding columns cannot affect the iteration.
        for col_name in list(cols):
            col_data = x[col_name].values
            n_rows = len(col_data)
            # Number of rows that have a partner `shift` rows earlier. Slicing
            # with an explicit end (instead of `[:-shift]`) keeps shift == 0
            # working, and clamping at zero covers frames shorter than `shift`.
            n_diffs = max(n_rows - shift, 0)
            diffs = col_data[shift:] - col_data[:n_diffs]
            padding = np.zeros(n_rows - n_diffs)
            x[f"{col_name}_shift"] = np.concatenate((padding, diffs))
        return x
|
|
||
|
|
||
class SimpleGapFilling(TransformType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment):
    """Fill gaps in a DataFrame by linear interpolation."""

    def process(self, x: pd.DataFrame, **kwargs) -> pd.DataFrame:
        """Interpolate ``x`` with ``DataFrame.interpolate(method="linear")``.

        Parameters
        ----------
        x : pd.DataFrame
            The data to interpolate.

        Returns
        -------
        pd.DataFrame
            The result of the linear interpolation.
        """
        filled = x.interpolate(method="linear")
        return filled
|
|
||
|
|
||
def _delay_embed(series: np.ndarray, dim: int, tau: int) -> np.ndarray:
    """Return the time-delay embedding of *series* as a ``(dim, n_vectors)`` array."""
    # Clamp at zero so a series too short for (dim, tau) yields an empty
    # embedding rather than ragged, negatively-indexed slices.
    n_vectors = max(series.shape[0] - (dim - 1) * tau, 0)
    return np.array([series[i * tau : i * tau + n_vectors] for i in range(dim)])


def _line_length_histogram(lines: t.Iterable[np.ndarray], size: int) -> np.ndarray:
    """Count runs of consecutive True values in *lines*, indexed by run length."""
    histogram = np.zeros(size)
    for line in lines:
        run = 0
        for recurrent in line:
            if recurrent:
                run += 1
            else:
                histogram[run] += 1
                run = 0
        # Close the run that reaches the end of the line.
        histogram[run] += 1
    return histogram


def _shannon_entropy(counts: np.ndarray) -> float:
    """Return the Shannon entropy (natural log) of the non-zero *counts*."""
    occupied = counts[counts > 0]
    if occupied.size == 0:
        return 0.0
    probs = occupied / occupied.sum()
    return -(probs * np.log(probs)).sum()


def calc_rqa(
    x: np.ndarray, y: np.ndarray, dim: int = 1, tau: int = 1, threshold: float = 0.1, lmin: int = 2
) -> tuple[float, float, float, float, float, float, float]:
    """Compute recurrence quantification statistics for two series.

    Both series are time-delay embedded, a recurrence matrix is built by
    thresholding the pairwise Euclidean distances between the embedded
    vectors, and line-based statistics are derived from that matrix. Passing
    the same array as ``x`` and ``y`` yields the auto-recurrence statistics.

    Parameters
    ----------
    x : np.ndarray
        The first series (rows of the recurrence matrix).
    y : np.ndarray
        The second series (columns of the recurrence matrix).
    dim : int
        The embedding dimension.
    tau : int
        The embedding delay, in samples.
    threshold : float
        Two embedded vectors are recurrent when their distance is strictly
        below this value.
    lmin : int
        The minimum length for a diagonal or vertical line to be counted.

    Returns
    -------
    tuple
        ``(recurrence_rate, determinism, laminarity, avg_diag_length,
        avg_vert_length, d_entropy, v_entropy)``. Ratios whose denominator
        is zero are reported as 0.
    """
    # Imported here so the function does not depend on `import scipy` having
    # made the `scipy.spatial` submodule available.
    from scipy.spatial import distance_matrix

    embed_x = _delay_embed(x, dim, tau)
    embed_y = _delay_embed(y, dim, tau)

    recurrence_matrix = distance_matrix(embed_x.T, embed_y.T) < threshold
    n_rows, n_cols = recurrence_matrix.shape

    # No line can be longer than the number of rows: vertical lines run along
    # a column, and diagonals are bounded by min(n_rows, n_cols).
    hist_size = n_rows + 1
    d_line_dist = _line_length_histogram(
        (np.diagonal(recurrence_matrix, offset) for offset in range(-n_rows + 1, n_cols)),
        hist_size,
    )
    v_line_dist = _line_length_histogram(
        (recurrence_matrix[:, col] for col in range(n_cols)),
        hist_size,
    )

    lengths = np.arange(hist_size)[lmin:]
    d_counts = d_line_dist[lmin:]
    v_counts = v_line_dist[lmin:]
    # Recurrent points lying on diagonal / vertical lines of length >= lmin.
    d_points = (d_counts * lengths).sum()
    v_points = (v_counts * lengths).sum()

    rr_sum = recurrence_matrix.sum()
    n_cells = recurrence_matrix.size
    rr = rr_sum / n_cells if n_cells > 0 else 0
    det = d_points / rr_sum if rr_sum > 0 else 0
    lam = v_points / rr_sum if rr_sum > 0 else 0

    d_sum = d_counts.sum()
    avg_diag_length = d_points / d_sum if d_sum > 0 else 0
    # The vertical average must be normalised by the vertical line count.
    v_sum = v_counts.sum()
    avg_vert_length = v_points / v_sum if v_sum > 0 else 0

    d_entropy = _shannon_entropy(d_counts)
    v_entropy = _shannon_entropy(v_counts)

    return rr, det, lam, avg_diag_length, avg_vert_length, d_entropy, v_entropy
|
|
||
|
|
||
class RQAStats(AnalysisType, UnivariateSeriesInput, MultivariateSeriesOutput, Segment):
    """Compute recurrence quantification statistics of a series against itself."""

    def process(
        self, x: pd.Series, dim: int = 1, tau: int = 1, threshold: float = 0.1, lmin: int = 2, **kwargs
    ) -> pd.DataFrame:
        """Run ``calc_rqa`` on ``x`` paired with itself.

        Parameters
        ----------
        x : pd.Series
            The series to analyse.
        dim : int
            Forwarded to ``calc_rqa`` as the embedding dimension.
        tau : int
            Forwarded to ``calc_rqa`` as the embedding delay.
        threshold : float
            Forwarded to ``calc_rqa`` as the recurrence threshold.
        lmin : int
            Forwarded to ``calc_rqa`` as the minimum line length.

        Returns
        -------
        pd.DataFrame
            A frame with one column per statistic. It holds a single row, or
            no rows at all when ``x`` is empty.
        """
        stat_names = [
            "recurrence_rate",
            "determinism",
            "laminarity",
            "avg_diag_length",
            "avg_vert_length",
            "d_entropy",
            "v_entropy",
        ]
        result = pd.DataFrame(columns=stat_names)
        if not x.empty:
            samples = x.values
            result.loc[len(result)] = calc_rqa(samples, samples, dim, tau, threshold, lmin)
        return result
|
|
||
|
|
||
class CrossRQAStats(AnalysisType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment):
    """Compute cross-recurrence quantification statistics between two columns."""

    @staticmethod
    def _column_values(x: pd.DataFrame, col: t.Union[str, int]) -> np.ndarray:
        """Return the values of the column selected by position (int) or label (str)."""
        if isinstance(col, int):
            return x.iloc[:, col].values
        if isinstance(col, str):
            return x.loc[:, col].values
        # Any other selector type used to leave the column unbound and fail
        # later with an UnboundLocalError; fail early and explicitly instead.
        msg = f"Invalid col type {type(col)} provided, Must be an int or str."
        raise ValueError(msg)

    def process(
        self, x: pd.DataFrame, col_a: t.Union[str, int] = 0, col_b: t.Union[str, int] = 0,
        dim: int = 1, tau: int = 1, threshold: float = 0.1, lmin: int = 2, **kwargs
    ) -> pd.DataFrame:
        """Run ``calc_rqa`` on two columns of ``x``.

        Parameters
        ----------
        x : pd.DataFrame
            The input data.
        col_a : str | int
            The first column, as a positional index (int) or a label (str).
        col_b : str | int
            The second column, as a positional index (int) or a label (str).
        dim : int
            Forwarded to ``calc_rqa`` as the embedding dimension.
        tau : int
            Forwarded to ``calc_rqa`` as the embedding delay.
        threshold : float
            Forwarded to ``calc_rqa`` as the recurrence threshold.
        lmin : int
            Forwarded to ``calc_rqa`` as the minimum line length.

        Returns
        -------
        pd.DataFrame
            A frame with one column per statistic. It holds a single row, or
            no rows at all when ``x`` is empty.

        Raises
        ------
        ValueError
            If ``col_a`` or ``col_b`` is neither an int nor a str.
        """
        out = pd.DataFrame(columns=["recurrence_rate", "determinism", "laminarity",
                                    "avg_diag_length", "avg_vert_length", "d_entropy", "v_entropy"])
        if x.empty:
            return out

        xa = self._column_values(x, col_a)
        xb = self._column_values(x, col_b)

        out.loc[len(out)] = calc_rqa(xa, xb, dim, tau, threshold, lmin)
        return out
|
|
||
|
|
||
class WindowedCrossRQAStats(AnalysisType, MultivariateSeriesInput, MultivariateSeriesOutput, Segment):
    """Compute cross-recurrence statistics between two columns over sliding windows."""

    @staticmethod
    def _column_values(x: pd.DataFrame, col: t.Union[str, int]) -> np.ndarray:
        """Return the values of the column selected by position (int) or label (str)."""
        if isinstance(col, int):
            return x.iloc[:, col].values
        if isinstance(col, str):
            return x.loc[:, col].values
        # Any other selector type used to leave the column unbound and fail
        # later with an UnboundLocalError; fail early and explicitly instead.
        msg = f"Invalid col type {type(col)} provided, Must be an int or str."
        raise ValueError(msg)

    def process(
        self, x: pd.DataFrame, col_a: t.Union[str, int] = 0, col_b: t.Union[str, int] = 0,
        dim: int = 1, tau: int = 1, threshold: float = 0.1, lmin: int = 2, window: int = 100,
        step: int = 10, **kwargs
    ) -> pd.DataFrame:
        """Run ``calc_rqa`` on two columns of ``x``, one window at a time.

        Windows start at rows ``0, step, 2 * step, ...`` and only windows that
        fit entirely within the data are evaluated.

        Parameters
        ----------
        x : pd.DataFrame
            The input data.
        col_a : str | int
            The first column, as a positional index (int) or a label (str).
        col_b : str | int
            The second column, as a positional index (int) or a label (str).
        dim : int
            Forwarded to ``calc_rqa`` as the embedding dimension.
        tau : int
            Forwarded to ``calc_rqa`` as the embedding delay.
        threshold : float
            Forwarded to ``calc_rqa`` as the recurrence threshold.
        lmin : int
            Forwarded to ``calc_rqa`` as the minimum line length.
        window : int
            The number of rows in each window.
        step : int
            The number of rows between the starts of consecutive windows.

        Returns
        -------
        pd.DataFrame
            A frame with one column per statistic and one row per window, in
            window order. It has no rows when ``x`` is empty or shorter than
            ``window``.

        Raises
        ------
        ValueError
            If ``col_a`` or ``col_b`` is neither an int nor a str.
        """
        out = pd.DataFrame(columns=["recurrence_rate", "determinism", "laminarity",
                                    "avg_diag_length", "avg_vert_length", "d_entropy", "v_entropy"])
        if x.empty:
            return out

        xa = self._column_values(x, col_a)
        xb = self._column_values(x, col_b)

        for start in range(0, xa.shape[0] - window + 1, step):
            stop = start + window
            out.loc[len(out)] = calc_rqa(xa[start:stop], xb[start:stop], dim, tau, threshold, lmin)
        return out
Uh oh!
There was an error while loading. Please reload this page.