In [5]:
import pandas as  pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

In [6]:
df = pd.DataFrame({'a': np.random.randint(10,100, size=10),
                   'b': np.random.randint(10,100, size=10),
                   'c': np.random.randint(10,100, size=10)})
df

Unnamed: 0,a,b,c
0,91,23,40
1,99,18,88
2,79,78,54
3,22,85,99
4,31,93,55
5,13,95,25
6,16,72,37
7,59,22,63
8,48,19,29
9,15,20,44


In [7]:
class StandardScaler(BaseEstimator,TransformerMixin):
    def __init__(self,variables: list|None = None):
        self.variables = variables
        self.errors_ =None
        
    def fit(self,X: pd.DataFrame, y = None):
        if self.variables is not None:
            self.numeric_columns_ = pd.Index(self.variables)
        else:
            numeric_columns = X.select_dtypes(include=[np.number]).columns
            self.numeric_columns_ = numeric_columns 
        self.mean_ = X[self.numeric_columns_].mean()
        self.std_ = X[self.numeric_columns_].std()
        return self
    
    def transform(self,X: pd.DataFrame,y=None):
        data = X.copy()
        try:
            data[self.numeric_columns_] = (data[self.numeric_columns_] - self.mean_) / self.std_
        except Exception as e:
            self.errors_ = e
        return data

In [8]:
scaler = StandardScaler(['a','b'])
scaler.fit(df)

In [9]:
scaler.transform(df)

Unnamed: 0,a,b,c
0,1.322281,-0.855462,40
1,1.564346,-1.000455,88
2,0.959183,0.739467,54
3,-0.765531,0.942458,99
4,-0.493208,1.174448,55
5,-1.037854,1.232445,25
6,-0.94708,0.565475,37
7,0.35402,-0.884461,63
8,0.021181,-0.971457,29
9,-0.977338,-0.942458,44


In [10]:
scaler.mean_
pd.concat([scaler.mean_,scaler.std_],axis=1,keys=['mean','std'])

Unnamed: 0,mean,std
a,47.3,33.048954
b,52.5,34.484296


In [11]:
class MinMaxScaler (BaseEstimator, TransformerMixin):
    def __init__(self, feature_range=(0,1), variables: list|None = None):
        self.feature_range = feature_range
        self.variables = variables 
        self.errors_=None
    
    def fit (self,X : pd.DataFrame, y=None ):
        if self.variables is not None:
            self.numeric_columns_ = pd.Index(self.variables)
        else:
            numeric_columns = X.select_dtypes(include=[np.number]).columns
            self.numeric_columns_ = numeric_columns 
        self.min_ = X[self.numeric_columns_].min()
        self.max_ = X[self.numeric_columns_].max()
        return self
    
    def transform(self,X : pd.DataFrame, y=None):
        data = X.copy()
        try:
            min_val, max_val = self.feature_range
            data[self.numeric_columns_] = (data[self.numeric_columns_] - self.min_) / (self.max_ - self.min_) * (max_val - min_val) + min_val
        except Exception as e:
            self.errors_ = e
        return data

In [12]:
minimax = MinMaxScaler()
minimax.fit(df)
minimax.transform(df)

Unnamed: 0,a,b,c
0,0.906977,0.064935,0.202703
1,1.0,0.0,0.851351
2,0.767442,0.779221,0.391892
3,0.104651,0.87013,1.0
4,0.209302,0.974026,0.405405
5,0.0,1.0,0.0
6,0.034884,0.701299,0.162162
7,0.534884,0.051948,0.513514
8,0.406977,0.012987,0.054054
9,0.023256,0.025974,0.256757


In [13]:
class winsorizer(BaseEstimator, TransformerMixin):
    def __init__(self,variables: list|None = None,lower_quantile=0.25, upper_quantile=0.75, K=5):
        self.lower_quantile = lower_quantile
        self.upper_quantile = upper_quantile
        self.variables = variables
        self.K = K
        self.errors_ =None

    def fit(self, X: pd.DataFrame , y=None):
        if self.variables is not None:
            self.numeric_columns_ = pd.Index(self.variables)
        else:
            numeric_columns = X.select_dtypes(include=[np.number]).columns
            self.numeric_columns_ = numeric_columns 
            
        self.q1=X[self.numeric_columns_].quantile(self.lower_quantile)
        self.q3=X[self.numeric_columns_].quantile(self.upper_quantile)
        self.iqr=self.q3-self.q1
            
        #calculate thresholds
        self.lower_threshold=self.q1-self.K*self.iqr
        self.upper_threshold=self.q3+self.K*self.iqr
        return self

    def transform(self, X : pd.DataFrame, y=None):
        data = X.copy()
        try:
            data[self.numeric_columns_]=data[self.numeric_columns_].clip(lower=self.lower_threshold,upper=self.upper_threshold)
        except Exception as e:
            self.errors_ = e
        return data

In [14]:
np.random.seed(0)
df1 = pd.DataFrame({
    'A': np.random.normal(loc=100, scale=20, size=1000),
    'B': np.random.normal(loc=50, scale=10, size=1000),
    'C': np.random.normal(loc=200, scale=30, size=1000),
    'D': np.random.normal(loc=300, scale=40, size=1000)
})

In [15]:
win = winsorizer(lower_quantile=0.05, upper_quantile=0.95, K=2)
win.fit(df1)

In [16]:
transformed = win.transform(df1)

In [17]:
class MeanMedianImputer(BaseEstimator, TransformerMixin):
    def __init__(self, variables: list|None = None,imputation_type='mean'):
        self.imputation_type = imputation_type
        self.variables = variables
        self.errors_ =None
        
    def fit(self, X : pd.DataFrame, y=None):
        if self.variables is not None:
            self.numeric_columns_ = pd.Index(self.variables)
        else:
            numeric_columns = X.select_dtypes(include=[np.number]).columns
            self.numeric_columns_ = numeric_columns
        self.mean_ = X[self.numeric_columns_].mean()
        self.median_ = X[self.numeric_columns_].median()
        return self
    
    def transform(self, X : pd.DataFrame, y=None):
        data = X.copy()
        try:
            if self.imputation_type == 'mean':
                data[self.numeric_columns_] = data[self.numeric_columns_].fillna(self.mean_)
            else :
                data[self.numeric_columns_] = data[self.numeric_columns_].fillna(self.median_)
        except Exception as e:
            self.errors_ = e
        return data

In [18]:
df2 = pd.DataFrame({
    'A': [1, 2, np.nan, 4, 5],
    'B': [5, np.nan, 7, 8, 9],
    'C': [np.nan, 12, 13, 14, 15],
    'D': [16, 17, 18, np.nan, 20]
})

In [19]:
df2

Unnamed: 0,A,B,C,D
0,1.0,5.0,,16.0
1,2.0,,12.0,17.0
2,,7.0,13.0,18.0
3,4.0,8.0,14.0,
4,5.0,9.0,15.0,20.0


In [20]:
MMI = MeanMedianImputer(['A','C'],imputation_type= 'median')

In [21]:
MMI.fit(df2)

In [22]:
t = MMI.transform(df2)

In [23]:
t

Unnamed: 0,A,B,C,D
0,1.0,5.0,13.5,16.0
1,2.0,,12.0,17.0
2,3.0,7.0,13.0,18.0
3,4.0,8.0,14.0,
4,5.0,9.0,15.0,20.0


In [24]:
class categoricalImputer(BaseEstimator,TransformerMixin):
    def __init__(self,variables : list|None=None, strategy='most_frequent'):
        self.variables=variables
        self.strategy=strategy
        self.errors_ =None
        
    def fit(self,X: pd.DataFrame,y=None):
        if self.variables is not None:
            self.categorical_columns_ = pd.Index(self.variables)
        else:
            categorical_columns = X.select_dtypes(include=['object']).columns
            self.categorical_columns_ = categorical_columns
        self.fill_values_ = X[self.categorical_columns_].mode().iloc[0]
        return self
    def transform(self,X : pd.DataFrame, y = None ):
        data = X.copy()
        try:
            data[self.categorical_columns_] = data[self.categorical_columns_].fillna(self.fill_values_)
        except Exception as e:
            self.errors_ = e
        return data

In [25]:
df3 = pd.DataFrame({
    'A': ['apple', 'banana', 'apple', np.nan, 'banana'],
    'B': ['red', 'green', np.nan, 'red', 'green'],
    'C': ['small', 'large', 'medium', 'medium', np.nan]
})

In [26]:
CI = categoricalImputer()
CI.fit(df3)

In [27]:
CIT = CI.transform(df3)

In [28]:
CIT

Unnamed: 0,A,B,C
0,apple,red,small
1,banana,green,large
2,apple,green,medium
3,apple,red,medium
4,banana,green,medium


In [29]:
class Count_frequency_encoder(BaseEstimator,TransformerMixin):
    def __init__(self, variables: list | None = None):
        self.variables = variables
        self.errors_ =None

    def fit(self, X, y=None):
        if self.variables is not None:
            self.categorical_variables = self.variables
        else:
            categorical_columns_ = X.select_dtypes(include=['object']).columns
            self.categorical_variables = categorical_columns_
        
        self.encoding_dict_ = {}
        for var in self.categorical_variables:
            value_counts = X[var].value_counts()
            total_count = value_counts.sum()
            self.encoding_dict_[var] = value_counts / total_count 
        return self

    def transform(self, X: pd.DataFrame , y = None):
        data = X.copy()
        try:
            for var in self.categorical_variables:
                data[var] = data[var].map(self.encoding_dict_[var])
        except Exception as e:
            self.errors_ = e
        return data

In [30]:
CFE = Count_frequency_encoder()
CFE.fit(df3)

In [31]:
CFET = CFE.transform(CIT)
CFET

Unnamed: 0,A,B,C
0,0.5,0.5,0.25
1,0.5,0.5,0.25
2,0.5,0.5,0.5
3,0.5,0.5,0.5
4,0.5,0.5,0.5


In [32]:

class OneHotEncoder(BaseEstimator, TransformerMixin):
    def __init__(self, variables: list |None = None):
        self.variables = variables
        self.categorical_variables = {}
        self.errors_= None
    
    def fit(self, X: pd.DataFrame, y=None):
        if self.variables is not None:
            self.categorical_columns_ = self.variables
        else:
            self.categorical_columns_ = X.select_dtypes(include=['object']).columns
        
        for col in self.categorical_columns_:
            self.categorical_variables[col] = X[col].unique()
        
        print("OneHotEncoder fitted successfully!")
        return self
    
    def transform(self, X: pd.DataFrame, y=None):
        data = X.copy()
        
        try:
            for col in self.categorical_variables:
                categories = self.categorical_variables[col]
                for category in categories:
                    new_col_name = f'{col}_{category}'
                    data[new_col_name] = (data[col] == category).astype(int)
                
                # Drop the original categorical column
                data.drop(col, axis=1, inplace=True)
        except Exception as e:
            self.errors_ = e    
        return data


In [33]:
df4 = pd.DataFrame({
    'A': ['apple', 'banana', 'apple','apple', 'banana'],
    'B': ['red', 'green', 'green', 'red', 'green'],
    'C': ['small', 'large', 'medium', 'medium', 'large']
})

In [34]:
ohe = OneHotEncoder()
ohe.fit(df4)

OneHotEncoder fitted successfully!


In [35]:
df4_ = ohe.transform(df4)
df4_

Unnamed: 0,A_apple,A_banana,B_red,B_green,C_small,C_large,C_medium
0,1,0,1,0,1,0,0
1,0,1,0,1,0,1,0
2,1,0,0,1,0,0,1
3,1,0,1,0,0,0,1
4,0,1,0,1,0,1,0


In [36]:
class OrdinalEncoder(BaseEstimator, TransformerMixin):
    def __init__(self, variables: list | None= None, mapping: dict = None):
        self.variables = variables
        self.mapping = mapping
        self.ordinal_mapping = {}
        self.errors_ =None
    
    def fit(self, X: pd.DataFrame, y=None):
        if self.variables is not None:
            self.categorical_columns_ = self.variables
        else:
            self.categorical_columns_ = X.select_dtypes(include=['object']).columns
        
        if self.mapping is None:
            self.generate_mapping(X)
        else:
            self.ordinal_mapping = self.mapping
        
        return self
    
    def generate_mapping(self, X : pd.DataFrame , y = None):
        for col in self.categorical_columns_:
            unique_values = X[col].unique()
            self.ordinal_mapping[col] = {value: i for i, value in enumerate(unique_values)}
    
    def transform(self, X: pd.DataFrame, y=None):
        data = X.copy()
        
        try:
            for col, mapping in self.ordinal_mapping.items():
                data[col] = data[col].map(mapping)
        except Exception as e:
            self.errors_ = e  
        return data


In [37]:
OrdinalEncoder = OrdinalEncoder()
df4_ordinal = OrdinalEncoder.fit(df4)

In [38]:
df4_ordinal

In [39]:
class DropConstantFeatures(BaseEstimator, TransformerMixin):
    def __init__(self, threshold=0.9):
        self.threshold = threshold
        self.errors_ =None
    
    def fit(self, X, y=None):
        
        self.columns_to_drop_ = X.columns[X.apply(lambda col: col.value_counts(normalize=True).values[0] >= self.threshold)].tolist()
        return self
    
    def transform(self, X : pd.DataFrame, y=None):
        data = X.copy()
        try:
            data = data.drop(columns=self.columns_to_drop_)
        
        except Exception as e:
            self.errors_ = e
        return data

In [40]:
df5 = pd.DataFrame({
    'A': [1, 2, 3, 4],
    'B': [1, 1, 1, 1],
    'C': [2, 3, 2, 3],
    'D': ['a', 'a', 'a', 'a'],
    'E': ['x', 'x', 'x', 'y']
})

In [41]:
DropConstantFeatures = DropConstantFeatures()

In [42]:
df5_dropped = DropConstantFeatures.fit(df5)
df5_dropped

In [43]:
class HighCardinalityImputer(BaseEstimator, TransformerMixin):
    def __init__(self, variables : list | None = None, threshold=0.3, fill_value='Other'):
        self.threshold = threshold
        self.variables = variables
        self.fill_value = fill_value
        self.categories_to_replace = {}
        self.errors_ =None
    
    def fit(self, X, y=None):
        if self.variables is not None:
              for col in self.variables:
                # Calculate the frequency of each category
                freq = X[col].value_counts(normalize=True)
                
                cumulative_freq = freq.cumsum()
                infrequent_categories = cumulative_freq[cumulative_freq < self.threshold].index.tolist()
                
                # Store these categories for replacement
                self.categories_to_replace[col] = infrequent_categories
        
        else:
            for col in X.columns:
           
                freq = X[col].value_counts(normalize=True)
            
            cumulative_freq = freq.cumsum()
            infrequent_categories = cumulative_freq[cumulative_freq < self.threshold].index.tolist()
            
            self.categories_to_replace[col] = infrequent_categories
        
        return self
    
    def transform(self, X : pd.DataFrame, y=None):
        data = X.copy()
        try:
            for col, categories in self.categories_to_replace.items():
                data[col] = data[col].apply(lambda x: self.fill_value if x in categories else x)
        except Exception as e:
            self.errors_ = e
        return data

In [44]:
df6 = pd.DataFrame({
    'Color': ['Red', 'Blue', 'Green', 'Yellow', 'Purple', 'Blue', 'Green'],
    'Shape': ['Circle', 'Square', 'Triangle', 'Circle', 'Circle', 'Square', 'Square'],
    'Size': ['Small', 'Medium', 'Large', 'Small', 'Small', 'Medium', 'Medium']
})

In [45]:
hic = HighCardinalityImputer(threshold=0.7)

In [46]:
df6_ = hic.fit(df6)

In [47]:
class DropDuplicateFeatures(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.duplicate_columns = []
        self.errors_ =None
    
    def fit(self, X, y=None):
        self.duplicate_columns = self.find_duplicate_columns(X)
        return self
    
    def transform(self, X : pd.DataFrame, y=None):
        data = X.copy()
        try:
            data = data.drop(columns=self.duplicate_columns)
        except Exception as e:
            self.errors_ = e
        return data
    
    def find_duplicate_columns(self, X : pd.DataFrame):
        duplicate_columns = []
        for i in range(X.shape[1]):
            col1 = X.iloc[:, i]
            for j in range(i+1, X.shape[1]):
                col2 = X.iloc[:, j]
                if col1.equals(col2):
                    duplicate_columns.append(X.columns[j])
                    
        return duplicate_columns


In [48]:
df7_ = pd.DataFrame({
    'A': [1, 2, 3],
    'B': [1, 2, 3],
    'C': [4, 5, 6],
    'D': [4, 5, 6],
    'E': [7, 8, 9]
})

In [49]:
dropper = DropDuplicateFeatures()
df_fil = dropper.fit_transform(df7_)

In [50]:
df_fil

Unnamed: 0,A,C,E
0,1,4,7
1,2,5,8
2,3,6,9


In [51]:
class DropCorrelatedFeatures(BaseEstimator, TransformerMixin):
    def __init__(self, threshold=0.95):
        self.threshold = threshold
        self.correlated_features = []
        self.errors_ =None
    
    def fit(self, X: pd.DataFrame, y=None):

        corr_matrix = X.corr().abs()
        self.correlated_features = self.find_correlated_features(corr_matrix)
        
        return self
    
    def transform(self, X: pd.DataFrame, y=None):
        data = X.copy()
        try:
            data = data.drop(columns=self.correlated_features)
        except Exception as e:
            self.errors_ = e
        return data
    
    def find_correlated_features(self, corr_matrix):
        correlated_features = set()
        
        for i in range(len(corr_matrix.columns)):
            for j in range(i):
                if abs(corr_matrix.iloc[i, j]) > self.threshold:
                    colname = corr_matrix.columns[i]
                    correlated_features.add(colname)
        
        return list(correlated_features)


In [52]:
df8 = pd.DataFrame({
    'A': [1, 2, 3, 4],
    'B': [2, 4, 6, 8],
    'C': [1, 2, 3, 4],
    'D': [2, 4, 6, 8]
})

In [53]:
dropcf = DropCorrelatedFeatures()


In [54]:
dropcf.fit_transform(df8)

Unnamed: 0,A
0,1
1,2
2,3
3,4


In [55]:
class BoxCoxTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, lambda_range=(-3.0, 3.0)):
        self.lambda_range = lambda_range
        self.lambda_ = None
        self.log_likelihood_ = None
        self.errors_ = None
        
    def fit(self, X, y=None):
        try:
            best_lambda = None
            best_log_likelihood = float('-inf')

            for lambda_value in np.linspace(*self.lambda_range, num=100):
                transformed_data = self._boxcox_transform(X, lambda_value)
                log_likelihood = self._log_likelihood(transformed_data)

                if log_likelihood > best_log_likelihood:
                    best_log_likelihood = log_likelihood
                    best_lambda = lambda_value

            self.lambda_ = best_lambda
            self.log_likelihood_ = best_log_likelihood
        except Exception as e:
            self.errors_ = e
        return self
   
    def transform(self, X):
        try:
            return self._boxcox_transform(X, self.lambda_)
        except Exception as e:
            self.errors_ = e
            return None
   
    def _boxcox_transform(self, X, lambda_):
        try:
            if lambda_ == 0:
                return np.log(X)
            else:
                return (X**lambda_ - 1) / lambda_
        except Exception as e:
            self.errors_ = e
            return None
   
    def _log_likelihood(self, X):
        try:
            n = X.shape[0]
            if self.lambda_ is None or self.lambda_ == 0:
                return -n / 2 * (1 + np.log(2 * np.pi) + np.log(np.mean(X ** 2)))
            else:
                return n / 2 * (np.log(self.lambda_ / (2 * np.pi)) + (self.lambda_ - 1) * np.mean(np.log(X)))
        except Exception as e:
            self.errors_ = e
            return None

In [56]:
df9 = pd.DataFrame({
    'A': [1, 2, 3, None],  
    'B': [2, 4, 6, 8],
    'C': [1, 2, 3, 4],
    'D': [2, 4, 6, 8]
})

In [60]:
box= BoxCoxTransformer()

In [61]:
box.fit_transform(df9)

Unnamed: 0,A,B,C,D
0,-0.0,0.291667,-0.0,0.291667
1,0.291667,0.328125,0.291667,0.328125
2,0.320988,0.33179,0.320988,0.33179
3,,0.332682,0.328125,0.332682


In [70]:
class YeoJohnsonTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, lambda_range=(-1.0, 2.0)):
        self.lambda_range = lambda_range
        self.lambda_ = None
        self.log_likelihood_ = None
        self.errors_ = None
        
    def fit(self, X, y=None):
        try:
            best_lambda = None
            best_log_likelihood = float('-inf')

            for lambda_value in np.linspace(*self.lambda_range, num=100):
                transformed_data = self._yeo_johnson_transform(X, lambda_value)
                log_likelihood = self._log_likelihood(transformed_data)

                if log_likelihood > best_log_likelihood:
                    best_log_likelihood = log_likelihood
                    best_lambda = lambda_value

            self.lambda_ = best_lambda
            self.log_likelihood_ = best_log_likelihood
        except Exception as e:
            self.errors_ = e
        return self
   
    def transform(self, X):
        try:
            return self._yeo_johnson_transform(X, self.lambda_)
        except Exception as e:
            self.errors_ = e
            return None
   
    def _yeo_johnson_transform(self, X, lambda_):
        try:
            if lambda_ == 0:
                return np.log1p(X)
            else:
                if lambda_ < 0:  
                    offset = 0.5 
                    X += offset  
                    transformed = ((X)**lambda_ - 1) / lambda_
                    transformed -= transformed.min() + 1  
                    return transformed
                else:  
                    return (X**lambda_ - 1) / lambda_
        except Exception as e:
            self.errors_ = e
            return None
   
    def _log_likelihood(self, X):
        try:
            n = X.shape[0]
            if self.lambda_ is None or self.lambda_ == 0:
                return -n / 2 * (1 + np.log(2 * np.pi) + np.log(np.mean(X ** 2)))
            else:
                return n / 2 * (np.log(self.lambda_ / (2 * np.pi)) + (self.lambda_ - 1) * np.mean(np.log(X)))
        except Exception as e:
            self.errors_ = e
            return None

In [71]:
df10 = pd.DataFrame({
    'A': [1, -2, 3, None],  
    'B': [2, 4, 6, 8],
    'C': [1, 2, 3, -4],
    'D': [2, 4, 6, 8]
})


In [72]:
yeo = YeoJohnsonTransformer()

In [73]:
yeo.fit_transform(df10)

Unnamed: 0,A,B,C,D
0,-0.988889,-1.0,-0.978632,-1.0
1,-1.0,-0.994987,-0.975709,-0.994987
2,-0.983333,-0.990847,-0.973077,-0.990847
3,,-0.987368,-1.0,-0.987368
