Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ authors = [
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"numpy"
"numpy",
"dask"
]
classifiers = [
"Programming Language :: Python :: 3",
Expand All @@ -35,3 +36,8 @@ include-package-data = true

[tool.setuptools_scm]
version_scheme = "post-release"

[dependency-groups]
dev = [
"pytest>=8.3.5",
]
61 changes: 61 additions & 0 deletions rqadeforestation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import ctypes as ct
import numpy as np
import dask.array as da

class MallocVector(ct.Structure):
_fields_ = [("pointer", ct.c_void_p),
Expand Down Expand Up @@ -43,6 +44,66 @@ def rqatrend(y: np.ndarray, threshold: float, border: int = 10, theiler: int = 1
result_single = lib.rqatrend(py, threshold, border, theiler)
return result_single


def rqatrend_matrix(matrix: np.ndarray, threshold: float, border: int = 10, theiler: int = 1) -> np.ndarray:
"""
Calculate the RQA trend for a matrix of time series.

:param matrix: Input time series data as a numpy array of shape (n_timeseries, series_length).
:param threshold: Threshold value for the RQA calculation.
:param border: Border size for the RQA calculation.
:param theiler: Theiler window size for the RQA calculation.
:return: Numpy array of all RQA trend values of size n_timeseries.
"""

n = matrix.shape[0]
result_several = np.ones(n)
p_result_several = mvptr(result_several)
p_matrix = mmptr(matrix)

# arguments: result_vector, data, threshhold, border, theiler
lib.rqatrend_inplace.argtypes = (ct.POINTER(MallocVector), ct.POINTER(MallocMatrix), ct.c_double, ct.c_int64, ct.c_int64)
return_value = lib.rqatrend_inplace(p_result_several, p_matrix, threshold, border, theiler)
return result_several


def rqatrend_dask(x: da.Array, timeseries_axis: int, threshold: float, border: int = 10, theiler: int = 1, out_dtype: type = np.float64) -> da.Array:
"""
Apply rqatrend to a given dask array.

Consider comparing this function's performance with a simple `dask.array.apply_along_axis`
```py
import dask.array as da
da.apply_along_axis(lambda ts: rqatrend(ts.ravel(), threshold, border, theiler), time_axis, darr)
```

:param x: dask Array on which rqatrend should be computed.
:param timeseries_axis: dask Array axis on which the rqatrend function should be applied
:param threshold: Threshold value for the RQA calculation.
:param border: Border size for the RQA calculation.
:param theiler: Theiler window size for the RQA calculation.
:param out_dtype: dtype of the output dask array, in case a smaller float representation is wanted, or similar.
:return: Dask array of all RQA trend values without the timeseries_axis dimension (it got aggregated by rqatrend).
"""
# Rechunk so full timeseries axis is in one block
# do this first so we can use the optimized chunks also for moveaxis
x_rechunked = x.rechunk({timeseries_axis: -1})

# Move timeseries axis to the end
x_moved = da.moveaxis(x_rechunked, timeseries_axis, -1)

def _block_wrapper(block):
# block shape: (..., series_length)
mat = block.reshape(-1, block.shape[-1]) # (n_timeseries, series_length)
result = rqatrend_matrix(mat, threshold, border, theiler) # (n_timeseries,)
return result.reshape(block.shape[:-1]) # reduce last axis

return x_moved.map_blocks(
_block_wrapper,
dtype=out_dtype,
drop_axis=-1 # <---- tell Dask we removed the last axis
)

def main() -> None:
pass

Expand Down
66 changes: 66 additions & 0 deletions tests/test_rqadeforestation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import numpy as np
from rqadeforestation import rqatrend, rqatrend_matrix, rqatrend_dask
import time
import dask.array as da

def test_rqatrend_versus_rqatrendmatrix() -> None:
x = np.arange(1, 30, step=0.01)
y = np.sin(x) + 0.1 * x
assert np.all(rqatrend(y, 0.5, 10, 1) == rqatrend_matrix(np.tile(y, (5, 1)), 0.5, 10, 1))


def _vector_method(ts: np.ndarray, threshold: float) -> float:
return rqatrend(ts.ravel(), threshold, 10, 1) # scalar


def vector_method(darr, time_axis, threshold):
return da.apply_along_axis(_vector_method, time_axis, darr, threshold=threshold)


def matrix_method(darr, time_axis, threshold):
return rqatrend_dask(darr, timeseries_axis=time_axis, threshold=threshold)


def test_dask_rqatrend_versus_rqatrendmatrix() -> None:
# ----- Synthetic multidimensional data -----
# Shape: (batch, channels, time)
batch = 5000
channels = 4

# Base timeseries
x = np.arange(1, 30, step=0.01)
y = np.sin(x) + 0.1 * x # shape (2900,)

# Create data, broadcast on first dimension
time_axis = 0
big_data = np.tile(y[:, None, None], (1, batch, channels))

# Create data, broadcast on last dimension
# time_axis = -1
# big_data = np.tile(y[None, None, :], (batch, channels, 1))

# Wrap in Dask
# darr = da.from_array(big_data, chunks=(200, channels, -1))
darr = da.from_array(big_data, chunks=400)

print("Original shape:", darr.shape)
print("Original chunks:", darr.chunks)

y1 = vector_method(darr, time_axis, threshold=0.5)
y2 = matrix_method(darr, time_axis, threshold=0.5)

print("Computing vector method...")
t0 = time.time()
res1 = y1.compute()
t1 = time.time()
print("Slow method time: %.2f s" % (t1 - t0,))

print("Computing matrix method...")
t0 = time.time()
res2 = y2.compute()
t1 = time.time()
print("Fast method time: %.2f s" % (t1 - t0,))

# Sanity check
print("Output shapes:", res1.shape, res2.shape)
assert np.allclose(res1, res2)