<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"></ul></div>

In [1]:
# Automatically reload modules that changed on disk (autoreload mode 2), so
# the Jupyter kernel need not be restarted after editing or updating a package
%load_ext autoreload
%autoreload 2
# Render matplotlib figures inline, right below the cell that creates them
%matplotlib inline
# Disable Jedi so that tab-completion responds faster
%config Completer.use_jedi = False

## Crack growth analysis

### Hollow cylinder with crack on the external surface 

In [2]:
# Backwards compatibility of the cluster_df function with python 3.8.X
from __future__ import annotations

# Standard imports
import ast
import copy
import datetime
import platform
import os
import sys
import time
from typing import List, Any
from collections import ChainMap, defaultdict

# Non-standard imports
from pathlib import Path
try:
    from tqdm import tqdm
except ImportError:
    print("Install the 'tqdm' module within your environment using pip")
import matplotlib
import matplotlib.pyplot as plt
import numba as nb
import numpy as np
import pandas as pd
import plotly.graph_objs as go

# py-fatigue import
import py_fatigue as pf
import py_fatigue.testing as test
# Paths of this tutorial and of the data.
# TUTORIAL_PATH is the parent of the current working directory; DATA_PATH is
# a fixed folder below the user's home directory.
TUTORIAL_PATH = os.path.dirname(os.getcwd())
DATA_PATH = os.path.join(Path.home(), "Documents", "Work", "data")
# Make the tutorial folder importable, without adding it twice on re-runs
if TUTORIAL_PATH not in sys.path:
    sys.path.append(TUTORIAL_PATH)
# Print numpy arrays in full instead of summarising them with an ellipsis
np.set_printoptions(threshold=sys.maxsize)

In [3]:
# Report the execution environment; the py-fatigue version is shown in bold
print(
    f"Platform: {platform.platform()}",
    f"Python version: {sys.version}",
    f"py-fatigue version: {pf.styling.TermColors.CBOLD}{pf.__version__}",
    sep="\n",
)

# Switch to bold blue text on a white background for the two path lines
print(
    f"{pf.styling.TermColors.CWHITEBG}"
    f"{pf.styling.TermColors.CBOLD}"
    f"{pf.styling.TermColors.CBLUE}"
)
print(
    f"DATA_PATH = {DATA_PATH}",
    f"TUTORIAL_PATH = {TUTORIAL_PATH}",
    sep="\n",
)

# Restore the default terminal style
print(f"{pf.styling.TermColors.CEND}")

Platform: Linux-5.19.0-26-generic-x86_64-with-glibc2.35
Python version: 3.8.15 (default, Dec 14 2022, 14:18:12) 
[GCC 12.2.0]
py-fatigue version: [1m1.0.9
[47m[1m[34m
DATA_PATH = /home/pd/Documents/Work/data
TUTORIAL_PATH = /home/pd/Python/Packages/Github/py-fatigue
[0m


In [4]:
# Shared matplotlib style applied to every figure of the notebook
plt.rcParams.update(
    {
        # Figure, text and markers
        "figure.figsize": (7, 3.5),
        "font.family": ["Sans-Serif"],
        "font.size": 10,
        "lines.markersize": 3,
        # Light grey grid on both major and minor ticks
        "axes.grid": True,
        "axes.grid.which": "both",
        "grid.linestyle": "-",
        "grid.color": "#DDDDDD",
        # Keep only the left and bottom spines
        "axes.spines.right": False,
        "axes.spines.top": False,
        # Inward ticks (major and minor) with labels on the bottom axis
        "xtick.bottom": True,
        "xtick.minor.bottom": True,
        "xtick.direction": "in",
        "xtick.labelbottom": True,
        # Inward ticks (major and minor) with labels on the left axis
        "ytick.left": True,
        "ytick.minor.left": True,
        "ytick.direction": "in",
        "ytick.labelleft": True,
        # Default colormap and line-colour cycle
        "image.cmap": "Paired",
        "axes.prop_cycle": matplotlib.rcsetup.cycler(
            "color",
            [
                "0C5DA5",
                "00B945",
                "FF9500",
                "FF2C00",
                "845B97",
                "474747",
                "9e9e9e",
            ],
        ),
    }
)

`Signature`

```python
def aggregate_cc(
    df: pd.DataFrame, aggr_by: str
) -> tuple[pd.DataFrame, DefaultDict[str, DefaultDict[str, list[float]]]]:
```

`Docstring`

Aggregate a pandas dataframe by time window.
The function performs the following workflow:

1. Perform initial checks on the input pandas dataframe
2. Build the aggregation dictionary
3. Aggregate the dataframe by time window, i.e. the aggregated CycleCounts
4. Retrieve the low-frequency fatigue dynamics on the aggregated dataframe
5. Save the residuals sequences of each aggregated CycleCount

**Parameters**

- df : pd.DataFrame
    - The dataframe to cluster
- aggr_by : str
    - The time window to cluster the dataframe by. It must be an offset_string. For all the offset_string aliases offered by pandas, see: shorturl.at/dgrwW


**Returns**

- pd.DataFrame
    - The clustered dataframe
- dict
    - The residuals sequences of each aggregated CycleCount column, stored under the keys `"idx"` and `"res"`


In [None]:
from __future__ import annotations

from collections import ChainMap, defaultdict
from typing import Any, DefaultDict, Union
import time

import numpy as np
import pandas as pd

from py_fatigue import cycle_count, CycleCount


def solve_lffd(x: Any) -> Union[Any, CycleCount]:
    """Resolve the low-frequency fatigue dynamics (LFFD) of ``x`` if possible.

    Parameters
    ----------
    x : Any
        Candidate object. Only :class:`~CycleCount` instances whose
        ``time_sequence`` holds more than one entry are processed.

    Returns
    -------
    Any
        ``x.solve_lffd()`` for such instances; in every other case ``x``
        itself, untouched.
    """
    # Anything that is not a CycleCount passes through unchanged
    if not isinstance(x, CycleCount):
        return x
    # A CycleCount with at most one time-sequence entry is returned as is
    if len(x.time_sequence) <= 1:
        return x
    return x.solve_lffd()


def aggregate_cc(
    df: pd.DataFrame, aggr_by: str
) -> tuple[pd.DataFrame, DefaultDict[str, DefaultDict[str, list[float]]]]:
    """Aggregate a pandas dataframe by time window.
    The pandas dataframe must have a DatetimeIndex and at least one column
    whose name starts with 'CC_' containing :class:`~CycleCount` instances,
    e.g.,

    +------------+--------------------+--------------------+-----+
    |            | CC_1               | CC_2               | ... |
    +============+====================+====================+=====+
    | timestamp  |                    |                    | ... |
    | 2018-01-01 | CC_1 (01 Jan 2018) | CC_2 (01 Jan 2018) | ... |
    | 2018-01-02 | CC_1 (02 Jan 2018) | CC_2 (02 Jan 2018) | ... |
    | 2018-01-03 | CC_1 (03 Jan 2018) | CC_2 (03 Jan 2018) | ... |
    |⋮           |⋮                   |⋮                   | ⋱   |
    +------------+--------------------+--------------------+-----+

    The function performs the following workflow:

    1. Perform initial checks on the input pandas dataframe
    2. Build the aggregation dictionary
    3. Aggregate the dataframe by time window, i.e. the aggregated CycleCounts
    4. Retrieve the low-frequency fatigue dynamics on the aggregated dataframe
    5. Save the residuals sequences of each aggregated CycleCount

    Columns whose name is a string starting with 'CC_' are aggregated with
    ``cycle_count.pbar_sum``; every other column with ``np.nanmean``.

    Parameters
    ----------
    df : pd.DataFrame
        The dataframe to cluster
    aggr_by : str
        The time window to cluster the dataframe by. It must be an
        offset_string.
        For all the offset_string aliases offered by pandas, see:
        shorturl.at/dgrwW

    Returns
    -------
    tuple[pd.DataFrame, dict[str, dict[str, list]]]
        The aggregated dataframe, with :func:`solve_lffd` applied to each of
        its cells, and the residuals sequences of each aggregated 'CC_'
        column, stored per column under the keys ``"idx"`` and ``"res"``.

    Raises
    ------
    TypeError
        If ``df`` is not a pandas DataFrame or its index is not a
        DatetimeIndex.
    ValueError
        If the index is not monotonic increasing, not unique, or its inferred
        type is not ``"datetime64"``.
    """
    start = time.time()

    # Perform initial checks on the input pandas dataframe
    print("\33[36m1. Running checks on \33[1mdf\33[22m.\33[0m")
    if not isinstance(df, pd.DataFrame):
        raise TypeError("df must be a pandas DataFrame")
    if not isinstance(df.index, pd.DatetimeIndex):
        raise TypeError("df must have a DatetimeIndex")
    if not df.index.is_monotonic_increasing:
        raise ValueError("df must have a monotonic increasing DatetimeIndex")
    if not df.index.is_unique:
        raise ValueError("df must have a unique DatetimeIndex")
    if not df.index.inferred_type == "datetime64":
        raise ValueError("df must have a DatetimeIndex containing only dates")

    # Build the aggregation dictionary.
    # No colour reset in this message: the cyan style carries over to the
    # step-3 message below, which ends with the reset code.
    print("\33[36m2. Building the aggregation \33[1mdict\33[22m.")
    agg_list: list[dict[float | str, Any]] = [
        {col: cycle_count.pbar_sum}
        if isinstance(col, str) and col.startswith("CC_")
        else {col: np.nanmean}
        for col in df
    ]
    agg_dict = dict(ChainMap(*agg_list))

    # Aggregate the dataframe by time window
    print(f"3. Aggregate \33[1mdf\33[22m by \33[1m'{aggr_by}'\33[22m.\33[0m")
    df_agg = df.groupby([df.index.to_period(aggr_by)]).agg(agg_dict)

    # Retrieve the low-frequency fatigue dynamics on the aggregated dataframe.
    # DataFrame.map replaces the deprecated DataFrame.applymap in recent
    # pandas; fall back to applymap where map is not available.
    print("\33[36m4. Retrieving LFFD on aggregated \33[1mdf\33[22m.\33[0m")
    elementwise = (
        df_agg.map if hasattr(pd.DataFrame, "map") else df_agg.applymap
    )
    df_agg_rr = elementwise(solve_lffd)

    # Same string test as in the aggregation dictionary, so that non-string
    # column labels are skipped instead of failing on `.startswith`
    cc_cols: list[str] = [
        col
        for col in df_agg_rr.columns
        if isinstance(col, str) and col.startswith("CC_")
    ]

    # Save the residuals sequences
    print("\33[36m5. Saving the \33[1mresiduals sequences\33[22m.\33[0m")
    residuals_sequence: DefaultDict[
        str, DefaultDict[str, list[float]]
    ] = defaultdict(lambda: defaultdict(list))
    for col in cc_cols:
        # The residuals are read from the aggregated cells *before* LFFD
        for cell in df_agg[col]:
            # Cells that hold no CycleCount (e.g. NaN) carry no residuals
            if not isinstance(cell, CycleCount):
                continue
            _, res_res_seq, res_res_idx = cycle_count.calc_rainflow(
                data=np.asarray(cell.residuals_sequence),
                extended_output=True,
            )
            stored = residuals_sequence[col]
            # Shift the new indices by the last stored one, so that "idx"
            # keeps growing across consecutive time windows
            if len(stored["idx"]) > 0:
                res_res_idx += stored["idx"][-1]
            stored["idx"].extend(res_res_idx.tolist())
            stored["res"].extend(res_res_seq.tolist())
    end = time.time()
    print(
        f"\nElapsed time for \33[36m\33[1m'{aggr_by}'\33[0m aggregation",
        f"is {np.round(end-start, 0)}, s.",
    )
    return df_agg_rr, residuals_sequence


### Import the data from csv file

The following operations are run:

1. Read .csv file
2. Set 'timestamp' as index
3. Convert literal column containing dictionary to python dictionary

In [None]:
start = time.time()

# 1. Load the raw table from the data folder
print("\33[36m1. Read \33[1m.csv\33[22m file\33[0m")
df_ = pd.read_csv(os.path.join(DATA_PATH, "CC_BB_C01_SS_2018_TO_2021.csv"))

# 2. Index the rows by their timestamp, parsed as datetimes
print("\33[36m2. Set \33[1m'timestamp'\33[22m as index\33[0m")
df_ = df_.set_index("timestamp")
df_.index = pd.to_datetime(df_.index)

# 3. Turn the literal strings of every 'CC_' column into python objects
for col in df_.columns:
    if not col.startswith("CC_"):
        continue
    tqdm.pandas(desc=f"\33[36m3. Converting \33[1m{col[:3]}{col[13:]}\33[0m")
    df_[col] = df_[col].progress_apply(
        # A cell that differs from itself (NaN) is kept as is
        lambda cell: cell if cell != cell else ast.literal_eval(cell)
    )

end = time.time()
print(f"\nElapsed time is {np.round(end-start, 0)}, s.")

In [None]:
# Work on a copy: the conversion cell below overwrites the columns of `df`,
# while the parsed frame `df_` is kept as loaded
df = copy.deepcopy(df_)

## From `dict` to `CycleCount`

In [None]:
# 4. Replace the rainflow dictionaries of each 'CC_' column by pf.CycleCount
start = time.time()
tqdm.pandas(desc="\33[36m4. From \33[1mdict \33[22mto \33[1mpf.CycleCount\33[0m")
for col in df.columns:
    if not col.startswith("CC_"):
        continue
    # Row-wise apply, because the row label (row.name) is passed on as the
    # timestamp of the CycleCount
    df[col] = df.progress_apply(
        lambda row: row[col]
        if row[col] != row[col]  # value differs from itself (NaN): keep it
        else pf.CycleCount.from_rainflow(
            row[col], name=col[13:], timestamp=row.name
        ),
        axis=1,
    )
end = time.time()
print(f"\nElapsed time is {np.round(end-start, 0)}, s.")

## Aggregate the `CycleCounts` in `df` by a time window

In [None]:
# Display the frame whose 'CC_' columns were converted in the previous cell
df

In [None]:
n_days = 3
# Select the needed days (144 × n_days); presumably 144 is the number of
# records per day — TODO confirm against the sampling of the csv file.
# The rows are sliced first and then copied, so that only the selection is
# duplicated rather than the whole frame.
wf = df.head(144 * n_days).copy(deep=True)

In [None]:
# Aggregation by minute ('T'), using the aggregate_cc function defined above
df_T, hc_T = aggregate_cc(wf, 'T')

In [None]:
# Daily aggregation ('D'), using the aggregate_cc function defined above
df_D, hc_D = aggregate_cc(wf, 'D')

In [None]:
# Weekly aggregation ('W'), using the aggregate_cc function defined above
df_W, hc_W = aggregate_cc(wf, 'W')

In [None]:
# Monthly aggregation ('M'), using the aggregate_cc function defined above
df_M, hc_M = aggregate_cc(wf, 'M')

In [None]:
# Quarterly aggregation ('Q'), using the aggregate_cc function defined above
df_Q, hc_Q = aggregate_cc(wf, 'Q')

In [None]:
def plot_aggregated_residuals(
    dfs: tuple[pd.DataFrame, ...],
    plt_prmtr: str,
) -> None:
    """Placeholder for a residuals-plotting helper; not implemented yet.

    The body is empty, so a call does nothing and returns ``None``.

    Parameters
    ----------
    dfs : tuple[pd.DataFrame, ...]
        Dataframes to plot (presumably the aggregated ones — TODO confirm).
    plt_prmtr : str
        Presumably the name of the 'CC_' column to plot — TODO confirm.
    """
    pass

In [None]:
plt_prmtr = "CC_BB_C01_TP_SG_LAT019_DEG325_0_nr1"
fig, axes = plt.subplots()

# One curve per aggregation level, from the finest (thin, faint) to the
# coarsest (thick, opaque): (residuals, legend label, line width, opacity)
for residuals, label, width, opacity in (
    (hc_T, "None", 0.3, 0.25),
    (hc_D, "Daily", 0.5, 0.4),
    (hc_W, "Weekly", 0.7, 0.55),
    (hc_M, "Monthly", 0.9, 0.7),
    (hc_Q, "Quarterly", 1.1, 0.85),
):
    axes.plot(
        residuals[plt_prmtr]["idx"],
        residuals[plt_prmtr]["res"],
        lw=width,
        label=label,
        alpha=opacity,
    )

axes.set_xlabel("Residuals sequence")
axes.set_ylabel("Residuals")
# Legend placed below the axes, in three columns
axes.legend(
    title="Aggregation type",
    loc="lower center",
    bbox_to_anchor=(0.5, -0.44),
    ncol=3,
    fancybox=True,
    shadow=True,
)
plt.show()

In [None]:
# Group the parsed frame in daily bins keyed on a "Date" column.
# NOTE(review): the load cell only shows a 'timestamp' column, which is moved
# to the index — confirm that `df_` really has a "Date" column, otherwise the
# Grouper cannot resolve its key.
grouped = df_.groupby(pd.Grouper(freq="D", key="Date"))

In [None]:
# (column name, aggregator) pairs; the aggregator forwards its argument to
# pf.cycle_count.pbar_sum.
# NOTE(review): this column is ..._DEG025_..., whereas the plotting cell uses
# ..._DEG325_... — confirm which one is intended.
tick_features = [('CC_BB_C01_TP_SG_LAT019_DEG025_0_nr1',
                  lambda x: pf.cycle_count.pbar_sum(x)),]
# Same daily grouping as in the previous cell (same "Date" key caveat)
grouped = df_.groupby(pd.Grouper(freq="D", key="Date"))

In [None]:
# Aggregate the daily groups with the features listed in `tick_features`
tick = grouped.agg(tick_features)

In [None]:
# Display the aggregation result
tick

In [None]:
# Inspect the trailing rows of this 'CC_' column within the daily groups
grouped.CC_BB_C01_TP_SG_LAT019_DEG025_0_nr1.tail()

In [None]:
# NOTE(review): elsewhere in this notebook pbar_sum is used as a per-column
# aggregator; here it receives the whole groupby object — confirm that this
# input is supported.
dg = pf.cycle_count.pbar_sum(grouped)

In [None]:
from pandas.api.types import is_datetime64_any_dtype as is_datetime

In [None]:
# Keep only the columns of `df_` that `is_datetime` reports as datetime-typed
df_[[column for column in df_.columns if is_datetime(df_[column])]]

In [None]:
# The filter and the listed value are the same test, so this yields one True
# per datetime-typed column: its length is the number of such columns
[is_datetime(df_[column]) for column in df_.columns if is_datetime(df_[column])]