In [75]:
import contextlib
import logging
import math
from datetime import datetime, timedelta, timezone
from typing import Any

import pandas as pd
import plotly.graph_objects as go
import plotly.io as pio
from pydantic import BaseModel, ValidationError, root_validator, validator

#rom hetdesrun.utils import plotly_fig_to_json_dict

In [5]:
class ComponentInputValidationException(Exception):
    """In code input validation failures"""

    def __init__(
        self,
        *args: Any,
        invalid_component_inputs: list[str],
        error_code: int | str = "",
        **kwargs: Any
    ):
        raise ValueError("Hier könnte ihr Error stehen.")

In [7]:
class GapDetectionParameters(BaseModel):
    start_date: str  # pydantic kann auch direkt datetime, muss aber getestet werden
    end_date: str
    auto_stepsize: bool = True
    history_end_date: str = None
    step_size_str: str = None
    percentil: float
    min_amount_datapoints: int
    add_gapsize_column: bool = True

    @validator(
        "start_date", "end_date"
    )  # TODO was ist mit dem Fall Attribut der Zeitreihe?
    def verify_date_strings(cls, date) -> datetime:
        date = datetime.fromisoformat(date).replace(tzinfo=timezone.utc)
        return date

    @validator("end_date")
    def verify_dates(cls, end_date, values: dict):
        start_date = values["start_date"]
        if start_date > end_date:
            raise ComponentInputValidationException(
                "The value start_date must not be later than the end_date, while it is "
                f"{start_date} > {end_date}.",
                error_code=422,
                invalid_component_inputs=["end_date_str", "start_date_str"],
            )
        return values

    @validator("history_end_date")
    def verify_history_end_date(cls, history_end_date, values: dict) -> datetime | None:
        start_date = values["start_date"]
        end_date = values["end_date"]
        if history_end_date is not None:
            try:
                history_end_date = datetime.fromisoformat(history_end_date).replace(
                    tzinfo=timezone.utc
                )
            except ValueError as err:
                raise ComponentInputValidationException(
                    "The date in history_end_date has to be formatted in iso format to allow "
                    "conversion to datetime type for gap detection.",
                    error_code=422,
                    invalid_component_inputs=["history_end_date_str"],
                ) from err

            if start_date > history_end_date:
                raise ComponentInputValidationException(
                    "The value history_end_date has to be inbetween start_date and end_date, while "
                    f"it is {history_end_date} < {start_date}.",
                    error_code=422,
                    invalid_component_inputs=["history_end_date_str"],
                )
            if end_date < history_end_date:
                raise ComponentInputValidationException(
                    "The value history_end_date has to be inbetween start_date and end_date, while "
                    f"it is {history_end_date} > {end_date}.",
                    error_code=422,
                    invalid_component_inputs=["history_end_date_str"],
                )
        else:
            history_end_date = None
        return history_end_date

    @validator("step_size_str")  # TODO auf freq string überprüfen
    def verify_step_size(cls, step_size, values: dict) -> str:
        auto_stepsize = values["auto_stepsize"]
        if (auto_stepsize is False) and (step_size is None):
            raise ComponentInputValidationException(
                "A step_size is required for gap detection, if it is not automatically determined.",
                error_code=422,
                invalid_component_inputs=["step_size_str"],
            )
        return step_size

    @validator("percentil")
    def verify_percentile(cls, percentil) -> int:
        if (percentil < 0) or (percentil > 100):
            raise ComponentInputValidationException(
                "The percentil value has to be a non-negative integer less or equal to 100.",
                error_code=422,
                invalid_component_inputs=["percentil"],
            )
        return percentil

    @validator("min_amount_datapoints")
    def verify__min_amount_datapoints(cls, min_amount) -> int:
        if min_amount < 0:
            raise ComponentInputValidationException(
                "The minimum amount of datapoints has to be a non-negative integer.",
                error_code=422,
                invalid_component_inputs=["min_amount_datapoints"],
            )
        return min_amount

In [46]:
def constrict_series_to_dates(
    timeseries_data: pd.Series | pd.DataFrame, start_date: datetime, end_date: datetime
) -> pd.Series | pd.DataFrame:
    return timeseries_data[
        (timeseries_data.index >= start_date) & (timeseries_data.index <= end_date)
    ]

In [47]:
def check_amount_datapoints(series:pd.Series, min_amount_datapoints:int):
    if len(series) < min_amount_datapoints:
            raise ComponentInputValidationException(
                f"The timeseries must contain at least {min_amount_datapoints} datapoints.",
                error_code=422,
                invalid_component_inputs=["timeseries"],
            )

In [66]:
def determine_timestep_gapsize_percentile(
    timeseries_data: pd.Series | pd.DataFrame, percentil=95
) -> pd.Timedelta:
    gaps = timeseries_data.index.to_series().diff().dropna()

    percentile_gapsize = gaps.quantile(
        percentil / 100, interpolation="nearest"
    )  #  TODO nachfragen wegen Interpol

    return percentile_gapsize

In [67]:
def freqstr2dateoffset(freqstr: str) -> pd.DateOffset:
    """Transform frequency string to Pandas DateOffset."""
    return pd.tseries.frequencies.to_offset(freqstr)

def freqstr2timedelta(freqstr: str) -> pd.Timedelta:
    """Transform frequency string to Pandas Timedelta."""
    try:
        return pd.to_timedelta(freqstr)
    except ValueError:
        return pd.to_timedelta(freqstr2dateoffset(freqstr))

In [None]:
def determine_gap_length(
    timeseries: pd.Series, stepsize=timedelta(minutes=1)
) -> pd.DataFrame:
    gaps = timeseries.index.to_series().diff().to_numpy()

    stepsize_seconds = stepsize.total_seconds()

    normalized_gaps = [
        pd.Timedelta(gap).total_seconds() / stepsize_seconds if pd.notnna(gap) else None
        for gap in gaps
    ]

    result_df = pd.DataFrame(
        {"value": timeseries.to_numpy(), "gap": normalized_gaps}, index=timeseries.index
    )

    return result_df

In [70]:
def check_add_boundary_dates(
    timeseries: pd.Series, start_date: datetime, end_date: datetime, dummy_value=math.pi
) -> pd.Series:
    if start_date not in timeseries.index:
        timeseries[start_date] = dummy_value

    if end_date not in timeseries.index:
        timeseries[end_date] = dummy_value

    timeseries = timeseries.sort_index()

    return timeseries

In [63]:
length_ts=366
timeseries = pd.Series(data=range(length_ts),index=pd.to_datetime(range(length_ts),utc=True,unit="D",origin="2020-01-01T01:15:27.000"))
start_date = "2020-01-01T01:15:27.000Z"
end_date= "2020-12-31T01:15:27.000Z"
auto_stepsize = True
history_end_date= "2020-01-01T01:15:27.000Z"
step_size_str= ""
percentil = 95
min_amount_datapoints = 21
add_gapsize_column = True

In [73]:
def main(timeseries=timeseries,start_date_str=start_date,end_date_str=end_date,
                       auto_stepsize=auto_stepsize,history_end_date_str=history_end_date,step_size_str=step_size_str,percentil=percentil,
                       min_amount_datapoints=min_amount_datapoints,add_gapsize_column=add_gapsize_column):
    timeseries = timeseries.dropna()

    input_params = GapDetectionParameters(
        start_date=start_date_str,
        end_date=end_date_str,
        auto_stepsize=auto_stepsize,
        history_end_date_str=history_end_date_str,
        step_size=step_size_str,
        percentil=percentil,
        min_amount_datapoints=min_amount_datapoints,
        add_gapsize_column=add_gapsize_column,
    )
    constricted_series = constrict_series_to_dates(
        timeseries, start_date_str, end_date_str
    )
    check_amount_datapoints(series=constricted_series,min_amount_datapoints=input_params.min_amount_datapoints)
    if auto_stepsize:
        if input_params.history_end_date is not None:
            training_series = constrict_series_to_dates(
                timeseries, input_params.start_date, input_params.history_end_date
            )
        else:
            training_series = constricted_series
        step_size = determine_timestep_gapsize_percentile(training_series, percentil)
    else:
        step_size = freqstr2timedelta(step_size_str)
    series_with_bounds = check_add_boundary_dates(
        constricted_series, start_date_str, end_date_str
    )

    df_with_gaps = determine_gap_length(series_with_bounds, step_size)
    gap_boundaries = return_gap_boundary_timestamps(df_with_gaps)

    return gap_boundaries

In [74]:
main()

In [29]:
GapDetectionParameters(start_date=start_date,end_date=end_date,
                       auto_stepsize=auto_stepsize,step_size_str=step_size_str,percentil=percentil,
                       min_amount_datapoints=min_amount_datapoints,add_gapsize_column=add_gapsize_column)

GapDetectionParameters(start_date=datetime.datetime(2020, 1, 1, 1, 15, 27, tzinfo=datetime.timezone.utc), end_date={'start_date': datetime.datetime(2020, 1, 1, 1, 15, 27, tzinfo=datetime.timezone.utc), 'end_date': {...}, 'auto_stepsize': True, 'history_end_date': None, 'step_size_str': '', 'percentil': 95.0, 'min_amount_datapoints': 21, 'add_gapsize_column': True}, auto_stepsize=True, history_end_date=None, step_size_str='', percentil=95.0, min_amount_datapoints=21, add_gapsize_column=True)