---
title: ID properties
subtitle: Full feature extraction
---

In [None]:
#| default_exp core/propeties
#| export
#| code-summary: "Import all the packages needed for the project"
import polars as pl
import xarray as xr
from fastcore.all import patch

try:
    import modin.pandas as pd
    import modin.pandas as mpd
    from modin.config import ProgressBar
    ProgressBar.enable()
except ImportError:
    import pandas as pd
import pandas
    
import numpy as np

from datetime import timedelta

from loguru import logger

import pdpipe as pdp
from pdpipe.util import out_of_place_col_insert

In [None]:
# | export
def get_candidate_data(
    candidate: dict, data: xr.DataArray, neighbor: int = 0
) -> xr.DataArray:
    """Slice `data` along ``time`` to the candidate's interval, optionally widened.

    The window runs from ``candidate["tstart"]`` to ``candidate["tstop"]`` and
    is extended on both sides by `neighbor` times the candidate's own length
    (``tstop - tstart``). With the default ``neighbor=0`` the window is exactly
    the candidate interval.
    """
    start, stop = candidate["tstart"], candidate["tstop"]
    # Padding added before the start and after the stop of the interval.
    padding = neighbor * (stop - start)
    window = slice(start - padding, stop + padding)
    return data.sel(time=window)


def get_candidates(candidates: pd.DataFrame, candidate_type=None, num: int = 4):
    """Return candidates of one type, randomly down-sampled to at most `num` rows.

    When `candidate_type` is given, only rows whose ``"type"`` column equals it
    are kept; otherwise every row is eligible. If more than `num` rows remain,
    `num` of them are drawn with ``DataFrame.sample``; otherwise the (filtered)
    frame is returned as is.
    """
    pool = (
        candidates
        if candidate_type is None
        else candidates[candidates["type"] == candidate_type]
    )

    # Nothing to sample when the pool already fits within the requested size.
    if num >= len(pool):
        return pool

    logger.info(
        f"Sampling {num} {candidate_type} candidates out of {len(pool)}"
    )
    return pool.sample(num)

## Duration

In [None]:
#| export
from discontinuitypy.propeties.duration import calc_duration

In [None]:
# | export
def calc_candidate_duration(candidate, data, **kwargs):
    """Compute the duration parameters of a single candidate.

    The candidate's time window is cut out of `data` with
    `get_candidate_data`, passed to `calc_duration` together with `kwargs`,
    and the result is wrapped in a plain ``pandas.Series``.

    Any exception is logged at debug level and then re-raised unchanged.
    """
    try:
        candidate_data = get_candidate_data(candidate, data)
        result = calc_duration(candidate_data, **kwargs)
        return pandas.Series(result)
    except Exception as e:
        # Look the timestamp up defensively: a candidate without a "time"
        # entry must not turn the log call into a KeyError that hides `e`.
        try:
            event_time = candidate["time"]
        except LookupError:
            event_time = "<unknown time>"
        logger.debug(
            f"Error for candidate {candidate} at {event_time}: {str(e)}"
        )
        # Bare `raise` keeps the original traceback intact.
        raise

## Minimum variance analysis (MVA) features

In [None]:
#| export
from discontinuitypy.propeties.mva import calc_candidate_mva_features

## Field rotation angles
The PDF of the field rotation angles across the solar-wind IDs is well fitted by the exponential function $\exp(-\theta/\theta_0)$, where $\theta_0$ is a characteristic angle...

In [None]:
#| export
def get_data_at_times(data: xr.DataArray, times) -> np.ndarray:
    """
    Return the values of `data` at the samples nearest to `times`.

    The lookup is done along the ``time`` coordinate with xarray's
    nearest-neighbour selection, and the result is converted to a NumPy array.
    """
    nearest = data.sel(time=times, method="nearest")
    return nearest.to_numpy()

In [None]:
#| export
def calc_rotation_angle(v1, v2):
    """
    Computes the rotation angle between two vectors.

    Parameters:
    - v1: The first vector(s); any array-like whose last axis holds the components.
    - v2: The second vector(s); must have the same shape as `v1`.

    Returns:
    - The angle(s) in degrees, in the range [0, 180], one per vector pair.

    Raises:
    - ValueError: if `v1` and `v2` do not have the same shape.

    Notes:
    - A zero-length vector cannot be normalized, so its angle comes out as NaN.
    """
    # Accept lists/tuples as well as arrays; arrays are passed through as-is.
    v1 = np.asarray(v1)
    v2 = np.asarray(v2)

    if v1.shape != v2.shape:
        raise ValueError("Vectors must have the same shape.")

    # Normalize the vectors along the component axis
    v1_u = v1 / np.linalg.norm(v1, axis=-1, keepdims=True)
    v2_u = v2 / np.linalg.norm(v2, axis=-1, keepdims=True)

    # Dot product of the unit vectors is the cosine of the angle between them
    cosine_angle = np.sum(v1_u * v2_u, axis=-1)

    # Clip so rounding error cannot push the value outside arccos' domain
    cosine_angle = np.clip(cosine_angle, -1, 1)

    # Convert the angles from radians to degrees
    return np.degrees(np.arccos(cosine_angle))


In [None]:
#| export
def calc_events_rotation_angle(events, data: xr.DataArray):
    """
    Computes the rotation angle of each event between its start and end.

    For every event, `data` is sampled at the times in the ``t.d_start`` and
    ``t.d_end`` columns, and the angle between the two sampled vectors is
    returned (as computed by `calc_rotation_angle`).
    """
    field_at_start, field_at_end = (
        get_data_at_times(data, events[column].to_numpy())
        for column in ("t.d_start", "t.d_end")
    )
    return calc_rotation_angle(field_at_start, field_at_end)

## Normal direction

In [None]:
#| export
def calc_normal_direction(v1, v2, normalize=True) -> np.ndarray:
    """
    Computes the normal direction of two vectors.

    The normal is the cross product ``v1 x v2``, taken along the last axis.

    Parameters
    ----------
    v1 : array_like
        The first vector(s).
    v2 : array_like
        The second vector(s).
    normalize : bool, default True
        If True, scale each normal to unit length. If False, return the raw
        cross product.

    Returns
    -------
    np.ndarray
        The normal vector(s). With ``normalize=True``, parallel input vectors
        have a zero cross product and therefore yield NaN components.
    """
    c = np.cross(v1, v2)
    if not normalize:
        return c
    return c / np.linalg.norm(c, axis=-1, keepdims=True)


In [None]:
# | export
def calc_events_normal_direction(events, data: xr.DataArray):
    """
    Computes the normal direction of each event from its start and end vectors.

    `data` is sampled at the times in the ``t.d_start`` and ``t.d_end``
    columns; the normal is obtained from `calc_normal_direction` applied to the
    two sampled vectors.
    """
    start_times = events['t.d_start'].to_numpy()
    end_times = events['t.d_end'].to_numpy()

    normals = calc_normal_direction(
        get_data_at_times(data, start_times),
        get_data_at_times(data, end_times),
    )
    # Returned as nested lists rather than a 2-D array: per the original
    # author's note, only 1-D arrays are supported where this result is used.
    return normals.tolist()


In [None]:
# | export
def calc_events_vec_change(events, data: xr.DataArray):
    """
    Utility returning, for each event, the change of the vector in `data`
    between the event's ``t.d_start`` and ``t.d_end`` times (end minus start),
    as nested lists.
    """
    sampled = {
        column: get_data_at_times(data, events[column].to_numpy())
        for column in ('t.d_start', 't.d_end')
    }
    delta = sampled['t.d_end'] - sampled['t.d_start']
    return delta.tolist()
    

## Pipelines

Patch `pdp.ApplyToRows` so that it also works with `modin` and `xorbits` DataFrames.

In [None]:
#| export
@patch
def _transform(self: pdp.ApplyToRows, X, verbose):
    """Apply the stage's row function to `X` and attach the result as new column(s).

    ``self._func`` is applied to every row (``axis=1``). The outcome decides
    how it is merged back into `X`:

    - a Series becomes one column named ``self._colname``, inserted right
      after ``self._follow_column`` when that is set, otherwise appended;
    - a DataFrame contributes all of its columns, in sorted name order,
      inserted after ``self._follow_column`` when set, otherwise assigned
      onto `X` by name.

    Both the plain-pandas types and the types of the active `pd` backend are
    accepted.

    Raises
    ------
    TypeError
        If the row-wise apply yields neither a Series nor a DataFrame.
    """
    new_cols = X.apply(self._func, axis=1)
    if isinstance(new_cols, (pd.Series, pandas.Series)):
        loc = len(X.columns)
        if self._follow_column:
            loc = X.columns.get_loc(self._follow_column) + 1
        return out_of_place_col_insert(
            X=X, series=new_cols, loc=loc, column_name=self._colname
        )
    # Use `pd` (modin.pandas when available, plain pandas otherwise) instead of
    # the optional `mpd` alias, which is undefined when modin is not installed.
    if isinstance(new_cols, (pd.DataFrame, pandas.DataFrame)):
        new_cols = new_cols[sorted(new_cols.columns)]
        if self._follow_column:
            inter_X = X
            loc = X.columns.get_loc(self._follow_column) + 1
            # Insert one column at a time, advancing the position so the
            # sorted order is preserved after the follow column.
            for colname in new_cols.columns:
                inter_X = out_of_place_col_insert(
                    X=inter_X,
                    series=new_cols[colname],
                    loc=loc,
                    column_name=colname,
                )
                loc += 1
            return inter_X
        assign_map = {
            colname: new_cols[colname] for colname in new_cols.columns
        }
        return X.assign(**assign_map)
    raise TypeError(  # pragma: no cover
        "Unexpected type generated by applying a function to a DataFrame."
        " Only Series and DataFrame are allowed."
    )

`Pipelines` Class for processing IDs

Note: `lambda` functions are used instead of `functools.partial` because `partial` freezes the arguments, which decreases performance.

In [None]:
# | export
class IDsPdPipeline:
    """Factory of `pdpipe` stages, each computing one group of ID features.

    Every static method closes over the supplied `data` (and extra keyword
    arguments) and returns a ready-to-use pipeline stage.
    """

    @staticmethod
    def calc_duration(data: xr.DataArray, **kwargs):
        """Row-wise stage running `calc_candidate_duration` on each candidate."""

        def per_row(row):
            return calc_candidate_duration(row, data, **kwargs)

        return pdp.ApplyToRows(
            per_row,
            func_desc="calculating pre-duration parameters",
        )

    @staticmethod
    def calc_mva_features(data, **kwargs):
        """Row-wise stage running `calc_candidate_mva_features` on each candidate."""

        def per_row(row):
            return calc_candidate_mva_features(row, data, **kwargs)

        return pdp.ApplyToRows(
            per_row,
            func_desc="calculating MVA features",
        )

    @staticmethod
    def calc_vec_change(data, **kwargs):
        """Frame-wise stage writing `calc_events_vec_change` into column ``dB``."""

        def per_frame(frame):
            return calc_events_vec_change(frame, data, **kwargs)

        return pdp.ColByFrameFunc(
            "dB",
            per_frame,
            func_desc="calculating compound change",
        )

    @staticmethod
    def calc_rotation_angle(data, **kwargs):
        """Frame-wise stage writing `calc_events_rotation_angle` into ``rotation_angle``."""

        def per_frame(frame):
            return calc_events_rotation_angle(frame, data, **kwargs)

        return pdp.ColByFrameFunc(
            "rotation_angle",
            per_frame,
            func_desc="calculating rotation angle",
        )

    @staticmethod
    def calc_normal_direction(data, name="normal_direction", **kwargs):
        """Frame-wise stage writing `calc_events_normal_direction` into column `name`."""

        def per_frame(frame):
            return calc_events_normal_direction(frame, data, **kwargs)

        return pdp.ColByFrameFunc(
            name,
            per_frame,
            func_desc="calculating normal direction",
        )

In [None]:
#| export
from beforerr.polars import convert_to_pd_dataframe, decompose_vector

In [None]:
# | export
from typing import Literal


def process_events(
    candidates_pl: pl.DataFrame,  # potential candidates DataFrame
    sat_fgm: xr.DataArray,  # satellite FGM data
    data_resolution: timedelta,  # time resolution of the data (currently unused here)
    modin=True,  # forwarded to `convert_to_pd_dataframe`
    method: Literal["fit", "derivative"] = "fit",  # MVA / duration method
    **kwargs,  # forwarded to the duration and MVA stages
) -> pl.DataFrame:
    """Process candidates DataFrame

    Steps, in order:

    1. convert the polars candidates to a pandas-like frame;
    2. compute duration parameters for each candidate (``"distance"`` duration
       method when ``method == "fit"``, ``"derivative"`` otherwise) and drop
       rows containing NaN;
    3. compute MVA features, the vector change ``dB``, the rotation angle and
       the normal direction ``k``;
    4. convert back to polars, split the vector columns with
       `decompose_vector`, add a ``duration`` column and drop the original
       vector columns.
    """

    candidates = pd.DataFrame(convert_to_pd_dataframe(candidates_pl, modin=modin))

    if method == "fit":
        duration_method = "distance"
    else:
        duration_method = "derivative"

    candidates = (
        IDsPdPipeline.calc_duration(sat_fgm, method=duration_method, **kwargs)
        .apply(candidates)
        .dropna()
    )  # Remove candidates with NaN values

    ids = (
        IDsPdPipeline.calc_mva_features(sat_fgm, method=method, **kwargs)
        + IDsPdPipeline.calc_vec_change(sat_fgm)
        + IDsPdPipeline.calc_rotation_angle(sat_fgm)
        + IDsPdPipeline.calc_normal_direction(sat_fgm, name="k")
    ).apply(candidates)

    # Materialise a modin frame as plain pandas before handing it to polars.
    # Checking for the method instead of `isinstance(ids, mpd.DataFrame)` keeps
    # this working when modin is not installed and `mpd` is therefore undefined.
    if hasattr(ids, "_to_pandas"):
        ids = ids._to_pandas()

    vectors2decompose = ["dB", "dB_lmn", "k", "Vl", "Vn"]

    # The list schema override is required; without it the conversion failed with
    # "ArrowInvalid: Could not convert [...] with type list: tried to convert to double".
    df = pl.DataFrame(
        ids.dropna(), schema_overrides={vec: pl.List for vec in vectors2decompose}
    )

    if method == "fit":
        duration_expr = pl.col("fit.vars.sigma") * 2
    else:
        duration_expr = (
            pl.col("t.d_end") - pl.col("t.d_start")
        ).dt.total_nanoseconds() / 1e9  # convert to seconds

    for vec in vectors2decompose:
        df = decompose_vector(df, vec)

    # Author's note on an error seen at this step:
    # "ValueError: Data type fixed_size_list[pyarrow] not supported by interchange protocol"
    return df.with_columns(duration=duration_expr).drop(vectors2decompose)

In [None]:
#| hide
from nbdev import nbdev_export
nbdev_export()

## Test

### Test parallelization


In general, `mapply` and `modin` are the fastest. `xorbits` was expected to be the fastest, but in these tests it turned out to be the slowest.

```python
#| notest
sat = 'jno'
coord = 'se'
cols = ["BX", "BY", "BZ"]
tau = timedelta(seconds=60)
data_resolution = timedelta(seconds=1)

if True:
    year = 2012
    files = f'../data/{sat}_data_{year}.parquet'
    output = f'../data/{sat}_candidates_{year}_tau_{tau.seconds}.parquet'

    data = pl.scan_parquet(files).set_sorted('time').collect()

    indices = compute_indices(data, tau)
    # filter condition
    sparse_num = tau / data_resolution // 3
    filter_condition = filter_indices(sparse_num = sparse_num)

    candidates = indices.filter(filter_condition).with_columns(pl_format_time(tau)).sort('time')
    
    data_c = compress_data_by_events(data, candidates, tau)
    sat_fgm = df2ts(data_c, cols, attrs={"units": "nT"})
```

In [None]:
#| notest
# Build the same candidate table for each backend under comparison.
# NOTE(review): `candidates` is presumably the polars frame produced in the
# setup snippet above — confirm it is in scope before running this cell.
candidates_pd = candidates.to_pandas()
# NOTE(review): `mpd` only exists when the modin import at the top succeeded.
candidates_modin = mpd.DataFrame(candidates_pd)
# candidates_x = xpd.DataFrame(candidates_pd)

In [None]:
#| code-summary: Test different libraries to parallelize the computation
#| notest
# Scratch cell: builds a row-wise duration stage and keeps, as comments, the
# variants that were timed by hand together with the author's recorded timings.
if True:
    pdp_test = pdp.ApplyToRows(
        lambda candidate: calc_candidate_duration(candidate, sat_fgm),  # fast a little bit
        # lambda candidate: calc_duration(get_candidate_data_xr(candidate, sat_fgm)),
        # lambda candidate: calc_duration(sat_fgm.sel(time=slice(candidate['tstart'], candidate['tstop']))),
        func_desc="calculating duration parameters",
    )
    
    # process_events(candidates_modin, sat_fgm, sat_state, data_resolution)
    
    # ---
    # successful cases
    # ---
    # candidates_pd.mapply(lambda candidate: calc_candidate_duration(candidate, sat_fgm), axis=1) # this works, 4.2 secs
    # candidates_pd.mapply(calc_candidate_duration, axis=1, data=sat_fgm) # this works, but a little bit slower, 6.7 secs
    
    # candidates_pd.apply(calc_candidate_duration, axis=1, data=sat_fgm) # Standard case: 24+s secs
    # candidates_pd.swifter.apply(calc_candidate_duration, axis=1, data=sat_fgm) # this works with dask, 80 secs
    # candidates_pd.swifter.set_dask_scheduler(scheduler="threads").apply(calc_candidate_duration, axis=1, data=sat_fgm) # this works with dask, 60 secs
    # candidates_modin.apply(lambda candidate: calc_candidate_duration(candidate, sat_fgm), axis=1) # this works with ray, 6 secs # NOTE: can not work with dask
    # candidates_x.apply(calc_candidate_duration, axis=1, data=sat_fgm) # 30 seconds
    # pdp_test(candidates_modin) # this works, 8 secs
    
    # ---
    # failed cases
    # ---
    # candidates_modin.apply(calc_candidate_duration, axis=1, data=sat_fgm) # AttributeError: 'DataFrame' object has no attribute 'sel'

In [None]:
import timeit
from functools import partial


In [None]:
def benchmark(task_dict, number=1):
    """Time every task in `task_dict` and report the mean seconds per run.

    `task_dict` maps a label to a ``(data, task)`` pair; ``task(data)`` is run
    `number` times with ``timeit``. The returned dict maps each label to the
    average run time, or, if timing the task raised, to the exception's
    message as a string.
    """
    results = {}
    for label in task_dict:
        payload, job = task_dict[label]
        try:
            outcome = timeit.timeit(lambda: job(payload), number=number) / number
        except Exception as err:
            # Keep going: one failing task must not abort the whole comparison.
            outcome = str(err)
        results[label] = outcome
    return results

In [None]:
#| notest

def benchmark_results(results, sat_fgm):
    """Benchmark the row-wise duration computation on each DataFrame backend.

    The incoming `results` argument is not read; a fresh result dict from
    `benchmark` is returned. The frames `candidates_pd` and `candidates_modin`
    are taken from the enclosing (notebook) scope.
    """
    row_func = partial(calc_candidate_duration, data=sat_fgm)

    def apply_rows(frame):
        return frame.apply(row_func, axis=1)

    def mapply_rows(frame):
        return frame.mapply(row_func, axis=1)

    task_dict = {
        'pandas': (candidates_pd, apply_rows),
        'pandas-mapply': (candidates_pd, mapply_rows),
        'modin': (candidates_modin, apply_rows),
        # 'xorbits': (candidates_x, lambda _: _.apply(func, axis=1)),
    }

    return benchmark(task_dict)