# 数据预处理
## 加载数据预处理函数

In [1]:
import pickle
import warnings
from pprint import pprint

import numpy as np
from scipy.interpolate import interp1d
from scipy.signal import savgol_filter

from trainer.rebuilding_features import load_batches_to_dict
import trainer.constants as cst

In [2]:
class DropCycleException(Exception):
    """Signals that a whole cycle has to be discarded.

    Carries nothing beyond the regular exception message.
    """


class OutlierException(Exception):
    """Signals that a whole cycle has to be discarded because of detected outliers.

    Attributes:
        outlier_dict -- The outlier information passed to the constructor, stored unchanged.
    """

    def __init__(self, message, outlier_dict):
        # The outlier details travel on the instance; only the message is
        # handed on to the Exception base class.
        self.outlier_dict = outlier_dict
        super().__init__(message)


def check_for_drop_warning(array_before, array_after, drop_warning_thresh=0.10):
    """Issues a warning if too large a share of values was dropped between two arrays.

    The dropped share is (len(array_before) - len(array_after)) / len(array_before).
    A warning is issued through the warnings module if that share is not smaller
    than drop_warning_thresh.

    Arguments:
        array_before {sequence} -- Values before dropping.
        array_after {sequence} -- Values after dropping.

    Keyword Arguments:
        drop_warning_thresh {float} -- Share of dropped values (as a fraction, not in percent)
            from which on the warning is issued (default: {0.10})

    Returns:
        None
    """
    size_before = len(array_before)
    if size_before == 0:
        # Nothing can have been dropped from an empty array; this also avoids a division by zero.
        return

    dropped = size_before - len(array_after)
    # An explicit comparison is used instead of an assert statement,
    # so the check still runs when Python is started with -O.
    if float(dropped) / size_before >= drop_warning_thresh:
        warnings.warn(
            """More than {} percent of values were dropped ({} out of {}).""".format(
                drop_warning_thresh * 100,
                dropped,
                size_before))


def multiple_array_indexing(valid_numpy_index, *args, drop_warning=False, drop_warning_thresh=0.10):
    """Indexes multiple numpy arrays at once and returns the result in a tuple.

    Arguments:
        valid_numpy_index {numpy.ndarray or integer sequence} -- The index that is applied to every array.
        *args {numpy.ndarray} -- The arrays to index.

    Keyword Arguments:
        drop_warning {bool} -- If True, check_for_drop_warning() is called with the first array
            before and after indexing (default: {False})
        drop_warning_thresh {float} -- Threshold that is passed on to check_for_drop_warning()
            (default: {0.10})

    Returns:
        tuple -- Indexed copies of the arrays from *args in the same order.
    """
    indexed_arrays = tuple(arg[valid_numpy_index].copy() for arg in args)

    # Only the first array is checked. The check is skipped if no array was passed at all.
    if drop_warning and args:
        check_for_drop_warning(args[0], indexed_arrays[0], drop_warning_thresh=drop_warning_thresh)
    return indexed_arrays


def outlier_dict_without_mask(outlier_dict):
    """Builds a copy of an outlier dict that is small enough for printing.

    Arguments:
        outlier_dict {dict} -- Original outlier dictionary (one inner dict per array name).

    Returns:
        dict -- New dictionary with the same entries, except that the key "outlier_mask"
            is left out of every inner dict. The input is not modified.
    """
    return {
        name: {field: content for field, content in info.items() if field != "outlier_mask"}
        for name, info in outlier_dict.items()
    }


def compute_outlier_dict(std_multiple_threshold, verbose=False, **kwargs):
    """Checks for outliers in all numpy arrays given in kwargs based on the standard deviation of np.diff().

    For every array the differences to the preceding value are computed (the first difference is 0).
    An outlier is every index where this difference is bigger than std_multiple_threshold times
    the standard deviation of all differences. The signed difference is compared, so only
    increases can be flagged as outliers.

    Arguments:
        std_multiple_threshold {int} -- Threshold that defines an outlier by multiplying with the
            standard deviation of the differences.

    Keyword Arguments:
        verbose {bool} -- If True, prints the values for every found outlier (default: {False})
        **kwargs {numpy.ndarray} -- The arrays to check, tagged by name.

    Returns:
        dict -- One entry per array name in which outliers were found, with the keys "std_diff",
            "original_values", "diff_values", "outlier_indeces" and "outlier_mask".
            Arrays without outliers (including empty arrays) have no entry.
    """
    outlier_dict = dict()

    for key, value in kwargs.items():
        if np.size(value) == 0:
            # An empty array has no first value to prepend and can not contain outliers.
            continue

        diff_values = np.diff(value, prepend=value[0])
        std_diff = diff_values.std()
        outlier_mask = diff_values > (std_multiple_threshold * std_diff)  # Get the mask for all outliers
        outlier_indeces = np.argwhere(outlier_mask)  # Get the indeces for all outliers

        if outlier_indeces.size > 0:  # Add outlier information to the outlier dict, if an outlier has been found
            outlier_dict[key] = dict(std_diff=std_diff,
                                     original_values=value[outlier_indeces],
                                     diff_values=diff_values[outlier_indeces],
                                     outlier_indeces=outlier_indeces,
                                     outlier_mask=outlier_mask)

    if verbose and outlier_dict:
        # If outlier_dict has any entries, then print a version without the mask (too big for printing)
        outlier_dict_wo_mask = outlier_dict_without_mask(outlier_dict)  # Generate a smaller dict for better printing
        print("############ Found outliers ############ ")
        pprint(outlier_dict_wo_mask)
        print("")
    return outlier_dict


def drop_cycle_big_t_outliers(std_multiple_threshold, Qd, T, V, t, t_diff_outlier_thresh=100):
    """Checks for big outliers in the np.diff() values of t.
    If any are found the whole cycle is dropped, with one exception:
        There is only one big outlier and it lies right after the end of discharging.
        In this case, all measurement values of Qd, T, V and t from this outlier on are dropped
        and the remaining values are returned.

        The end of discharging is defined as a V value below 2.01 right before the outlier.

    Arguments:
        std_multiple_threshold {int} -- Threshold that is passed to compute_outlier_dict()
        Qd {numpy.ndarray} -- Qd during discharging
        T {numpy.ndarray} -- T during discharging
        V {numpy.ndarray} -- V during discharging
        t {numpy.ndarray} -- t during discharging

    Keyword Arguments:
        t_diff_outlier_thresh {int} -- Outliers in t only count as "big", if their np.diff() value
            is bigger than this threshold (default: {100})

    Raises:
        OutlierException: Will be raised, if the whole cycle should be dropped. It carries the
            outlier dict that was computed for all four arrays.

    Returns:
        Tuple of numpy.ndarray -- Returns the original values of Qd, T, V and t if no big t outlier is found, or
            a slice of all arrays if the only big outlier lies right after the end of discharging.
    """
    outlier_dict = compute_outlier_dict(std_multiple_threshold=std_multiple_threshold, Qd=Qd, T=T, V=V, t=t)

    t_outliers = outlier_dict.get("t")
    if not t_outliers:  # No outlier was found in t at all
        return Qd, T, V, t

    t_outlier_mask = t_outliers["diff_values"] > t_diff_outlier_thresh
    if not np.any(t_outlier_mask):  # Only do something if there are big outliers.
        return Qd, T, V, t

    # Get the indeces 1 before the big t outliers.
    indeces_before_t_outliers = t_outliers["outlier_indeces"][t_outlier_mask] - 1
    # Get the minimum V value right before all big t outliers.
    V_before_t_outlier = np.min(V[indeces_before_t_outliers])

    # If there is exactly one big t outlier right at the end of discharging,
    #   drop all values from the outlier on and continue with processing.
    if indeces_before_t_outliers.size == 1 and V_before_t_outlier < 2.01:
        # .item() is used, because int() on an array with ndim > 0 is deprecated in newer numpy versions.
        i = indeces_before_t_outliers.item() + 1
        return Qd[:i], T[:i], V[:i], t[:i]

    raise OutlierException(
        "    Dropping cycle based on outliers with np.diff(t) > {} with value(s) {}".format(
            t_diff_outlier_thresh,
            list(t_outliers["diff_values"][t_outlier_mask])),
        outlier_dict)


def drop_outliers_starting_left(std_multiple_threshold, Qd, T, V, t):
    """Removes outliers from Qd, T, V and t step by step, always starting with the smallest index.

    In every step the left most outlier of each array is looked up and these indeces are removed
    from all four arrays at once, so the sizes still match. Afterwards the outliers are computed
    again, so values that were only flagged because of a removed outlier are not dropped.
    This is repeated until no outliers are left.

    Arguments:
        std_multiple_threshold {int} -- Threshold for the compute_outlier_dict function
        Qd {numpy.ndarray} -- Qd measurements
        T {numpy.ndarray} -- T measurements
        V {numpy.ndarray} -- V measurements
        t {numpy.ndarray} -- t measurements

    Returns:
        tuple of numpy.ndarrays -- Copies of Qd, T, V and t without outliers
    """
    # Work on copies, keyed by the names that compute_outlier_dict() uses as tags.
    current = dict(zip(("Qd", "T", "V", "t"), (Qd.copy(), T.copy(), V.copy(), t.copy())))

    # Only the first search prints the found outliers.
    found = compute_outlier_dict(std_multiple_threshold, verbose=True, **current)
    initially_flagged = list(found.keys())  # Names of the arrays with outliers before anything was dropped.
    n_dropped = 0

    while found:
        # The left most outlier index of every array, every index only once.
        to_drop = list({np.min(info["outlier_indeces"]) for info in found.values()})

        # Remove these indeces from all four arrays.
        current = {name: array_exclude_index(values, to_drop) for name, values in current.items()}
        n_dropped += len(to_drop)

        # Search again on the shortened arrays.
        found = compute_outlier_dict(std_multiple_threshold, **current)

    if n_dropped > 0:
        print("    Dropped {} outliers in {}".format(n_dropped, initially_flagged))
        print("")

    check_for_drop_warning(Qd, current["Qd"])
    return current["Qd"], current["T"], current["V"], current["t"]


def array_exclude_index(arr, id):
    """Returns a copy of the given array without the entries selected by id.

    Arguments:
        arr {numpy.ndarray} -- The array to take the values from.
        id {any valid numpy index} -- The position(s) to leave out.

    Returns:
        numpy.ndarray -- All values of arr that are not selected by id.
    """
    # Mark the positions to leave out and select everything else.
    drop = np.zeros_like(arr, dtype=bool)
    drop[id] = True
    return arr[~drop]


def handle_small_Qd_outliers(std_multiple_threshold, Qd, t, Qd_max_outlier=0.06):
    """Handles specifically small outliers in Qd, which are a result of constant values for a
    small number of measurements before the "outlier". The constant values are replaced by linearly
    interpolating Qd over t (on the original author's assumption that Qd over t is linear).
    This way the "outlier" is "neutralized", since there is no "step" left from the constant values
    to the outlier value.

    Arguments:
        std_multiple_threshold {int} -- Threshold to use for the compute_outlier_dict function
        Qd {numpy.ndarray} -- Qd measurements (expected to be one dimensional)
        t {numpy.ndarray} -- t measurements corresponding to Qd

    Keyword Arguments:
        Qd_max_outlier {float} -- The maximum np.diff() value of an outlier in Qd, which gets handled
            by this function. This is needed only to make the function more specific. (default: {0.06})

    Returns:
        numpy.ndarray -- The interpolated version of Qd. The given Qd is not modified.
    """

    Qd_ = Qd.copy()  # Only copy Qd, since it is the only array values are assigned to
    outlier_dict = compute_outlier_dict(std_multiple_threshold, Qd=Qd_)

    Qd_outliers = outlier_dict.get("Qd")
    if not Qd_outliers:  # No outliers in Qd at all
        return Qd_

    # Get only the indeces of all small outliers.
    # The result is an array, so it is iterated directly instead of testing its truth value
    # (which is ambiguous for more than one element).
    small_diff_value_mask = Qd_outliers["diff_values"] <= Qd_max_outlier
    ids = Qd_outliers["outlier_indeces"][small_diff_value_mask]

    # Interpolate all values before small outliers that stay constant (np.diff == 0)
    for i in ids:
        # Get the last index, where the value of Qd doesn't stay constant before the outlier.
        increasing_ids = np.flatnonzero(np.diff(Qd_[:i]) > 0)
        if increasing_ids.size == 0:
            # Qd never increases before this outlier, so there is no left border to interpolate from.
            continue
        start_id = int(increasing_ids[-1])

        # Make a mask for where to interpolate
        interp_mask = np.zeros_like(Qd_, dtype=bool)
        interp_mask[start_id:i] = True
        interp_values = np.interp(
            t[interp_mask],  # Where to evaluate the interpolation function.
            t[~interp_mask],  # X values for the interpolation function.
            Qd_[~interp_mask]  # Y values for the interpolation function.
        )
        # Assign the interpolated values
        Qd_[interp_mask] = interp_values
        print("    Interpolated small Qd outlier from index {} to {}".format(start_id, i))

    return Qd_


def make_strictly_decreasing(x_interp, y_interp, prepend_value=3.7):
    """Takes a monotonically decreasing array y_interp and makes it strictly decreasing by interpolation over x_interp.

    Every value that is not smaller than its preceding value is replaced by a linear interpolation
    between the neighbouring values that do decrease.

    Arguments:
        x_interp {numpy.ndarray} -- The values to interpolate over.
        y_interp {numpy.ndarray} -- Monotonically decreasing values. The array is not modified.

    Keyword Arguments:
        prepend_value {float} -- Value to prepend to y_interp for assessing the difference to the
            preceding value of the first entry. (default: {3.7})

    Raises:
        AssertionError: If the result is still not strictly decreasing.

    Returns:
        numpy.ndarray -- Copy of y_interp with interpolated values, where there used to be no decrease to
            the preceding value. Arrays with less than two values are returned as an unchanged copy.
    """
    y_interp_copy = y_interp.copy()
    if y_interp_copy.size < 2:
        # There is no pair of values to compare.
        return y_interp_copy

    # Make the tail interpolatable if the last value is not the single minimum.
    if y_interp_copy[-1] >= y_interp_copy[-2]:
        y_interp_copy[-1] -= 0.0001

    # Build a mask for all values, which do not decrease.
    not_decreasing = np.diff(y_interp_copy, prepend=prepend_value) >= 0
    # Replace these values with interpolations based on their border values.
    y_interp_copy[not_decreasing] = np.interp(
        x_interp[not_decreasing],  # Where to evaluate the interpolation function.
        x_interp[~not_decreasing],  # X values for the interpolation function.
        y_interp_copy[~not_decreasing]  # Y values for the interpolation function.
    )

    # This has to be given, since interpolation will fail otherwise.
    # The exception is raised explicitly, so the check is not removed when Python runs with -O.
    if not np.all(np.diff(y_interp_copy) < 0):
        raise AssertionError("The result y_copy is not strictly decreasing!")

    return y_interp_copy


def preprocess_cycle(cycle,
                     I_thresh=-3.99,
                     Vdlin_start=3.5,
                     Vdlin_stop=2.0,
                     Vdlin_steps=cst.STEPS,
                     return_original_data=False):
    """Processes data (Qd, T, V, t) from one cycle and resamples Qd, T and V to a predefined dimension.
    discharging_time will be computed based on t and is the only returned feature that is a scalar.

    Arguments:
        cycle {dict} -- One cycle entry from the original data with keys 'I', 'Qd', 'T', 'V', 't'

    Keyword Arguments:
        I_thresh {float} -- Only measurements where the current is smaller than this threshold are chosen
            (default: {-3.99})
        Vdlin_start {float} -- Start value for the resampled V (default: {3.5})
        Vdlin_stop {float} -- Stop value for the resampled V (default: {2.0})
        Vdlin_steps {int} -- Number of steps V, Qd and T are resampled to (default: {cst.STEPS})
        return_original_data {bool} -- Whether the datapoints, which were used for interpolation,
            should be returned in the results (default: {False})

    Raises:
        DropCycleException: If V has no more values than the filter window (25) after outlier handling,
            or if the discharging time is smaller than 6.
        OutlierException: Passed on from drop_cycle_big_t_outliers().

    Returns:
        {dict} -- Dictionary with the resampled values and the discharging time. If return_original_data
            is True, it also holds the datapoints used for interpolation under "Qd_original_data",
            "T_original_data", "V_original_data" and "t_original_data". "V_original_data" holds the
            smoothed V values, not the raw measurements.
    """

    Qd = cycle["Qd"]
    T = cycle["T"]
    V = cycle["V"]
    I = cycle["I"]  # noqa: E741
    t = cycle["t"]

    # Only take the measurements during high current discharging.
    discharge_mask = I < I_thresh
    Qd, T, V, t = multiple_array_indexing(discharge_mask, Qd, T, V, t)

    # Sort all values after time.
    sorted_indeces = t.argsort()
    Qd, T, V, t = multiple_array_indexing(sorted_indeces, Qd, T, V, t)

    # Only take timesteps where time is strictly increasing.
    # Because 0 is prepended, the first timestep is only kept if its t is bigger than 0.
    increasing_time_mask = np.diff(t, prepend=0) > 0
    Qd, T, V, t = multiple_array_indexing(increasing_time_mask, Qd, T, V, t)

    # Dropping outliers.
    Qd, T, V, t = drop_cycle_big_t_outliers(15, Qd, T, V, t)

    Qd = handle_small_Qd_outliers(12, Qd, t)

    Qd, T, V, t = drop_outliers_starting_left(12, Qd, T, V, t)

    # Apply savitzky golay filter to V to smooth out the values.
    # This is done in order to not drop too many values in the next processing step (make monotonically decreasing).
    # This way the resulting curves don't become skewed too much in the direction of smaller values.
    savgol_window_length = 25
    if savgol_window_length >= V.size:
        raise DropCycleException("""Dropping cycle with less than {} V values.\nSizes --> Qd:{}, T:{}, V:{}, t:{}"""
                                 .format(savgol_window_length, Qd.size, T.size, V.size, t.size))
    V_savgol = savgol_filter(V, window_length=savgol_window_length, polyorder=2)

    # Only take the measurements, where V is monotonically decreasing (needed for interpolation).
    # This is done by comparing V to the accumulated minimum of V.
    #    accumulated minimum --> (taking always the smallest seen value from V from left to right)
    # From here on V holds the smoothed values.
    v_decreasing_mask = V_savgol == np.minimum.accumulate(V_savgol)
    Qd, T, V, t = multiple_array_indexing(v_decreasing_mask, Qd, T, V_savgol, t, drop_warning=True)

    # Make V strictly decreasing (needed for interpolation).
    V_strict_dec = make_strictly_decreasing(t, V)

    # Calculate discharging time. (Only scalar feature which is returned later)
    discharging_time = t.max() - t.min()
    if discharging_time < 6:
        raise DropCycleException("Dropping cycle with discharge_time = {}"
                                 .format(discharging_time))

    # Make interpolation functions.
    # V_strict_dec is inverted because it has to be increasing for interpolation.
    # Qd and T are also inverted, so the correct values line up.
    V_increasing = V_strict_dec[::-1]
    Qd_interp_func = interp1d(
        V_increasing,
        Qd[::-1],
        bounds_error=False,  # Allows the function to be evaluated outside of the range of V_strict_dec.
        fill_value=(Qd[-1], Qd[0])  # Values to use below and above the range of V_strict_dec.
    )
    T_interp_func = interp1d(
        V_increasing,
        T[::-1],
        bounds_error=False,
        fill_value=(T[-1], T[0])
    )

    # For resampling the decreasing order is chosen again.
    # The order doesn't matter for evaluating the interpolation functions.
    Vdlin = np.linspace(Vdlin_start, Vdlin_stop, Vdlin_steps)

    results = {
        cst.QDLIN_NAME: Qd_interp_func(Vdlin),
        cst.TDLIN_NAME: T_interp_func(Vdlin),
        cst.VDLIN_NAME: Vdlin,
        cst.DISCHARGE_TIME_NAME: discharging_time
    }
    if return_original_data:
        # Data used for interpolation.
        results.update({
            "Qd_original_data": Qd,
            "T_original_data": T,
            "V_original_data": V,
            "t_original_data": t
        })
    return results


def preprocess_batch(batch_dict, return_original_data=False, return_cycle_drop_info=False, verbose=False):
    """Processes all cycles of every cell in batch_dict and returns the results in the same format.

    Every processed cell is deleted from batch_dict to reduce the memory used during processing,
    so batch_dict is empty afterwards.

    Arguments:
        batch_dict {dict} -- Unprocessed batch of cell data.

    Keyword Arguments:
        return_original_data {bool} -- If True, the original data used for interpolation is returned. (default: {False})
        return_cycle_drop_info {bool} -- If True, a dict with information about all dropped cycles is
            returned as a second value. (default: {False})
        verbose {bool} -- If True prints progress for every cell (default: {False})

    Returns:
        dict -- Results in the same format as batch_dict.
        dict -- Only if return_cycle_drop_info is True: For every cell with dropped cycles a dict with one
            entry per dropped cycle (None or the outlier dict without mask), plus the keys
            "number_distinct_cells" and "number_distinct_cycles".
    """
    batch_results = dict()
    cycles_drop_info = dict()

    print("Start processing data ...")

    for cell_key in list(batch_dict.keys()):
        # The iteration is over a list of keys so the processed keys can be removed while iterating over the dict.
        # This reduces the memory used during processing.
        # If "for cell_key, cell_value in batch_dict.items()" is used,
        #    "del batch_dict[cell_key]" would throw an RuntimeError: dictionary changed size during iteration.
        cell_value = batch_dict[cell_key]
        cycle_life = cell_value["cycle_life"][0][0]
        # Initialize the cell results with all available scalar values.
        cell_summary = {
            cst.INTERNAL_RESISTANCE_NAME: [],
            cst.QD_NAME: [],
            cst.REMAINING_CYCLES_NAME: [],
            cst.DISCHARGE_TIME_NAME: []
        }
        batch_results[cell_key] = dict(
            cycle_life=cycle_life,
            summary=cell_summary,
            cycles=dict()
        )

        for cycle_key, cycle_value in cell_value["cycles"].items():
            # Has to be skipped since there are often times only two measurements.
            if cycle_key == '0':
                continue
            # Some cells have more cycle measurements than recorded cycle_life.
            # The remaining cycles will be dropped.
            elif int(cycle_key) > int(cycle_life):
                print("    Cell {} has more cycles than cycle_life ({}): Dropping remaining cycles {} to {}"
                      .format(cell_key,
                              cycle_life,
                              cycle_key,
                              max(int(k) for k in cell_value["cycles"].keys())))
                break

            # Start processing the cycle.
            try:
                cycle_results = preprocess_cycle(cycle_value, return_original_data=return_original_data)

            except DropCycleException as e:
                print("cell:", cell_key, " cycle:", cycle_key)
                print(e)
                print("")
                # Documenting dropped cell and cycle.
                # The entry is added to the dict of the cell, so earlier dropped cycles
                # of the same cell are not overwritten.
                cycles_drop_info.setdefault(cell_key, dict())[cycle_key] = None
                continue

            except OutlierException as oe:  # Can be raised in preprocess_cycle, if an outlier is found.
                print("cell:", cell_key, " cycle:", cycle_key)
                print(oe)
                print("")
                # Adding outlier dict from Exception to the drop info of this cell.
                cycles_drop_info.setdefault(cell_key, dict())[cycle_key] = \
                    outlier_dict_without_mask(oe.outlier_dict)
                continue

            # Copy summary values for this cycle into the results.
            # The values are appended instead of written into an initialized array,
            # because the indeces of dropped cycles would be skipped otherwise.
            cell_summary[cst.INTERNAL_RESISTANCE_NAME].append(
                cell_value["summary"][cst.INTERNAL_RESISTANCE_NAME][int(cycle_key)])
            cell_summary[cst.QD_NAME].append(
                cell_value["summary"][cst.QD_NAME][int(cycle_key)])
            cell_summary[cst.REMAINING_CYCLES_NAME].append(
                cycle_life - int(cycle_key))

            # Append the calculated discharge time.
            # This is the only scalar result from preprocess_cycle
            cell_summary[cst.DISCHARGE_TIME_NAME].append(
                cycle_results.pop(cst.DISCHARGE_TIME_NAME))

            # Write the results to the correct cycle key.
            batch_results[cell_key]["cycles"][cycle_key] = cycle_results

        # Convert lists of appended values to numpy arrays.
        for k, v in cell_summary.items():
            cell_summary[k] = np.array(v)

        if verbose:
            print(cell_key, "done")
        # Delete cell key from dict, to reduce used memory during processing.
        del batch_dict[cell_key]

    # Both counts are computed before the count keys are added, so only cell entries are counted.
    number_distinct_cells = len(cycles_drop_info)
    number_distinct_cycles = sum(len(dropped_cycles) for dropped_cycles in cycles_drop_info.values())
    cycles_drop_info["number_distinct_cells"] = number_distinct_cells
    cycles_drop_info["number_distinct_cycles"] = number_distinct_cycles

    print("Done processing data.")
    if return_cycle_drop_info:
        return batch_results, cycles_drop_info
    else:
        return batch_results


def describe_results_dict(results_dict):
    """Prints summary statistics for all computed results over every single cycle.
    This might take a few seconds, since it has to search the whole dictionary for every value.

    The printed dict has three parts:
        cycle_life -- max, min, mean and std over the cycle_life of all cells.
        summary_results -- For every summary feature the max and min over all cells, the mean of the
            means per cell and the mean of the standard deviations per cell ("mean_std").
        cycle_results -- The same four statistics for the resampled Qd and T values, computed per cycle.

    Arguments:
        results_dict {dict} -- Results in the format returned by preprocess_batch().

    Returns:
        None
    """
    print("Collecting results data and printing results ...")
    describe_dict = dict()

    cells = list(results_dict.values())
    cycle_life_list = [cell["cycle_life"] for cell in cells]

    describe_dict["cycle_life"] = dict(
        max=np.max(cycle_life_list),
        min=np.min(cycle_life_list),
        mean=np.mean(cycle_life_list),
        std=np.std(cycle_life_list)
    )

    summary_results = dict()
    for k in [cst.INTERNAL_RESISTANCE_NAME,
              cst.QD_NAME,
              cst.REMAINING_CYCLES_NAME,
              cst.DISCHARGE_TIME_NAME]:
        # Collect the values of every cell once and reuse them for all statistics.
        values_per_cell = [cell["summary"][k] for cell in cells]
        summary_results[k] = dict(
            max=np.max([np.max(values) for values in values_per_cell]),
            min=np.min([np.min(values) for values in values_per_cell]),
            mean=np.mean([np.mean(values) for values in values_per_cell]),
            # Mean of the standard deviations, matching the key name and the cycle results below.
            mean_std=np.mean([np.std(values) for values in values_per_cell])
        )
    describe_dict["summary_results"] = summary_results

    cycle_results = dict()
    for k in [cst.QDLIN_NAME, cst.TDLIN_NAME]:
        values_per_cycle = [cycle[k] for cell in cells for cycle in cell["cycles"].values()]
        cycle_results[k] = dict(
            max=np.max([np.max(values) for values in values_per_cycle]),
            min=np.min([np.min(values) for values in values_per_cycle]),
            mean=np.mean([np.mean(values) for values in values_per_cycle]),
            mean_std=np.mean([np.std(values) for values in values_per_cycle])
        )
    describe_dict["cycle_results"] = cycle_results

    pprint(describe_dict)


def save_preprocessed_data(results_dict, save_dir=cst.PROCESSED_DATA):
    """Writes results_dict as a pickle to disk.

    Arguments:
        results_dict {dict} -- The object to pickle.

    Keyword Arguments:
        save_dir {str} -- Path of the file that is written (a file path, despite the name).
            (default: {cst.PROCESSED_DATA})
    """
    print("Saving preprocessed data to", save_dir)
    with open(save_dir, 'wb') as pickle_file:
        pickle.dump(results_dict, pickle_file)


def load_preprocessed_data(save_dir=cst.PROCESSED_DATA):
    """Reads a pickled object from disk and returns it.

    Keyword Arguments:
        save_dir {str} -- Path of the pickle file to read (a file path, despite the name).
            (default: {cst.PROCESSED_DATA})

    Returns:
        The unpickled object.
    """
    print("Loading preprocessed data from", save_dir)
    # NOTE: Unpickling can execute arbitrary code, so only files from a trusted source should be loaded.
    with open(save_dir, 'rb') as pickle_file:
        loaded = pickle.load(pickle_file)
    return loaded


def data_processing_main():
    """Runs the whole preprocessing: loads the batches, preprocesses them, prints the
    information about dropped cycles and saves the results to the default location."""
    raw_batches = load_batches_to_dict(amount_to_load=3)

    batch_options = dict(return_original_data=False,
                         return_cycle_drop_info=True,
                         verbose=True)
    processed, drop_report = preprocess_batch(raw_batches, **batch_options)

    pprint(drop_report)
    # describe_results_dict(processed) can be called here to print summary statistics.

    save_preprocessed_data(processed)
    print("Done!")

## 启动数据预处理脚本

In [3]:
# Notebook cell entry point: run the preprocessing defined above
# (load the batches, preprocess them, save the results).
data_processing_main()

Loading batch1 ...
Loading batch2 ...
Loading batch3 ...
Done loading batches
Start processing data ...
    Interpolated small Qd outlier from index 34 to 42
b1c0 done
b1c1 done
cell: b1c2  cycle: 1486
    Dropping cycle based on outliers with np.diff(t) > 100 with value(s) [488.0221083333337]

b1c2 done
cell: b1c3  cycle: 11
    Dropping cycle based on outliers with np.diff(t) > 100 with value(s) [407.7868733333333]

b1c3 done
cell: b1c4  cycle: 11
    Dropping cycle based on outliers with np.diff(t) > 100 with value(s) [407.7970250000001]

    Interpolated small Qd outlier from index 176 to 186
b1c4 done
cell: b1c5  cycle: 908
    Dropping cycle based on outliers with np.diff(t) > 100 with value(s) [264.66089166666694]

b1c5 done
    Interpolated small Qd outlier from index 75 to 84
b1c6 done
    Interpolated small Qd outlier from index 84 to 92
b1c7 done
b1c9 done
    Interpolated small Qd outlier from index 152 to 162
b1c11 done
    Interpolated small Qd outlier from index 182 to 1



    Cell b2c40 has more cycles than cycle_life (499.0): Dropping remaining cycles 500 to 529
b2c40 done
    Cell b2c41 has more cycles than cycle_life (429.0): Dropping remaining cycles 430 to 451
b2c41 done
cell: b2c42  cycle: 246
    Dropping cycle based on outliers with np.diff(t) > 100 with value(s) [490.9387683333335]

    Cell b2c42 has more cycles than cycle_life (466.0): Dropping remaining cycles 467 to 491
b2c42 done
    Cell b2c43 has more cycles than cycle_life (462.0): Dropping remaining cycles 463 to 486
b2c43 done
    Cell b2c44 has more cycles than cycle_life (457.0): Dropping remaining cycles 458 to 478
b2c44 done
    Cell b2c45 has more cycles than cycle_life (487.0): Dropping remaining cycles 488 to 513
b2c45 done
    Cell b2c46 has more cycles than cycle_life (429.0): Dropping remaining cycles 430 to 448
b2c46 done
    Cell b2c47 has more cycles than cycle_life (713.0): Dropping remaining cycles 714 to 744
b2c47 done
b3c0 done
b3c1 done
b3c3 done
b3c4 done
b3c5 done




b3c22 done
b3c24 done
b3c25 done
b3c26 done
b3c27 done
b3c28 done
b3c29 done
b3c30 done
b3c31 done
b3c33 done
b3c34 done
b3c35 done
b3c36 done
b3c40 done
############ Found outliers ############ 
{'t': {'diff_values': array([[11.43943]]),
       'original_values': array([[21.80653333]]),
       'outlier_indeces': array([[1]]),
       'std_diff': 0.7327046437363047}}

    Dropped 100 outliers in ['t']





b3c41 done
b3c42 done
b3c43 done
b3c44 done
b3c45 done
Done processing data.
{'b1c2': {'1486': {'t': {'diff_values': array([[488.02210833]]),
                         'original_values': array([[522.817995]]),
                         'outlier_indeces': array([[71]]),
                         'std_diff': 26.8623624050339}}},
 'b1c20': {'12': {'t': {'diff_values': array([[408.80270333]]),
                        'original_values': array([[439.54111667]]),
                        'outlier_indeces': array([[33]]),
                        'std_diff': 21.849172126874798}}},
 'b1c21': {'12': {'t': {'diff_values': array([[408.79158333]]),
                        'original_values': array([[439.59233167]]),
                        'outlier_indeces': array([[34]]),
                        'std_diff': 21.78640787927781}}},
 'b1c3': {'11': {'V': {'diff_values': array([[0.1669254]]),
                       'original_values': array([[3.1514935]]),
                       'outlier_indeces': array([[218

## 数据 PIPELINE

## 加载训练数据生成函数


In [7]:
import csv
import pickle
import os

import tensorflow as tf
from tensorflow.train import FloatList, Feature, Features, Example

import trainer.constants as cst

In [5]:
def get_cycle_example(cell_value, summary_idx, cycle_idx, scaling_factors):
    """
    Builds the "Example" object that is written to tfrecords for one cycle of one cell.
    This function defines which columns end up in the tfrecords files:
    four scalar summary features, the current cycle number and two array features.
    Every value is scaled (divided) by the corresponding value in "scaling_factors".
    The current cycle number uses the scaling factor of the remaining cycles.
    """
    def as_float_feature(values):
        # Wraps a sequence of floats in a Feature.
        return Feature(float_list=FloatList(value=values))

    # Summary feature values (scalars --> have to be wrapped in lists)
    summary = cell_value["summary"]
    scalar_names = (cst.INTERNAL_RESISTANCE_NAME,
                    cst.QD_NAME,
                    cst.REMAINING_CYCLES_NAME,
                    cst.DISCHARGE_TIME_NAME)
    scalar_values = {name: [summary[name][summary_idx] / scaling_factors[name]]
                     for name in scalar_names}
    # Same scale as the remaining cycles --> same scaling factor
    current_cycle = [float(cycle_idx) / scaling_factors[cst.REMAINING_CYCLES_NAME]]

    # Detail feature values (arrays)
    cycle = cell_value["cycles"][cycle_idx]
    array_values = {name: cycle[name] / scaling_factors[name]
                    for name in (cst.QDLIN_NAME, cst.TDLIN_NAME)}

    # Wrapping as example
    feature = {name: as_float_feature(values) for name, values in scalar_values.items()}
    feature.update({name: as_float_feature(values) for name, values in array_values.items()})
    feature[cst.CURRENT_CYCLE_NAME] = as_float_feature(current_cycle)

    return Example(features=Features(feature=feature))


def write_to_tfrecords(batteries, data_dir, train_test_split=None):
    """
    Takes battery data in dict format as input and writes a set of tfrecords files to disk.

    To load the preprocessed battery data that was used to train the model, use the
    "load_processed_battery_data()" function and pass it as the batteries argument to the
    "write_to_tfrecords()" function.

    A train/test split can be passed as a dictionary with the names of the splits (e.g. "train") as keys
    and lists of cell names (e.g. ["b1c3", "b1c4"]) as values. This will create subdirectories for each
    split.

    Before any cell is written, the scaling factors are calculated and saved to
    "cst.SCALING_FACTORS_DIR"; the same factors are used for every split.

    Arguments:
        batteries {dict} -- Maps cell names to the data of the corresponding cell.
        data_dir {str} -- Base directory for the tfrecords files. It is created
            (including missing parent directories) if it does not exist yet.

    Keyword Arguments:
        train_test_split {dict} -- Maps split names to lists of cell names. If None,
            all cells are written directly into "data_dir". (default: {None})

    Raises:
        KeyError -- If a cell name listed in "train_test_split" is not a key of "batteries".

    For more info on TFRecords and Examples see 'Hands-on Machine Learning with
    Scikit-Learn, Keras & TensorFlow', pp.416 (2nd edition, early release)
    """
    # makedirs(exist_ok=True) also creates missing parent directories and does not
    # fail if the directory shows up between an existence check and its creation.
    os.makedirs(data_dir, exist_ok=True)

    scaling_factors = calculate_and_save_scaling_factors(batteries, train_test_split, cst.SCALING_FACTORS_DIR)

    if train_test_split is None:
        # Write all cells into one directory
        for cell_name, cell_data in batteries.items():
            write_single_cell(cell_name, cell_data, data_dir, scaling_factors)
        return

    # For each split use a separate subdirectory of "data_dir" and write the files there
    for split_name, split_indexes in train_test_split.items():
        split_data_dir = os.path.join(data_dir, split_name)
        os.makedirs(split_data_dir, exist_ok=True)
        # Resolve all cells of the split up front, so an unknown cell name fails
        # before any file of this split is written.
        split_batteries = {idx: batteries[idx] for idx in split_indexes}
        for cell_name, cell_data in split_batteries.items():
            write_single_cell(cell_name, cell_data, split_data_dir, scaling_factors)


def write_single_cell(cell_name, cell_data, data_dir, scaling_factors):
    """
    Writes all cycles of one cell into a single tfrecords file.

    The file is placed in "data_dir" and named after the cell (e.g. "b1c0.tfrecord").
    Every cycle is converted to an Example by "get_cycle_example()" and stored in the
    binary form returned by its SerializeToString() method.
    """
    target_path = str(os.path.join(data_dir, cell_name + ".tfrecord"))
    with tf.io.TFRecordWriter(target_path) as record_writer:
        # The running position is passed on as the summary index,
        # the key itself identifies the cycle.
        for position, cycle_key in enumerate(cell_data["cycles"].keys()):
            example = get_cycle_example(cell_data, position, cycle_key, scaling_factors)
            serialized_example = example.SerializeToString()
            record_writer.write(serialized_example)
    print("Created %s.tfrecords file." % cell_name)


def parse_features(example_proto):
    """
    Decodes one serialized Example and splits it into features and targets
    (i.e. X and y), so the result can be fed to a Tensorflow model.

    The parsing schema declares every stored feature as a fixed length float32 feature:
    the three summary features with shape [1], the two target values as scalars and
    the two detail features with shape [cst.STEPS, cst.INPUT_DIM].

    Returns:
        tuple -- (features, targets). "features" is the parsed dict without the two
            target entries, "targets" stacks current cycle and remaining cycles
            (in this order) along axis 0.
    """
    summary_spec = tf.io.FixedLenFeature([1, ], tf.float32)
    target_spec = tf.io.FixedLenFeature([], tf.float32)
    detail_spec = tf.io.FixedLenFeature([cst.STEPS, cst.INPUT_DIM], tf.float32)

    schema = {
        cst.INTERNAL_RESISTANCE_NAME: summary_spec,
        cst.QD_NAME: summary_spec,
        cst.DISCHARGE_TIME_NAME: summary_spec,
        cst.REMAINING_CYCLES_NAME: target_spec,
        cst.CURRENT_CYCLE_NAME: target_spec,
        cst.TDLIN_NAME: detail_spec,
        cst.QDLIN_NAME: detail_spec,
    }
    parsed = tf.io.parse_single_example(example_proto, schema)

    # Remove both target values from the parsed dict; what remains are the model inputs
    remaining_cycles = parsed.pop(cst.REMAINING_CYCLES_NAME)
    current_cycle = parsed.pop(cst.CURRENT_CYCLE_NAME)
    stacked_targets = tf.stack([current_cycle, remaining_cycles], axis=0)

    return parsed, stacked_targets


def get_flatten_windows(window_size):
    """Returns a function that flattens windowed datasets using "window_size"."""
    def flatten_windows(features, target):
        """
        Turns the per-feature window datasets produced by .window() into
        one dataset of (features, target) pairs.

        Every feature window is batched with batch size "window_size". Of the target
        window the first "window_size - 1" rows are skipped, so only the label of the
        last row of a full window remains.
        """
        # The names (and their order) have to match the names of the Input objects
        # in our final model
        window_feature_names = (
            cst.QDLIN_NAME,
            cst.TDLIN_NAME,
            cst.INTERNAL_RESISTANCE_NAME,
            cst.DISCHARGE_TIME_NAME,
            cst.QD_NAME,
        )
        batched_features = {
            name: features[name].batch(window_size)
            for name in window_feature_names
        }
        # One target/label per window
        last_target = target.skip(window_size - 1)
        return tf.data.Dataset.zip((batched_features, last_target))
    return flatten_windows


def get_create_cell_dataset_from_tfrecords(window_size, shift, stride, drop_remainder):
    """Returns a function that builds a windowed dataset from one tfrecords file,
    using the given window parameters."""
    def create_cell_dataset_from_tfrecords(file):
        """
        Reads one tfrecords file, parses every record with "parse_features()",
        groups the parsed examples into windows and flattens these windows into
        (features, target) pairs with the function from "get_flatten_windows()".
        """
        windowed_examples = (
            tf.data.TFRecordDataset(file)
            .map(parse_features)
            .window(size=window_size, shift=shift, stride=stride, drop_remainder=drop_remainder)
        )
        return windowed_examples.flat_map(get_flatten_windows(window_size))
    return create_cell_dataset_from_tfrecords


def create_dataset(data_dir, window_size, shift, stride, batch_size,
                   cycle_length=4, num_parallel_calls=4,
                   drop_remainder=True, shuffle=True,
                   shuffle_buffer=500, repeat=True):
    """
    Creates a dataset from .tfrecord files. "data_dir" is handed to
    tf.data.Dataset.list_files(), so it is expected to be a file pattern that
    captures multiple files (e.g. "data/tfrecords/train/*tfrecord").
    The dataset will augment the original data by creating windows of loading cycles.

    Processing steps:
        1. Every matched file is turned into a dataset of windows by the function from
           "get_create_cell_dataset_from_tfrecords()". interleave() pulls from
           "cycle_length" of these per-file datasets at a time.
        2. If "shuffle" is set, the windows are shuffled with a buffer of
           "shuffle_buffer" elements.
        3. The windows are batched with "batch_size".
        4. If "repeat" is set, the batched dataset is repeated indefinitely.
    """
    filepath_dataset = tf.data.Dataset.list_files(data_dir)
    read_cell_windows = get_create_cell_dataset_from_tfrecords(window_size, shift, stride, drop_remainder)
    windows = filepath_dataset.interleave(read_cell_windows,
                                          cycle_length=cycle_length,
                                          num_parallel_calls=num_parallel_calls)
    if shuffle:
        windows = windows.shuffle(shuffle_buffer)

    # Batch only after shuffling the windows, so one batch is not sequential
    batches = windows.batch(batch_size)

    return batches.repeat() if repeat else batches


def calculate_and_save_scaling_factors(data_dict, train_test_split, csv_dir):
    """Calculates the scaling factors for every feature and saves the result in a csv file.
    The factors are used during writing of the tfrecords files.

    The scaling factor of a feature is its maximum value over the selected cells.
    If "train_test_split" is given, only the cells listed under its "train" key are used.
    Otherwise all cells are used except the ones whose name starts with "b3".

    Arguments:
        data_dict {dict} -- Maps cell names to cell data. Every cell holds a "summary" entry
            (one sequence of values per summary feature) and a "cycles" entry (a mapping whose
            values hold the detail feature arrays of one cycle).
        train_test_split {dict or None} -- Split definition, only the "train" entry is read.
        csv_dir {str} -- Path of the csv file the factors are written to.

    Returns:
        dict -- Maps feature names to their scaling factor.

    Raises:
        ValueError -- If no cell is left to calculate the scaling factors from.
    """

    print("Calculate scaling factors...")
    scaling_factors = dict()

    if train_test_split is not None:
        # only take training cells
        train_cells = set(train_test_split["train"])
        data_dict = {k: v for k, v in data_dict.items() if k in train_cells}
    else:
        # only take non-secondary-test cells
        # NOTE(review): cells named "b3..." are presumably the secondary test set. The
        # previous condition kept ONLY those cells, which contradicted the stated intent
        # and would derive the scaling factors from test data.
        data_dict = {k: v for k, v in data_dict.items() if not k.startswith('b3')}

    if not data_dict:
        raise ValueError("No cells left to calculate the scaling factors from.")

    # Cells without any cycle do not contribute to the summary features
    cells_with_cycles = [cell_v for cell_v in data_dict.values() if len(cell_v["cycles"]) > 0]

    # Calculating max values for summary features
    for k in [cst.INTERNAL_RESISTANCE_NAME,
              cst.QD_NAME,
              cst.REMAINING_CYCLES_NAME,  # The feature "Current_cycles" will be scaled by the same scaling factor
              cst.DISCHARGE_TIME_NAME]:
        # Two max() calls are needed, one for every cell, one over all cells.
        # The summary values are stored once per cell, so they are evaluated
        # once per cell instead of once per cycle.
        scaling_factors[k] = max(max(cell_v["summary"][k]) for cell_v in cells_with_cycles)

    # Calculating max values for detail features
    for k in [cst.QDLIN_NAME,
              cst.TDLIN_NAME]:
        # Two max() calls are needed, one over every cycle array, one over all cycles (all cells included)
        scaling_factors[k] = max(max(cycle_v[k])
                                 for cell_v in data_dict.values()
                                 for cycle_v in cell_v["cycles"].values())

    # newline='' leaves the line endings to the csv module
    with open(csv_dir, 'w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=list(scaling_factors))
        writer.writeheader()  # Write the field names in the first line of the csv
        writer.writerow(scaling_factors)  # Write values to the correct fields
    print("Saved scaling factors to {}".format(csv_dir))
    print("Scaling factors: {}".format(scaling_factors))
    return scaling_factors


def load_scaling_factors(csv_dir=cst.SCALING_FACTORS_DIR, gcloud_bucket=None):
    """Reads the scaling factors from a csv and returns them as a dict.

    The csv is expected to hold the feature names in the first line and the
    corresponding values in the second line.

    Keyword Arguments:
        csv_dir {str} -- Path of the csv file. If "gcloud_bucket" is given, it is used as
            the name of the blob inside the bucket. (default: {cst.SCALING_FACTORS_DIR})
        gcloud_bucket -- Optional bucket object providing "blob(name)"; the blob has to
            provide "download_as_string()". (default: {None})

    Returns:
        dict -- Maps feature names to float scaling factors. For a local file without
            a value line, None is returned.
    """
    if gcloud_bucket:
        blob = gcloud_bucket.blob(csv_dir)
        content = blob.download_as_string().decode("utf-8")  # Download and decode byte string.
        # splitlines() handles "\r\n" as well as plain "\n" line endings
        names, values = content.splitlines()[:2]
        return {k: float(v) for k, v in zip(names.split(","), values.split(","))}

    # newline='' leaves the line ending handling to the csv module
    with open(csv_dir, mode='r', newline='') as file:
        first_row = next(csv.DictReader(file), None)  # Only the first line with numeric values is used
    if first_row is None:
        return None
    return {k: float(v) for k, v in first_row.items()}


# dev method
def load_train_test_split():
    """
    Loads the pickled train_test_split object stored at "cst.TRAIN_TEST_SPLIT".
    It is meant to divide all cell names into three lists,
    recreating the splits from the original paper.
    This can be passed directly to "write_to_tfrecords()" as an argument.
    """
    # NOTE: pickle can execute arbitrary code while loading, only use trusted files.
    # The context manager makes sure the file handle is closed again.
    with open(cst.TRAIN_TEST_SPLIT, "rb") as file:
        return pickle.load(file)


# dev method
def load_processed_battery_data():
    """
    Loads the pickled battery data stored at "cst.PROCESSED_DATA".
    The result can be passed as the batteries argument to "write_to_tfrecords()".
    """
    # NOTE: pickle can execute arbitrary code while loading, only use trusted files.
    # The context manager makes sure the file handle is closed again.
    with open(cst.PROCESSED_DATA, "rb") as file:
        return pickle.load(file)

##  启动训练数据生成

In [6]:
# Entry point: writes the tfrecords datasets for the stored train/test split.
if __name__ == "__main__":
    print("Writing datasets with train/test split from original paper and preprocessed data.")
    print("Loading split...")
    # Pickled split definition (split name -> cell names)
    split = load_train_test_split()
    print("Loading battery data...")
    # Pickled, already preprocessed battery data
    battery_data = load_processed_battery_data()
    print("Start writing to disk...")
    # Creates one subdirectory per split below cst.DATASETS_DIR and one file per cell
    write_to_tfrecords(battery_data, cst.DATASETS_DIR, train_test_split=split)
    print("Done.")

Writing datasets with train/test split from original paper and preprocessed data.
Loading split...
Loading battery data...
Start writing to disk...
Calculate scaling factors...
Saved scaling factors to data/tfrecords/scaling_factors.csv
Scaling factors: {'IR': 0.022039292, 'QD': 1.0979686, 'Remaining_cycles': 2159.0, 'Discharge_time': 14.758193333333232, 'Qdlin': 1.0828815, 'Tdlin': 41.78867160669522}
Created b1c1.tfrecords file.
Created b1c3.tfrecords file.
Created b1c5.tfrecords file.
Created b1c7.tfrecords file.
Created b1c11.tfrecords file.
Created b1c15.tfrecords file.
Created b1c17.tfrecords file.
Created b1c19.tfrecords file.
Created b1c21.tfrecords file.
Created b1c24.tfrecords file.
Created b1c26.tfrecords file.
Created b1c28.tfrecords file.
Created b1c30.tfrecords file.
Created b1c32.tfrecords file.
Created b1c34.tfrecords file.
Created b1c36.tfrecords file.
Created b1c38.tfrecords file.
Created b1c40.tfrecords file.
Created b1c42.tfrecords file.
Created b1c44.tfrecords file.