## Imports

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline

## Funções

In [None]:
def load_and_concat_data_from_csv(data: list) -> pd.DataFrame:
    """Load several CSV files, drop the "lon" rows, and stack them into one DataFrame.

    Each entry of ``data`` is read with ``pd.read_csv``, rows whose "resource"
    column equals "lon" are discarded, and the remaining rows of all files are
    concatenated in input order. The result is given a fresh 0..n-1 index so
    that row labels are unique across the input files.

    Args:
        data (list): A list of file paths (str) pointing to CSV files.

    Returns:
        pd.DataFrame: The filtered rows of every input file, in input order.

    Raises:
        FileNotFoundError: If any of the specified CSV files cannot be found.
        pd.errors.EmptyDataError: If a CSV file is empty.
        KeyError: If the column "resource" is not present in one of the CSV files.
        ValueError: If ``data`` is empty, since ``pd.concat`` has nothing to concatenate."""
    frames = []
    for file in data:
        df = pd.read_csv(file)

        # Rows with resource "lon" are useless and should not be in the dataset.
        frames.append(df[df["resource"] != "lon"])

    # Every file is read with its own 0-based index; without ignore_index the
    # combined frame would carry repeated row labels, which makes label-based
    # access (.loc) and index alignment ambiguous.
    return pd.concat(frames, ignore_index=True)

In [None]:
# CSV exports for ITU-415, listed in the order they are concatenated.
# NOTE(review): the third file name ends in "2025-06-31", which is not a real
# calendar date; presumably it matches the file name on disk -- confirm.
df_itu_415_csv_locations = [
    "../../data/full_history_ITU-415_2025-06-01_a_2025-06-17.csv",
    "../../data/full_history_ITU-415_2025-06-17_a_2025-06-29.csv",
    "../../data/full_history_ITU-415_2025-06-29_a_2025-06-31.csv",
    "../../data/full_history_ITU-415_2025-07-01_a_2025-07-17.csv",
]

# CSV exports for ITU-693, listed in the order they are concatenated.
df_itu_693_csv_locations = [
    "../../data/full_history_ITU-693_2025-05-01_a_2025-06-08.csv",
    "../../data/full_history_ITU-693_2025-06-08_a_2025-07-22.csv",
]

## Funções ajustadas para Pipeline

In [None]:
class AdjustTimestampColumn(BaseEstimator, TransformerMixin):
    """Stateless pipeline step that parses the 'timestamp' column into datetimes.

    The step learns nothing from the data. On transform, the 'timestamp' column
    is parsed with ``pd.to_datetime(..., format="ISO8601")`` and written into a
    copy of the input, so the caller's DataFrame is left untouched.

    Methods:
        fit(X, y=None):
            No-op that returns self; present so the class can be used as a
            scikit-learn pipeline step.
        transform(X):
            Returns a copy of X whose 'timestamp' column holds the parsed
            datetimes. Raises KeyError if X has no 'timestamp' column."""
    def fit(self, X: pd.DataFrame, y=None):
        # Nothing to estimate for this step.
        return self

    def transform(self, X: pd.DataFrame):
        parsed = pd.to_datetime(X["timestamp"], format="ISO8601")
        result = X.copy()
        result["timestamp"] = parsed
        return result


In [None]:
class RemoveDuplicatesAndNaN(BaseEstimator, TransformerMixin):
    """Transformer that fills missing values and removes duplicate rows.

    This transformer is designed to be used within scikit-learn pipelines.
    Every column that contains missing values has them replaced by the column's
    mode (the first entry of ``Series.mode()``); afterwards exact duplicate rows
    are dropped. A column that holds no non-null value at all has no mode and is
    passed through unchanged.

    Methods:
        fit(X, y=None):
            Returns self. Included for compatibility with scikit-learn pipelines.
        transform(X):
            Returns a new DataFrame with missing values filled and duplicate
            rows removed; the input DataFrame is not modified."""
    def fit(self, X: pd.DataFrame, y=None):
        return self

    @staticmethod
    def _fill_with_mode(col: pd.Series) -> pd.Series:
        """Return ``col`` with its missing values replaced by its mode, when it has one."""
        if not col.isnull().any():
            return col
        modes = col.mode()
        # An all-NaN column yields an empty mode(); indexing it with iloc[0]
        # would raise IndexError, so such a column is left as it is.
        if modes.empty:
            return col
        return col.fillna(modes.iloc[0])

    def transform(self, X: pd.DataFrame):
        df = X.copy()
        df_filled = df.apply(self._fill_with_mode)
        return df_filled.drop_duplicates()

In [None]:
class TreatHighValues(BaseEstimator, TransformerMixin):
    """Pipeline step that zeroes out-of-range readings and flags them.

    Working on a copy of the input, the step compares the 'value' column with
    ``max_limit`` and then:

    - adds a 'running' column holding 0 where 'value' is above the limit and
      1 everywhere else;
    - overwrites 'value' with 0 on the rows that are above the limit.

    Args:
        max_limit (int, optional): Threshold for the 'value' column; rows whose
            value is strictly greater are zeroed. Defaults to 20000.

    Methods:
        fit(X, y=None):
            No-op that returns self, as required for scikit-learn pipelines.
        transform(X):
            Returns the modified copy described above."""
    def __init__(self, max_limit: int = 20000):
        self.max_limit = max_limit

    def fit(self, X: pd.DataFrame, y=None):
        return self

    def transform(self, X: pd.DataFrame):
        out = X.copy()
        # Evaluate the threshold once, before 'value' is overwritten.
        above_limit = out['value'] > self.max_limit
        out['running'] = np.where(above_limit, 0, 1)
        out['value'] = np.where(above_limit, 0, out['value'])
        return out

In [None]:
class FixBatteryAndAlternatorValues(BaseEstimator, TransformerMixin):
    """Pipeline step that rescales the `value` of two specific resources.

    On a copy of the input, rows whose `resource` is `"Bat_V"` and rows whose
    `resource` is `"Char_V"` have their `value` divided by 10. All other rows
    are left as they are.

    Methods:
        fit(X, y=None):
            No-op that returns self, for scikit-learn pipeline compatibility.
        transform(X):
            Returns the copy with the `"Bat_V"` and `"Char_V"` values divided by 10."""
    def fit(self, X: pd.DataFrame, y=None):
        return self

    def transform(self, X: pd.DataFrame):
        out = X.copy()
        # Same rescaling for both resources, applied one resource at a time.
        for resource_name in ("Bat_V", "Char_V"):
            rows = out["resource"] == resource_name
            out.loc[rows, "value"] = out.loc[rows, "value"] / 10
        return out

In [None]:
class PivotDataframe(BaseEstimator, TransformerMixin):
    """Transformer that pivots, resamples, and cleans time-series data for each motor pump.

    This transformer is designed for use in scikit-learn pipelines. It performs the following steps:

    1. Checks that required columns ("timestamp", "motor_pump", "resource", "value", "running") are present.
    2. Pivots the DataFrame from long to wide format using "resource" as columns and the mean of "value" as values.
    3. Merges the "running" column back into the wide DataFrame.
    4. Sets "timestamp" as the index.
    5. For each "motor_pump", resamples at a fixed interval (`resample_seconds`) taking the mean of each bin,
       then forward-fills empty bins within that pump only.
    6. Rounds the "running" column to integer, sorts by ("motor_pump", "timestamp") and drops duplicate rows.

    Values are never forward-filled from one pump into another: NaNs that remain at the
    start of a pump's series (or in a resource column the pump never reports) stay NaN.

    Args:
        resample_seconds (int, optional): The interval in seconds for resampling the time-series data. Defaults to 60.

    Methods:
        fit(X, y=None):
            Returns self. Required for compatibility with scikit-learn pipelines.
        transform(X):
            Returns a cleaned, pivoted, and resampled DataFrame.

    Raises:
        ValueError: If any of the required columns are missing from the input DataFrame,
            or if the input leaves no rows to resample."""
    def __init__(self, resample_seconds: int = 60):
        self.resample_seconds = resample_seconds

    def fit(self, X: pd.DataFrame, y=None):
        return self

    def transform(self, X: pd.DataFrame):
        df = X.copy()

        required_cols = {"timestamp", "motor_pump", "resource", "value", "running"}
        missing = required_cols - set(df.columns)
        if missing:
            raise ValueError(f"PivotDataframe: missing columns: {missing}")

        # Long -> wide: one row per (timestamp, motor_pump), one column per resource.
        df_wide = (
            df.pivot_table(
                index=["timestamp", "motor_pump"],
                columns="resource",
                values="value",
                aggfunc="mean",
            )
            .reset_index()
        )

        # NOTE(review): df_running still has one row per long-format record, so
        # this left merge repeats each wide row once per matching record and the
        # bin means below are weighted by that record count. Confirm whether
        # "running" should be aggregated per (timestamp, motor_pump) first.
        df_running = df[["timestamp", "motor_pump", "running"]]
        df_wide = df_wide.merge(df_running, on=["timestamp", "motor_pump"], how="left")
        df_wide = df_wide.set_index("timestamp")

        resampled = []
        for pump_id, group in df_wide.groupby("motor_pump"):
            g = (
                group
                .resample(f"{self.resample_seconds}s")
                .mean(numeric_only=True)
                .ffill()  # fill empty bins from this pump's own earlier bins
            )
            g["running"] = g["running"].round().astype(int)
            # Re-attach the pump id (a non-numeric id is dropped by numeric_only,
            # a numeric one would have been averaged).
            g["motor_pump"] = pump_id
            resampled.append(g)

        if not resampled:
            # pd.concat([]) would fail with an unrelated-looking message.
            raise ValueError("PivotDataframe: no rows left to resample")

        df_wide = pd.concat(resampled).reset_index()
        df_wide = df_wide.sort_values(["motor_pump", "timestamp"]).reset_index(drop=True)
        # No frame-wide ffill here: after sorting, it would copy the last
        # readings of one pump into the leading NaNs of the next pump.
        return df_wide.drop_duplicates()
        
        

In [None]:
class RemoveZeroColumns(BaseEstimator, TransformerMixin):
    """Pipeline step that drops the columns which were entirely zero at fit time.

    ``fit`` compares every cell with 0 and records, in ``columns_to_keep_``, the
    labels of the columns where at least one cell is not equal to 0.
    ``transform`` then selects exactly those columns from whatever frame it is given.

    Methods:
        fit(X, y=None):
            Stores the labels of the columns to keep and returns self.
        transform(X):
            Returns X restricted to the columns recorded during fit."""

    def fit(self, X: pd.DataFrame, y=None):
        # Boolean Series indexed by column label: True where every cell equals 0.
        is_all_zero = X.eq(0).all()
        keep_mask = ~is_all_zero
        self.columns_to_keep_ = is_all_zero[keep_mask].index
        return self

    def transform(self, X: pd.DataFrame):
        kept = self.columns_to_keep_
        return X[kept]

In [None]:
# Preprocessing steps, in execution order. TreatHighValues must precede
# PivotDataframe because the latter requires the "running" column it creates.
pipeline = Pipeline([
    ("adjust_timestamp", AdjustTimestampColumn()),
    ("remove_duplicates_and_nan", RemoveDuplicatesAndNaN()),
    ("treat_high_values", TreatHighValues()),
    ("fix_battery_and_alternator_values", FixBatteryAndAlternatorValues()),
    ("pivot_dataframe", PivotDataframe(resample_seconds=15)),
    ("remove_zero_columns", RemoveZeroColumns()),
])

# Load the ITU-415 history files and run them through the whole pipeline.
df_raw = load_and_concat_data_from_csv(df_itu_415_csv_locations)
df_processed = pipeline.fit_transform(df_raw)

In [None]:
# Preview the first rows of the processed frame as the notebook cell output.
df_processed.head()