In [1]:
# standard
import pandas as pd
import numpy as np
import os
import sklearn

# pipelining
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.compose import ColumnTransformer

# constants
data_fname = "../data/NormativeODI_DATA_2024-01-04_1611.csv"
os.getcwd()

'c:\\Users\\james\\vm-spinal-risk\\vm-spinal-risk\\notebooks'

In [2]:
data = pd.read_csv(os.path.join(os.getcwd(), data_fname))
data.shape

(218, 29)

In [17]:
def filter_df_by_attention_check(data:pd.DataFrame, col_start: int, col_end: int, tol:int) -> pd.DataFrame:
    """
    Filter a pandas data frame of survey responses for rows where the attention check is passed.

    args:
        data: pd.DataFrame
            Survey responses. Columns may vary.
        col_start: int
            The column index where the attention check question starts.
        col_end: int 
            The column index where the attention check question ends.
        tol: int
            The exact number of choices that must be selected to pass the attention check

    returns: pd.DataFrame - only survey responses that passed the attention check
    """

    subset = data.iloc[:, col_start:col_end]
    result = (subset == 1).sum(axis=1) == tol
    print(f"{result.mean()*100:.2f} percent of responses passed the attention check.")
    return subset[result].filter(regex="/(attention_check)", axis = 1)

In [10]:
def verify_age(data:pd.DataFrame, col_idx: int) -> pd.DataFrame:
    """
    Filter a pandas data frame of survey responses for rows where the age is reasonable.

    args:
        data: pd.DataFrame
            Survey responses. Columns may vary.
        col_idx: int
            The column index that contain responses for the respondent's age

    returns: pd.DataFrame - only survey responses where the respondent age is reasonable     
    """
    subset = data.iloc[:, col_idx]
    result = (subset.iloc[:, 3] <= 122) | (subset.iloc[:, 3] >= 0)
    print(f"{result.mean()*100:.2f} percent of responses passed the attention check.")
    return result


In [18]:
filter_df_by_attention_check(data, -7, -1, 4).head()

83.94 percent of responses passed the attention check.


Unnamed: 0,attention_check___1,attention_check___2,attention_check___3,attention_check___4,attention_check___5,attention_check___6
1,1,1,0,1,1,0
3,1,0,0,1,1,1
4,1,1,1,1,0,0
5,1,1,1,1,0,0
6,1,1,1,1,0,0


In [None]:
class DropUnbalancedFeatures(BaseEstimator, TransformerMixin):
  """
  Custom scikit-learn transformer to remove features with unbalanced distribution.
  Author: Evan Yip

  Parameters
  ----------
  threshold : float, optional
      The threshold for feature unbalance. Features with a dominant class percentage
      exceeding this threshold will be dropped. Default is 0.9.
  verbose : bool, optional
      If True, print information about the features being removed during fit and
      transformation. Default is True.

  Attributes
  ----------
  threshold : float
      The specified threshold for feature unbalance.
  verbose : bool
      Flag indicating whether to print information during fit and transform.
  columns : list or None
      List of feature names if input data is a DataFrame, otherwise None.
  features_kept : list
      Indices of features kept after applying the unbalance threshold.
  features_dropped : list
      Indices of features dropped after applying the unbalance threshold.

  Methods
  -------
  fit(X, y=None)
      Fit the transformer to the input data, identifying unbalanced features.
  transform(X)
      Transform the input data by keeping only the balanced features.
  get_feature_names_out(input_features=None)
      Return the names of the output features.

  Notes
  -----
  - The transformer identifies features with a dominant class percentage exceeding
    the specified threshold and drops them.
  - The fit method prints information about the removed features if verbose is True.

  Examples
  --------
  >>> transformer = DropUnbalancedFeatures(threshold=0.85, verbose=True)
  >>> X_balanced = transformer.fit_transform(X_unbalanced)
  """
  def __init__(self, threshold=0.9, verbose=True):
    self.threshold = threshold
    self.verbose = verbose
    self.columns = None
    self.features_kept = []
    self.features_dropped = []

  def fit(self, X, y=None):
    if self.verbose:
      print(f"Removing unbalanced features with (threshold={self.threshold})")
    X = X.copy()
    if isinstance(X, pd.DataFrame):
      self.columns = list(X.columns)
    else:
      self.columns = None
    self.features_dropped = []
    for col_idx in range(X.shape[1]):
      counts = np.unique(X.iloc[:, col_idx] if isinstance(X, pd.DataFrame) else X[:, col_idx], return_counts=True)[1]
      percent = counts / counts.sum()

      if (percent > self.threshold).any():
          self.features_dropped.append(col_idx)
      else:
          self.features_kept.append(col_idx)
    if self.verbose:
      print("Parsing complete")
    return self

  def transform(self, X):
    if self.verbose:
      if len(self.features_dropped) > 0:
        print(f"Removed: {self.features_dropped}")
      else:
        print("No features removed")
    return X.iloc[:, self.features_kept] if isinstance(X, pd.DataFrame) else X[:, self.features_kept]

  def get_feature_names_out(self, input_features=None):
    if self.columns is not None:
      output_features = [self.columns[i] for i in self.features_kept]
      return output_features
    else:
      output_features = [input_features[i] for i in self.features_kept]
      return output_features

In [None]:
data.columns

Index(['record_id', 'redcap_survey_identifier',
       'assessment_of_back_pain_in_people_who_never_had_sp_timestamp', 'age',
       'sex', 'height', 'weight_in_pounds', 'zip_code',
       'how_physically_demanding_i', 'have_you_ever_experienced',
       'how_have_you_addressed_add', 'please_specify', 'pain_intensity',
       'personal_care_e_g_washing', 'lifting', 'walking', 'sitting',
       'standing', 'sleeping', 'social_life', 'travelling',
       'employment_homemaking', 'attention_check___1', 'attention_check___2',
       'attention_check___3', 'attention_check___4', 'attention_check___5',
       'attention_check___6',
       'assessment_of_back_pain_in_people_who_never_had_sp_complete'],
      dtype='object')

In [None]:
ohe_cols = [""]

In [None]:
# define preprocessing pipeline
ohe_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore')),
    ('selector', DropUnbalancedFeatures(threshold=0.8, verbose=False))
])

cat_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value=0)),
    ('selector', DropUnbalancedFeatures(threshold=0.8, verbose=False))
])

num_pipe = Pipeline([
    ('imputer', IterativeImputer(random_state=52)),
    ('scaler', StandardScaler())
])


preprocessor = ColumnTransformer(
    transformers=[
        ('ohe', ohe_pipe, ohe_cols),
        ('cat', cat_pipe, cat_cols),
        ('num', num_pipe, num_cols)
    ])

NameError: name 'ohe_cols' is not defined