# Notebook per regressione lineare

Di seguito:
- verrà implementato l'algoritmo di regressione logistica

## Scaricamento dei dati

In [1]:
# File per scaricare i dati per fare analisi di machine learning

import requests
import zipfile
import io
import os

# List of URLs to your zipped files on AWS
urls = [
    "https://phm-datasets.s3.amazonaws.com/Data_Challenge_PHM2024_training_data.zip"]

# Directory to save the extracted files
output_dir = "dataset"
os.makedirs(output_dir, exist_ok=True)

for url in urls:
    try:
        print(f"Downloading {url}...")
        response = requests.get(url, stream=True)
        response.raise_for_status() # Raise an exception for bad status codes

        # Read the zip file from the response content
        with zipfile.ZipFile(io.BytesIO(response.content)) as zip_ref:
            # Extract all contents to the specified output directory
            zip_ref.extractall(output_dir)
            print(f"Extracted files from {url} to {output_dir}")

    except requests.exceptions.RequestException as e:
        print(f"Error downloading {url}: {e}")
    except zipfile.BadZipFile:
        print(f"Error: The downloaded file from {url} is not a valid zip file.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

print("Download and extraction complete.")

# Now you can access your CSV files in the 'downloaded_data' directory
# For example, to list the files in the directory:
import glob
csv_files = glob.glob(os.path.join(output_dir, "*.csv"))
print("CSV files found:", csv_files)

Downloading https://phm-datasets.s3.amazonaws.com/Data_Challenge_PHM2024_training_data.zip...
Extracted files from https://phm-datasets.s3.amazonaws.com/Data_Challenge_PHM2024_training_data.zip to dataset
Download and extraction complete.
CSV files found: ['dataset/X_train.csv', 'dataset/y_train.csv']


## Estrazione dei dati

In [2]:
import pandas as pd

def load_data(x_path, y_path):
  """
  Loads X.csv and the second column of y.csv into a single pandas DataFrame.

  Args:
    x_path (str): The path to the X.csv file.
    y_path (str): The path to the y.csv file.

  Returns:
    pandas.DataFrame: A DataFrame containing the data from X.csv
                      and the second column of y.csv.
  """
  x = pd.read_csv(x_path)
  y = pd.read_csv(y_path)

  # Assuming y has at least 2 columns and the second column is at index 1
  if y.shape[1] > 1:
    combined_data = x.copy()
    combined_data['y_target'] = y.iloc[:, 1]
    return combined_data
  else:
    print("Error: y.csv does not have a second column.")
    return x

# Example usage:
# Assuming your files are in the 'dataset' directory as per the preceding code
x_path = 'dataset/X_train.csv'
y_path = 'dataset/y_train.csv'

data = load_data(x_path, y_path)

# You can now work with the 'data' DataFrame
print(data.head())


   id  trq_measured       oat       mgt         pa       ias         np  \
0   0        54.100   2.00000  544.5000   212.1408  74.56250   89.18000   
1   1        49.625  24.22231  578.4844  1625.6400  30.35596   99.55273   
2   2        52.000   7.00000  566.1000  1912.9250  65.62500  100.14000   
3   3        62.400   7.25000  560.1000   277.0632  54.81250   90.64000   
4   4        62.900  23.25000  593.7000    53.6448  73.43750   99.91000   

         ng  y_target  
0   99.6400         1  
1   91.3866         0  
2   90.9600         1  
3  100.2800         0  
4   92.1700         0  


## Creazione training-set testing-set

In [3]:
from sklearn.model_selection import train_test_split

data_train, data_test = train_test_split(data, test_size=0.2, random_state=42)

print("Dimensione data_train:", data_train.shape)
print("Dimensione data_test:", data_test.shape)

Dimensione data_train: (594100, 9)
Dimensione data_test: (148525, 9)


# PIPELINE

## Cambio nome delle feature

In [5]:
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline

class FeatureRenamer(BaseEstimator, TransformerMixin):
    def __init__(self, column_names):
        if not isinstance(column_names, list):
            raise TypeError("column_names must be a list.")
        self.column_names = column_names

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        is_tuple = isinstance(X, tuple)
        if is_tuple:
            if not X:
                raise ValueError("Input tuple is empty.")
            df = X[0]
            rest = X[1:]
        else:
            df = X
            rest = ()

        if not isinstance(df, pd.DataFrame):
            raise TypeError("Input (or first element if tuple) must be a DataFrame.")

        # Drop 'id' column if present
        if 'id' in df.columns:
            df = df.drop(columns='id')

        # Check column count matches
        if len(self.column_names) != df.shape[1]:
            if len(self.column_names) + 1 == df.shape[1] and 'faulty' in df.columns:
                print("Detected 'faulty' column, adjusting rename.")
                new_cols = self.column_names + ['faulty']
                if len(new_cols) != df.shape[1]:
                    raise ValueError("Mismatch in column count with 'faulty' included.")
                df_renamed = df.copy()
                df_renamed.columns = new_cols
            else:
                raise ValueError("Column count mismatch after dropping 'id'.")
        else:
            df_renamed = df.copy()
            df_renamed.columns = self.column_names

        return (df_renamed,) + rest if rest else df_renamed

# ---------------------- Setup ----------------------
new_names_for_train_data = [
    'torque_meas', 'outside_air_temp', 'mean_gas_temp',
    'power_avail', 'indicated_air_speed', 'net_power',
    'compressor_speed', 'health_state'
]

# ---------------------- Pipeline ----------------------
pipeline_with_renaming = Pipeline([
    ('rename_features', FeatureRenamer(column_names=new_names_for_train_data))
])

# Apply pipeline
data_train_renamed = pipeline_with_renaming.fit_transform(data_train)

# ---------------------- Output ----------------------
print("Renamed DataFrame (without 'id'):")
print(data_train_renamed.head())

Renamed DataFrame (without 'id'):
        torque_meas  outside_air_temp  mean_gas_temp  power_avail  \
331067      74.8000            19.500       646.1000    1005.8400   
601458      67.0332            20.152       602.4063     958.4962   
77053       67.8000             8.000       546.7000      24.3840   
664037      53.9000             5.500       532.1000     360.8832   
346977      73.3000            17.750       636.3000    1025.9570   

        indicated_air_speed  net_power  compressor_speed  health_state  
331067             56.00000  100.19000          96.47000             1  
601458             96.97656   99.65235          94.02332             0  
77053              73.68750  100.22000          91.61000             0  
664037             61.87500   88.91000          99.78000             0  
346977             89.50000   99.98000          95.95000             1  


## Standardizzazione

In [6]:
from sklearn.preprocessing import MinMaxScaler

def standardize_columns(df, columns_to_standardize):
  """
  Standardizes specified columns of a pandas DataFrame to have values between 0 and 1
  using MinMaxScaler.

  Args:
    df: The pandas DataFrame to standardize.
    columns_to_standardize: A list of column names to standardize.

  Returns:
    The DataFrame with the specified columns standardized.
  """
  scaler = MinMaxScaler()
  df[columns_to_standardize] = scaler.fit_transform(df[columns_to_standardize])
  return df

# Example usage:
# Assuming you want to standardize all numerical columns except the index and the target variable
# Identify numerical columns (excluding 'idx' and 'health_state' in this case)
numerical_cols = data_train.select_dtypes(include=['float64', 'int64']).columns.tolist()
columns_to_standardize = [col for col in numerical_cols if col not in ['idx', 'health_state']]

print(f"\nColumns to standardize: {columns_to_standardize}")

data_train_standardized = standardize_columns(data_train.copy(), columns_to_standardize)

print("\nDataFrame after standardization:")
print(data_train_standardized.head())
print("\nDescriptive statistics after standardization:")
print(data_train_standardized.describe())


Columns to standardize: ['id', 'trq_measured', 'oat', 'mgt', 'pa', 'ias', 'np', 'ng', 'y_target']

DataFrame after standardization:
              id  trq_measured       oat       mgt        pa       ias  \
331067  0.445807      0.667650  0.691358  0.698165  0.367293  0.407088   
601458  0.809909      0.581407  0.702990  0.527553  0.352972  0.704964   
77053   0.103758      0.589922  0.486181  0.310035  0.070412  0.535666   
664037  0.894177      0.435575  0.441577  0.253026  0.172200  0.449796   
346977  0.467231      0.650994  0.660135  0.659898  0.373378  0.650613   

              np        ng  y_target  
331067  0.930147  0.576580       1.0  
601458  0.897203  0.358480       0.0  
77053   0.931985  0.143354       0.0  
664037  0.238971  0.871637       0.0  
346977  0.917279  0.530226       1.0  

Descriptive statistics after standardization:
                  id   trq_measured            oat            mgt  \
count  594100.000000  594100.000000  594100.000000  594100.000000   
mea

In [7]:
# prompt: crea una classe che sia compatibile con l'oggetto pipeline di scikit che contenga la funzione transform_with_custom_root standardize_columns

class ColumnStandardizer(BaseEstimator, TransformerMixin):
    def __init__(self, columns_to_standardize=None):
        self.columns_to_standardize = columns_to_standardize
        self.scaler = MinMaxScaler()

    def fit(self, X, y=None):
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input must be a DataFrame.")

        if self.columns_to_standardize is None:
            # Identify numerical columns excluding the target if present
            numerical_cols = X.select_dtypes(include=['float64', 'int64']).columns.tolist()
            # Assuming 'y_target' or similar is the target and should not be scaled
            # This part might need adjustment based on how your pipeline handles the target column
            self._cols_to_fit = [col for col in numerical_cols if col not in ['y_target', 'health_state']] # Exclude common target/index names
        else:
             if not isinstance(self.columns_to_standardize, (list, tuple)):
                 raise TypeError("columns_to_standardize must be a list or tuple of column names.")
             # Ensure all specified columns exist in the input DataFrame
             missing_cols = [col for col in self.columns_to_standardize if col not in X.columns]
             if missing_cols:
                 raise ValueError(f"Columns not found in input DataFrame: {missing_cols}")
             self._cols_to_fit = self.columns_to_standardize

        if not self._cols_to_fit:
            print("Warning: No columns selected for standardization.")
            return self

        # Fit the scaler only on the selected numerical columns
        self.scaler.fit(X[self._cols_to_fit])
        return self

    def transform(self, X):
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input must be a DataFrame.")
        if not hasattr(self, '_cols_to_fit'):
             raise RuntimeError("The transformer has not been fitted yet.")

        df_standardized = X.copy()

        if not self._cols_to_fit:
             return df_standardized # Return original if no columns to standardize

        # Ensure that the columns to transform are actually present in the input DataFrame
        present_cols_to_transform = [col for col in self._cols_to_fit if col in df_standardized.columns]

        if not present_cols_to_transform:
             print("Warning: None of the specified columns for standardization are present in the input DataFrame.")
             return df_standardized

        # Transform only the present columns
        df_standardized[present_cols_to_transform] = self.scaler.transform(df_standardized[present_cols_to_transform])

        return df_standardized


## Dropping columns

In [8]:
# prompt: crea una trasformazione che droppi certe colonne di un dataset

class ColumnDropper(BaseEstimator, TransformerMixin):
    def __init__(self, columns_to_drop):
        if not isinstance(columns_to_drop, (list, tuple)):
            raise TypeError("columns_to_drop must be a list or tuple of column names.")
        self.columns_to_drop = columns_to_drop

    def fit(self, X, y=None):
        # Nothing to fit, but check if columns exist in the input
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input must be a DataFrame.")
        missing_cols = [col for col in self.columns_to_drop if col not in X.columns]
        if missing_cols:
            print(f"Warning: Columns to drop not found in input DataFrame: {missing_cols}. These will be ignored during transform.")
            # Store the columns that are actually present to avoid errors during transform
            self._cols_to_drop_present = [col for col in self.columns_to_drop if col in X.columns]
        else:
            self._cols_to_drop_present = self.columns_to_drop
        return self

    def transform(self, X):
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input must be a DataFrame.")
        if not hasattr(self, '_cols_to_drop_present'):
             raise RuntimeError("The transformer has not been fitted yet.")

        df_dropped = X.copy()
        # Drop only the columns that were found during fit
        df_dropped = df_dropped.drop(columns=self._cols_to_drop_present, errors='ignore')
        return df_dropped

## Radice custom

### Trasformazione in work_with_data

In [9]:
def transform_with_custom_root(df, column_name, root_degree):
  """
  Applies a custom root transformation (1/root_degree power) to a column.
  Handles positive, negative, and zero values appropriately based on the root degree.

  Args:
    df (pd.DataFrame): The input DataFrame.
    column_name (str): The name of the column to transform.
    root_degree (float): The degree of the root (e.g., 2 for square root, 3 for cube root).

  Returns:
    pd.DataFrame: The DataFrame with the transformed column.
  """
  new_column_name = f'{column_name}_root_{root_degree:.2f}_transformed'

  if root_degree == 0:
      raise ValueError("Root degree cannot be zero.")
  elif root_degree % 2 == 0:  # Even root
      # For even roots, we can only take the root of non-negative numbers
      if (df[column_name] < 0).any():
          print(f"Warning: Column '{column_name}' contains negative values. Cannot apply even root directly.")
          # You might choose to handle this by taking the root of the absolute value,
          # or setting negative values to NaN, depending on your data context.
          # Here, we'll take the root of the absolute value for demonstration.
          df[new_column_name] = np.power(np.abs(df[column_name]), 1/root_degree)
      else:
          df[new_column_name] = np.power(df[column_name], 1/root_degree)
  else:  # Odd root
      # Odd roots can handle positive, negative, and zero values
      df[new_column_name] = np.sign(df[column_name]) * np.power(np.abs(df[column_name]), 1/root_degree)

  return df

### Trasformazione pipeline

In [10]:
# prompt: crea una classe che sia compatibile con l'oggetto pipeline di scikit che contenga la funzione transform_with_custom_root

class CustomRootTransformer(BaseEstimator, TransformerMixin):
    """
    A custom transformer to apply a custom root transformation to specified columns.
    Compatible with scikit-learn pipelines.

    Args:
        root_transformations (dict): A dictionary where keys are column names
                                     and values are the root degrees to apply.
    """
    def __init__(self, root_transformations=None):
        self.root_transformations = root_transformations

    def fit(self, X, y=None):
        """
        Fits the transformer. In this case, there's nothing to fit.

        Args:
            X: Input data (ignored).
            y: Target data (ignored).

        Returns:
            self: The fitted transformer instance.
        """
        return self

    def transform(self, X):
        """
        Applies the custom root transformation to the specified columns in the DataFrame.

        Args:
            X (pd.DataFrame): The input DataFrame.

        Returns:
            pd.DataFrame: The DataFrame with the transformed columns.
        """
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input X must be a pandas DataFrame.")

        df_transformed = X.copy() # Work on a copy to avoid modifying the original DataFrame

        if self.root_transformations:
            for col, deg in self.root_transformations.items():
                if col not in df_transformed.columns:
                    print(f"Warning: Column '{col}' not found in DataFrame. Skipping root transformation.")
                    continue

                print(f"Applying custom root {deg} transformation to column '{col}'...")
                df_transformed = transform_with_custom_root(df_transformed, col, deg)

        return df_transformed



## Binary transformation

### Trasformazione work_with_data

In [11]:
def create_binned_qualitative_variable(df, column_name, num_bins, strategy='quantile'):
  """
  Creates a qualitative (categorical) variable by binning a numerical column.

  Args:
    df (pd.DataFrame): The input DataFrame.
    column_name (str): The name of the numerical column to bin.
    num_bins (int): The desired number of bins.
    strategy (str): The strategy to use for binning. 'quantile' uses quantiles
                    to ensure bins have approximately equal numbers of observations.
                    'uniform' creates bins with equal widths. Default is 'quantile'.

  Returns:
    pd.DataFrame: The DataFrame with a new qualitative column.
                  The new column name will be f'{column_name}_binned_{num_bins}_{strategy}'.
  """
  if column_name not in df.columns:
    raise ValueError(f"La colonna '{column_name}' non è presente nel DataFrame.")
  if num_bins <= 1:
      raise ValueError("Il numero di bins deve essere maggiore di 1.")

  new_column_name = f'{column_name}_binned_{num_bins}_{strategy}'

  if strategy == 'quantile':
    # Use qcut to create bins based on quantiles (approximately equal number of observations)
    # `duplicates='drop'` handles cases where quantile boundaries are not unique,
    # which can happen with skewed or discrete data.
    df[new_column_name] = pd.qcut(df[column_name], q=num_bins, labels=False, duplicates='drop')
  elif strategy == 'uniform':
    # Use cut to create bins of equal width
    df[new_column_name] = pd.cut(df[column_name], bins=num_bins, labels=False, include_lowest=True)
  else:
    raise ValueError(f"Strategia di binning non valida: '{strategy}'. Scegliere tra 'quantile' o 'uniform'.")

  # Convert the binned column to object/category type if needed, or keep as int for simplicity
  # Here we keep it as int representing the bin number

  return df

### Trasformazione pipeline

In [12]:
# prompt: crea una classe che sia compatibile con l'oggetto pipeline di scikit che contenga la funzione  create_binned_qualitative_variable

class BinnedQualitativeTransformer(BaseEstimator, TransformerMixin):
    """
    A custom transformer to create binned qualitative (categorical) variables
    from numerical columns. Compatible with scikit-learn pipelines.

    Args:
        binning_config (dict): A dictionary where keys are column names
                               and values are tuples (num_bins, strategy).
                               Strategy can be 'quantile' or 'uniform'.
    """
    def __init__(self, binning_config=None):
        self.binning_config = binning_config

    def fit(self, X, y=None):
        """
        Fits the transformer. In this case, there's nothing to fit.

        Args:
            X: Input data (ignored).
            y: Target data (ignored).

        Returns:
            self: The fitted transformer instance.
        """
        return self

    def transform(self, X):
        """
        Applies the binning transformation to the specified columns in the DataFrame.

        Args:
            X (pd.DataFrame): The input DataFrame.

        Returns:
            pd.DataFrame: The DataFrame with the new binned qualitative columns.
        """
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input X must be a pandas DataFrame.")

        df_transformed = X.copy() # Work on a copy to avoid modifying the original DataFrame

        if self.binning_config:
            for col, (num_bins, strategy) in self.binning_config.items():
                if col not in df_transformed.columns:
                    print(f"Warning: Column '{col}' not found in DataFrame. Skipping binning transformation.")
                    continue

                print(f"Applying binning transformation to column '{col}' with {num_bins} bins and strategy '{strategy}'...")
                df_transformed = create_binned_qualitative_variable(df_transformed, col, num_bins, strategy)

        return df_transformed


## Compressor speed net power

### Trasformazione con work_with_data

In [13]:
# Select the columns for PCA
features_for_pca = data_train[['compressor_speed', 'net_power']]

# Initialize PCA with 1 component (to combine the two variables)
pca = PCA(n_components=1)

# Fit PCA on the selected features and transform them
data_train['compressor_speed_net_power_pca'] = pca.fit_transform(features_for_pca)

KeyError: "None of [Index(['compressor_speed', 'net_power'], dtype='object')] are in the [columns]"

### Trasformazione pipeline

In [14]:
# prompt: crea una classe che sia compatibile con l'oggetto pipeline di scikit che  come nella cella sopra a partire dalle feature 'compressor_speed' e net_power' tramite il metodo PCA crei una nuova variabile 'compressor_speed_net_power_pca', droppa poi le feature 'net_power' e 'compressor_speed'

class PCATransformer(BaseEstimator, TransformerMixin):
    """
    A custom transformer to apply PCA on specified columns and drop the original ones.

    Args:
        columns_to_pca (list): A list of column names to apply PCA to.
        n_components (int): The number of principal components to keep.
        new_column_prefix (str): Prefix for the new PCA column name.
    """
    def __init__(self, columns_to_pca, n_components=1, new_column_prefix="pca"):
        if not isinstance(columns_to_pca, list) or len(columns_to_pca) < 1:
            raise ValueError("columns_to_pca must be a list of at least one column name.")
        if n_components < 1 or n_components > len(columns_to_pca):
            raise ValueError("n_components must be between 1 and the number of columns to PCA.")

        self.columns_to_pca = columns_to_pca
        self.n_components = n_components
        self.new_column_prefix = new_column_prefix
        self.pca_model = None
        self.new_column_names = []


    def fit(self, X, y=None):
        """
        Fits the PCA model on the specified columns.

        Args:
            X (pd.DataFrame): The input DataFrame.
            y: Target data (ignored).

        Returns:
            self: The fitted transformer instance.
        """
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input X must be a pandas DataFrame.")

        # Check if all specified columns are in the DataFrame
        missing_cols = [col for col in self.columns_to_pca if col not in X.columns]
        if missing_cols:
            raise ValueError(f"The following columns for PCA were not found in the DataFrame: {missing_cols}")

        print(f"Fitting PCA on columns: {self.columns_to_pca}")
        self.pca_model = PCA(n_components=self.n_components)
        self.pca_model.fit(X[self.columns_to_pca])

        # Define the new column names based on the prefix and component number
        if self.n_components == 1:
            self.new_column_names = [f'{self.new_column_prefix}_{"_".join(self.columns_to_pca).replace(".", "_")}']
        else:
             self.new_column_names = [f'{self.new_column_prefix}_{i+1}' for i in range(self.n_components)]


        return self

    def transform(self, X):
        """
        Applies the fitted PCA transformation to the specified columns,
        creates the new PCA column(s), and drops the original columns.

        Args:
            X (pd.DataFrame): The input DataFrame.

        Returns:
            pd.DataFrame: The DataFrame with the new PCA column(s) and original columns dropped.
        """
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input X must be a pandas DataFrame.")

        if self.pca_model is None:
            raise RuntimeError("PCA model has not been fitted yet. Call fit() first.")

        df_transformed = X.copy() # Work on a copy

        # Check if all specified columns are still in the DataFrame before dropping
        cols_to_drop_exist = [col for col in self.columns_to_pca if col in df_transformed.columns]

        # Apply the transformation
        print(f"Transforming columns {self.columns_to_pca} using PCA...")
        pca_result = self.pca_model.transform(df_transformed[self.columns_to_pca])

        # Create new column(s)
        if self.n_components == 1:
             df_transformed[self.new_column_names[0]] = pca_result[:, 0]
        else:
            for i, col_name in enumerate(self.new_column_names):
                df_transformed[col_name] = pca_result[:, i]


        # Drop original columns
        print(f"Dropping original PCA columns: {cols_to_drop_exist}")
        df_transformed = df_transformed.drop(columns=cols_to_drop_exist)

        return df_transformed

# Example usage:
# Assuming 'data_train' is your DataFrame
# pca_transformer = PCATransformer(columns_to_pca=['compressor_speed', 'net_power'], n_components=1, new_column_prefix='compressor_speed_net_power')
# data_train_pca = pca_transformer.fit_transform(data_train.copy())
# print(data_train_pca[['compressor_speed_net_power_pca', 'compressor_speed', 'net_power']].head()) # Check the new column and if originals are dropped

## Trasformazione torque_times_temp

### Trasformazione work_with_data

In [15]:
data_train['torque_times_temp'] = data_train['torque_meas'] * data_train['outside_air_temp']

KeyError: 'torque_meas'

### Trasformazione pipeline

In [16]:
# prompt: crea una classe che sia compatibile con l'oggetto pipeline di scikit che,  come nella cella sopra ,faccia il prodotto dalle feature 'torque_meas' e outside_air_temp' , e crei  una nuova feature 'torque_times_temp', droppa poi le feature ''torque_meas' e 'outside_air_temp

class TorqueTempFeature(BaseEstimator, TransformerMixin):
    """
    A custom transformer to create the 'torque_times_temp' feature
    by multiplying 'torque_meas' and 'outside_air_temp', and then
    dropping the original columns. Compatible with scikit-learn pipelines.
    """
    def __init__(self):
        pass # No parameters needed for this specific transformation

    def fit(self, X, y=None):
        """
        Fits the transformer. In this case, it's a no-op as there's nothing to fit.

        Args:
            X: Input data (ignored).
            y: Target data (ignored).

        Returns:
            self: The fitted transformer instance.
        """
        # Check if the required columns exist during fit
        required_cols = ['torque_meas', 'outside_air_temp']
        missing_cols = [col for col in required_cols if col not in X.columns]
        if missing_cols:
            raise ValueError(f"The following required columns for TorqueTempFeature were not found in the DataFrame: {missing_cols}")

        return self

    def transform(self, X):
        """
        Creates the 'torque_times_temp' feature and drops the original columns.

        Args:
            X (pd.DataFrame): The input DataFrame.

        Returns:
            pd.DataFrame: The DataFrame with the new feature and original columns dropped.
        """
        if not isinstance(X, pd.DataFrame):
            raise TypeError("Input X must be a pandas DataFrame.")

        df_transformed = X.copy() # Work on a copy

        required_cols = ['torque_meas', 'outside_air_temp']
        missing_cols = [col for col in required_cols if col not in df_transformed.columns]

        if missing_cols:
             print(f"Warning: Skipping 'torque_times_temp' creation as columns are missing: {missing_cols}")
             # If columns are missing, just return the dataframe without modification
             return df_transformed
        else:
            print("Creating 'torque_times_temp' feature...")
            df_transformed['torque_times_temp'] = df_transformed['torque_meas'] * df_transformed['outside_air_temp']
            print("Dropping 'torque_meas' and 'outside_air_temp' columns...")
            df_transformed = df_transformed.drop(columns=['torque_meas', 'outside_air_temp'])

        return df_transformed


## Tolgo health_state

In [18]:
# prompt: estrai dal dataframe 'data_train' 'helath state' e salvalo in data_train_y e droppalo in data_train

data_train_y = data_train['y_target']
data_train_x = data_train.drop('y_target', axis=1)

print("data_train after dropping 'y_target':")
print(data_train.head())
print("\ndata_train_y (extracted 'y_target'):")
print(data_train_y.head())


data_train after dropping 'y_target':
            id  trq_measured     oat       mgt         pa       ias  \
331067  331067       74.8000  19.500  646.1000  1005.8400  56.00000   
601458  601458       67.0332  20.152  602.4063   958.4962  96.97656   
77053    77053       67.8000   8.000  546.7000    24.3840  73.68750   
664037  664037       53.9000   5.500  532.1000   360.8832  61.87500   
346977  346977       73.3000  17.750  636.3000  1025.9570  89.50000   

               np        ng  y_target  
331067  100.19000  96.47000         1  
601458   99.65235  94.02332         0  
77053   100.22000  91.61000         0  
664037   88.91000  99.78000         0  
346977   99.98000  95.95000         1  

data_train_y (extracted 'y_target'):
331067    1
601458    0
77053     0
664037    0
346977    1
Name: y_target, dtype: int64


## Pipeline con featurerenamer

In [26]:



preprocessing = Pipeline([
    ('rename_features', FeatureRenamer(column_names=new_names_for_train_data[:-1])), # Exclude 'health_state' if not in data_train_x
    ('custom_root_power_avail', CustomRootTransformer(root_transformations={'power_avail': 2.35})),
    ('binned_air_speed', BinnedQualitativeTransformer(binning_config={'indicated_air_speed': (5, 'quantile')})),
    ('pca_speed_power', PCATransformer(columns_to_pca=['net_power', 'compressor_speed'], n_components=1)),
    ('torque_temp_feature', TorqueTempFeature()),
    ('drop_columns', ColumnDropper(columns_to_drop=['indicated_air_speed', 'power_avail'])), # Add ColumnDropper here
    ('scale_data', ColumnStandardizer(columns_to_standardize=[
        'mean_gas_temp',
        'pca_net_power_compressor_speed',
        'torque_times_temp'])) # Assuming you still want scaling at the end
])



In [28]:
preprocessing

In [33]:
preprocessing.fit_transform(data_train_x)

Applying custom root 2.35 transformation to column 'power_avail'...
Applying binning transformation to column 'indicated_air_speed' with 5 bins and strategy 'quantile'...
Fitting PCA on columns: ['net_power', 'compressor_speed']
Transforming columns ['net_power', 'compressor_speed'] using PCA...
Dropping original PCA columns: ['net_power', 'compressor_speed']
Creating 'torque_times_temp' feature...
Dropping 'torque_meas' and 'outside_air_temp' columns...


Unnamed: 0,mean_gas_temp,power_avail_root_2.35_transformed,indicated_air_speed_binned_5_quantile,pca_net_power_compressor_speed,torque_times_temp
331067,0.698165,18.952734,1,0.740530,0.644852
601458,0.527553,18.567861,2,0.804122,0.623529
77053,0.310035,3.892758,1,0.909594,0.463536
664037,0.253026,12.252963,1,0.187028,0.414863
346977,0.659898,19.113119,2,0.750318,0.613678
...,...,...,...,...,...
259178,0.424834,7.938947,1,0.926165,0.634740
365838,0.474034,6.603664,2,0.279659,0.399555
131932,0.746193,12.173347,3,0.461816,0.400550
671155,0.491678,26.522299,2,0.847034,0.394653


In [37]:
preprocessing.get_feature_names_out()

AttributeError: Estimator rename_features does not provide get_feature_names_out. Did you mean to call pipeline[:-1].get_feature_names_out()?

# Select and Train a Model

## Training and Evaluating on the Training Set

In [36]:
from sklearn.linear_model import LogisticRegression

log_reg = make_pipeline(preprocessing, LogisticRegression())
log_reg.fit(data_train_x, data_train_y)

Applying custom root 2.35 transformation to column 'power_avail'...
Applying binning transformation to column 'indicated_air_speed' with 5 bins and strategy 'quantile'...
Fitting PCA on columns: ['net_power', 'compressor_speed']
Transforming columns ['net_power', 'compressor_speed'] using PCA...
Dropping original PCA columns: ['net_power', 'compressor_speed']
Creating 'torque_times_temp' feature...
Dropping 'torque_meas' and 'outside_air_temp' columns...


Let's try the full preprocessing pipeline on a few training instances:

In [40]:
data_predictions = log_reg.predict(data_train_x)
data_predictions[:5].round(-2)  # -2 = rounded to the nearest hundred

Applying custom root 2.35 transformation to column 'power_avail'...
Applying binning transformation to column 'indicated_air_speed' with 5 bins and strategy 'quantile'...
Transforming columns ['net_power', 'compressor_speed'] using PCA...
Dropping original PCA columns: ['net_power', 'compressor_speed']
Creating 'torque_times_temp' feature...
Dropping 'torque_meas' and 'outside_air_temp' columns...




array([0, 0, 0, 0, 0])

Compare against the actual values:

In [41]:
data_train_y.iloc[:5].values

array([1, 0, 0, 0, 1])

In [42]:
# prompt: calcola l'accurcay, precision, sensitivity e specificity della regressione logistica dopo aver elaborato i dati tramite pipeline

from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score

# Assuming log_reg is your fitted pipeline and data_test is your test DataFrame
# Separate features (X_test) and target (y_test) from the test set
data_test_y = data_test['y_target']
data_test_x = data_test.drop('y_target', axis=1)

# Make predictions on the test set
y_pred = log_reg.predict(data_test_x)

# Calculate the confusion matrix
cm = confusion_matrix(data_test_y, y_pred)
tn, fp, fn, tp = cm.ravel()

# Calculate metrics
accuracy = accuracy_score(data_test_y, y_pred)
precision = precision_score(data_test_y, y_pred)
sensitivity = recall_score(data_test_y, y_pred) # Sensitivity is also known as Recall
# Specificity: TN / (TN + FP)
specificity = tn / (tn + fp) if (tn + fp) > 0 else 0 # Handle potential division by zero

print("Confusion Matrix:")
print(cm)
print(f"\nAccuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Sensitivity (Recall): {sensitivity:.4f}")
print(f"Specificity: {specificity:.4f}")


Applying custom root 2.35 transformation to column 'power_avail'...
Applying binning transformation to column 'indicated_air_speed' with 5 bins and strategy 'quantile'...
Transforming columns ['net_power', 'compressor_speed'] using PCA...
Dropping original PCA columns: ['net_power', 'compressor_speed']
Creating 'torque_times_temp' feature...
Dropping 'torque_meas' and 'outside_air_temp' columns...




Confusion Matrix:
[[76097 12550]
 [16471 43407]]

Accuracy: 0.8046
Precision: 0.7757
Sensitivity (Recall): 0.7249
Specificity: 0.8584


In [43]:
# prompt: utilizza l'algoritmo di regressione logistica sul data_test per stimarre 'y_target' , calcola l'accurcay, precision, sensitivity e specificity

# Make predictions on the test set
y_pred = log_reg.predict(data_test_x)

# Calculate the confusion matrix
cm = confusion_matrix(data_test_y, y_pred)
tn, fp, fn, tp = cm.ravel()

# Calculate metrics
accuracy = accuracy_score(data_test_y, y_pred)
precision = precision_score(data_test_y, y_pred)
sensitivity = recall_score(data_test_y, y_pred) # Sensitivity is also known as Recall
# Specificity: TN / (TN + FP)
specificity = tn / (tn + fp) if (tn + fp) > 0 else 0 # Handle potential division by zero

print("Confusion Matrix:")
print(cm)
print(f"\nAccuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Sensitivity (Recall): {sensitivity:.4f}")
print(f"Specificity: {specificity:.4f}")


Applying custom root 2.35 transformation to column 'power_avail'...
Applying binning transformation to column 'indicated_air_speed' with 5 bins and strategy 'quantile'...
Transforming columns ['net_power', 'compressor_speed'] using PCA...
Dropping original PCA columns: ['net_power', 'compressor_speed']
Creating 'torque_times_temp' feature...
Dropping 'torque_meas' and 'outside_air_temp' columns...
Confusion Matrix:
[[76097 12550]
 [16471 43407]]

Accuracy: 0.8046
Precision: 0.7757
Sensitivity (Recall): 0.7249
Specificity: 0.8584


