# Custom Estimator

In [1]:
from sklearn.base import BaseEstimator, ClassifierMixin
import numpy as np
from sklearn.utils import check_X_y

class MostFrequentClassifier(BaseEstimator, ClassifierMixin):
    """Baseline classifier that always predicts the most frequent training label.

    Attributes
    ----------
    most_frequent_ : scalar or None
        Label with the highest count in the training target. ``None`` until
        ``fit`` has been called.
    classes_ : ndarray
        Sorted unique labels seen by ``fit`` (only present after fitting).
    """

    def __init__(self):
        # Sentinel value that ``predict`` uses to detect an unfitted instance.
        self.most_frequent_ = None

    def fit(self, X, y):
        """Learn the majority label of ``y``.

        Parameters
        ----------
        X : array-like
            Feature matrix; validated together with ``y`` but otherwise unused.
        y : array-like
            Target labels.

        Returns
        -------
        self
        """
        # Validate the inputs
        X, y = check_X_y(X, y)
        y = np.ravel(y)

        # np.unique returns sorted labels and argmax picks the first maximum,
        # so a tie between classes resolves to the smallest label.
        self.classes_, counts = np.unique(y, return_counts=True)
        self.most_frequent_ = self.classes_[np.argmax(counts)]

        return self

    def predict(self, X):
        """Return the majority label once per row of ``X``.

        Parameters
        ----------
        X : array-like
            Samples to predict for; only the number of rows is used.

        Raises
        ------
        ValueError
            If the classifier has not been fitted.
        """
        if self.most_frequent_ is None:
            raise ValueError("This classifier instance is not fitted yet.")

        # np.shape handles plain lists as well as objects exposing ``.shape``,
        # so predict no longer requires an ndarray/DataFrame input.
        n_samples = np.shape(X)[0]
        return np.full(shape=(n_samples,), fill_value=self.most_frequent_)

# Test the classifier
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris

# Fit the baseline on an iris train split and show the label it predicts.
iris = load_iris()
X = iris.data
y = iris.target

X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

# fit() returns the estimator itself, so construction and fitting can be chained.
classifier = MostFrequentClassifier().fit(X_train, y_train)
predictions = classifier.predict(X_test)

# Every prediction is the same label, so the first element is representative.
print(f"Predicted class for all test instances: {predictions[0]}")


Predicted class for all test instances: 1


In [2]:
import pandas as pd
# NOTE(review): absolute, machine-specific path; this cell only runs where the file exists.
crop_csv_path = r"C:\Users\mk744\OneDrive - Poornima University\Desktop\cropdata_updated.csv"
data = pd.read_csv(crop_csv_path)
data.head(2)

Unnamed: 0,crop ID,soil_type,Seedling Stage,MOI,temp,humidity,result
0,Wheat,Black Soil,Germination,1,25,80.0,1
1,Wheat,Black Soil,Germination,2,26,77.0,1


In [3]:
# Class balance of the `result` column; in the recorded output class 0 is the largest group.
data['result'].value_counts()

0    9062
1    6227
2    1122
Name: result, dtype: int64

In [4]:
class MostFrequentCassifier(BaseEstimator, ClassifierMixin):
    """Baseline classifier that always predicts the most frequent training label.

    The class name's spelling is kept as-is because code in this notebook
    instantiates it under this name.

    Attributes
    ----------
    most_frequent_ : scalar or None
        Majority label of the training target; ``None`` until ``fit`` is called.
    classes_ : ndarray
        Sorted unique labels seen by ``fit`` (only present after fitting).
    """

    def __init__(self):
        # Sentinel value that ``predict`` uses to detect an unfitted instance.
        self.most_frequent_ = None

    def fit(self, X, y):
        """Validate ``X``/``y`` and store the majority label of ``y``; returns ``self``."""
        X, y = check_X_y(X, y)
        y = np.ravel(y)

        # Sorted labels + first-maximum argmax: ties resolve to the smallest label.
        self.classes_, counts = np.unique(y, return_counts=True)
        self.most_frequent_ = self.classes_[np.argmax(counts)]
        return self

    def predict(self, X):
        """Return the majority label once per row of ``X``.

        Raises
        ------
        ValueError
            If the classifier has not been fitted.
        """
        if self.most_frequent_ is None:
            # Message corrected: the condition is about fitting, not predicting.
            raise ValueError("This classifier instance is not fitted yet.")

        # np.shape also accepts plain lists, unlike ``X.shape``.
        n_samples = np.shape(X)[0]
        return np.full(shape=(n_samples,), fill_value=self.most_frequent_)
          
          
import pandas as pd
# Same baseline on the crop data: a single feature (humidity) against the `result` label.
# NOTE(review): absolute, machine-specific path; this cell only runs where the file exists.
df = pd.read_csv(r"C:\Users\mk744\OneDrive - Poornima University\Desktop\cropdata_updated.csv")

# The humidity column is reshaped into a single-column 2-D array for the estimator.
X = df['humidity'].values.reshape(-1, 1)
y = df['result']
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

# fit() returns the estimator, so it can be chained onto the constructor.
Classifier = MostFrequentCassifier().fit(X_train, y_train)
Predictions = Classifier.predict(X_test)

print(f"Predicted class for all test instances: {Predictions[0]}")
     

Predicted class for all test instances: 0


In [5]:
# Majority label stored by fit(); the recorded output for the crop data is 0.
Classifier.most_frequent_

0

In [6]:
from sklearn.model_selection import cross_val_score
# Cross-validate the baseline with cross_val_score's defaults (no cv/scoring passed).
# The recorded run shows five fold scores of roughly 0.55.
cross_val_score(Classifier, X_train, y_train)

array([0.55077173, 0.55077173, 0.55077173, 0.55099553, 0.55099553])

# Scoring Function

In [7]:
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.metrics import accuracy_score
import numpy as np

In [8]:
class MostFrequentCassifier(BaseEstimator, ClassifierMixin):
    """Majority-label baseline classifier with an explicit accuracy ``score``.

    The class name's spelling is kept as-is because code in this notebook
    instantiates it under this name.

    Attributes
    ----------
    most_frequent_ : scalar or None
        Majority label of the training target; ``None`` until ``fit`` is called.
    classes_ : ndarray
        Sorted unique labels seen by ``fit`` (only present after fitting).
    """

    def __init__(self):
        # Sentinel value that ``predict`` uses to detect an unfitted instance.
        self.most_frequent_ = None

    def fit(self, X, y):
        """Store the majority label of ``y``; ``X`` is accepted but not inspected."""
        y = np.ravel(y)
        # Sorted labels + first-maximum argmax: ties resolve to the smallest label.
        self.classes_, counts = np.unique(y, return_counts=True)
        self.most_frequent_ = self.classes_[np.argmax(counts)]
        return self

    def predict(self, X):
        """Return the majority label once per row of ``X``.

        Raises
        ------
        ValueError
            If the classifier has not been fitted.
        """
        if self.most_frequent_ is None:
            raise ValueError("This classifier instance is not fitted yet.")
        # np.shape also accepts plain lists, unlike ``X.shape``.
        n_samples = np.shape(X)[0]
        return np.full(shape=(n_samples,), fill_value=self.most_frequent_)

    def score(self, X, y, sample_weight=None):
        """Accuracy of the constant prediction on ``(X, y)``.

        Parameters
        ----------
        X : array-like
            Samples to score; only the number of rows is used.
        y : array-like
            True labels, flattened to 1-D before comparison.
        sample_weight : array-like or None, default=None
            Optional per-sample weights forwarded to ``accuracy_score``.
            Added so the override matches the ``score(X, y, sample_weight)``
            signature that ``ClassifierMixin`` exposes; omitting it gives the
            unweighted accuracy, exactly as before.
        """
        y = np.ravel(y)
        predictions = self.predict(X)
        return accuracy_score(y, predictions, sample_weight=sample_weight)
     
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
# Score the baseline on a two-class subset of iris.
iris = load_iris()
X = iris.data
y = iris.target

# Boolean mask selecting only the samples labelled 0 or 1.
is_class_0_or_1 = y < 2
X_bin, y_bin = X[is_class_0_or_1], y[is_class_0_or_1]

X_train, X_test, y_train, y_test = train_test_split(X_bin, y_bin, test_size=0.2, random_state=42)

# fit() returns the estimator, so it can be chained onto the constructor.
classifier = MostFrequentCassifier().fit(X_train, y_train)

score = classifier.score(X_test, y_test)
print(f"Accuracy of the MostfrequentClassifier: {score}")

Accuracy of the MostfrequentClassifier: 0.4


In [9]:
from sklearn.datasets import make_regression
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler

# Manual two-step workflow: standardise the features, then fit a linear model on them.
X, y = make_regression(n_samples=100, n_features=2, noise=0.1, random_state=42)
X_transformed = StandardScaler().fit(X).transform(X)
LinearRegression().fit(X_transformed, y)

# Custom Transformer using BaseEstimator and TransformerMixin

In [10]:
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
class MedianIQRScalar(BaseEstimator, TransformerMixin):
    """Scale features by subtracting the median and dividing by the interquartile range.

    Attributes
    ----------
    medians_ : ndarray or None
        Per-column median computed by ``fit``; ``None`` until fitted.
    iqr_ : ndarray or None
        Per-column Q3 - Q1 computed by ``fit``, with zero spreads replaced by 1;
        ``None`` until fitted.
    """

    def __init__(self):
        # Sentinel values that ``transform`` uses to detect an unfitted instance.
        self.medians_ = None
        self.iqr_ = None

    def fit(self, X, y=None):
        """Compute the per-column median and IQR of ``X``.

        Parameters
        ----------
        X : array-like
            Training data; statistics are taken along axis 0.
        y : ignored
            Present only so the signature matches the estimator API.

        Returns
        -------
        self
        """
        X = np.asarray(X)
        self.medians_ = np.median(X, axis=0)
        q1, q3 = np.percentile(X, [25, 75], axis=0)
        spread = q3 - q1

        # Replace zero spreads with 1 to avoid division by zero in transform.
        # np.where is used instead of in-place masking because for 1-D input
        # the spread is a numpy scalar, which does not support item assignment.
        self.iqr_ = np.where(spread == 0, 1.0, spread)
        return self

    def transform(self, X):
        """Return ``(X - medians_) / iqr_``.

        Raises
        ------
        RuntimeError
            If the transformer has not been fitted.
        """
        if self.medians_ is None or self.iqr_ is None:
            raise RuntimeError("This transformer has not been fitted yet.")

        # Plain Python sequences do not support array arithmetic; convert them.
        # Other inputs (ndarray, DataFrame) are left untouched so the result
        # type is the same as before.
        if isinstance(X, (list, tuple)):
            X = np.asarray(X)
        return (X - self.medians_) / self.iqr_
     
     
from sklearn.datasets import make_blobs
# Try the scaler on synthetic blob data; the cluster labels are discarded.
X, _ = make_blobs(n_samples=100, n_features=2, centers=3, random_state=42)

# fit() returns the transformer, so it can be chained onto the constructor.
scaler = MedianIQRScalar().fit(X)
X_scaled = scaler.transform(X)

print("Transformed data (first 5 rows) :")
print(X_scaled[:5])
     

Transformed data (first 5 rows) :
[[-0.49872679 -0.71613207]
 [ 0.78423675 -0.08192868]
 [-0.03656645  0.52987512]
 [ 0.84159877 -0.09379661]
 [-0.3814692  -0.57206564]]


In [11]:
# NOTE(review): absolute, machine-specific path; this cell only runs where the file exists.
flipkart_xlsx_path = r'C:\personal\Sklearn_And_Python_For_Interview\Flipkart_data.xlsx'
data = pd.read_excel(flipkart_xlsx_path)
data.columns

Index(['Campaign_Name_Partner_Hyyzo', 'Amount_Partner_Hyyzo',
       'Payout_Partner_Hyyzo', 'Date_Partner_Hyyzo', 'CK_Payout', 'HY_Payout'],
      dtype='object')

In [12]:
# Preview the first rows of the Flipkart sheet loaded into `data`.
data.head()

Unnamed: 0,Campaign_Name_Partner_Hyyzo,Amount_Partner_Hyyzo,Payout_Partner_Hyyzo,Date_Partner_Hyyzo,CK_Payout,HY_Payout
0,Flipkart [CPS] IN,257,4.52,2024-10-19,4.017778,4.268889
1,Flipkart [CPS] IN,279,7.19,2024-10-19,6.391111,6.790556
2,Flipkart [CPS] IN,71,1.8,2024-10-19,1.6,1.7
3,Flipkart [CPS] IN,369,9.92,2024-10-19,8.817778,9.368889
4,Flipkart [CPS] IN,579,10.82,2024-10-19,9.617778,10.218889


In [13]:
class MostFrequentCassifier(BaseEstimator, ClassifierMixin):
    """Baseline classifier that always predicts the most frequent training label.

    The class name's spelling is kept as-is because code in this notebook
    instantiates it under this name.

    Attributes
    ----------
    most_frequent_ : scalar or None
        Majority label of the training target; ``None`` until ``fit`` is called.
    classes_ : ndarray
        Sorted unique labels seen by ``fit`` (only present after fitting).
    """

    def __init__(self):
        # Previously spelled ``__int__``, so this never ran as the constructor:
        # ``most_frequent_`` was missing on new instances and ``predict`` before
        # ``fit`` raised AttributeError instead of the intended ValueError.
        self.most_frequent_ = None

    def fit(self, X, y):
        """Validate ``X``/``y`` and store the majority label of ``y``; returns ``self``."""
        X, y = check_X_y(X, y)
        y = np.ravel(y)

        # Sorted labels + first-maximum argmax: ties resolve to the smallest label.
        self.classes_, counts = np.unique(y, return_counts=True)
        self.most_frequent_ = self.classes_[np.argmax(counts)]
        return self

    def predict(self, X):
        """Return the majority label once per row of ``X``.

        Raises
        ------
        ValueError
            If the classifier has not been fitted.
        """
        if self.most_frequent_ is None:
            # This branch was unreachable before the constructor fix, so the
            # message is aligned with the other versions of this class.
            raise ValueError("This classifier instance is not fitted yet.")

        # np.shape also accepts plain lists, unlike ``X.shape``.
        n_samples = np.shape(X)[0]
        return np.full(shape=(n_samples,), fill_value=self.most_frequent_)
     
     
     
# Majority-value baseline on the Flipkart sheet: amount as the feature, payout as the target.
# NOTE(review): absolute, machine-specific path; this cell only runs where the file exists.
data = pd.read_excel(r'C:\personal\Sklearn_And_Python_For_Interview\Flipkart_data.xlsx')

# NOTE(review): the payout column looks continuous in the preview, so each distinct
# value is treated as its own class here; confirm that a classifier is intended.
X = data['Amount_Partner_Hyyzo'].values.reshape(-1, 1)
y = data['Payout_Partner_Hyyzo']
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

# fit() returns the estimator, so it can be chained onto the constructor.
sol = MostFrequentCassifier().fit(X_train, y_train)
predicter = sol.predict(X_test)
print(f"This instance consist of : {predicter[0]}")


This instance consist of : 0.9


# Custom Transformer using Function Transformer

In [14]:
def cube(x):
    """Raise ``x`` element-wise to the third power using ``np.power``."""
    exponent = 3
    return np.power(x, exponent)

In [15]:
from sklearn.preprocessing import FunctionTransformer
# Wrap the plain `cube` function in a FunctionTransformer so it can be used as a transformer object.
cube_transformer = FunctionTransformer(cube)

In [16]:
from sklearn.datasets import make_regression
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
# Synthetic two-feature regression data, cubed feature-wise, then fed to a linear model.
X, y = make_regression(n_samples=100, n_features=2, noise=0.1, random_state=42)
# NOTE(review): transform() is called without a prior fit(); this presumably relies on
# FunctionTransformer being stateless. Confirm for the installed scikit-learn version.
X_transformed = cube_transformer.transform(X)
LinearRegression().fit(X_transformed, y)

# Custom Transformer using BaseEstimator and TransformerMixin

In [17]:
class Median_IQR_Scaler(BaseEstimator, TransformerMixin):
    """Scale features by subtracting the median and dividing by the interquartile range.

    Attributes
    ----------
    median_ : ndarray or None
        Per-column median computed by ``fit``; ``None`` until fitted.
    iqr_ : ndarray or None
        Per-column Q3 - Q1 computed by ``fit``, with zero spreads replaced by 1;
        ``None`` until fitted.
    """

    def __init__(self):
        # Sentinel values that ``transform`` uses to detect an unfitted instance.
        self.median_ = None
        self.iqr_ = None

    def fit(self, X, y=None):
        """Compute the per-column median and IQR of ``X``.

        Parameters
        ----------
        X : array-like
            Training data; statistics are taken along axis 0.
        y : ignored
            Present only so the signature matches the estimator API.

        Returns
        -------
        self
        """
        X = np.asarray(X)
        self.median_ = np.median(X, axis=0)
        q1, q3 = np.percentile(X, [25, 75], axis=0)
        spread = q3 - q1

        # Replace zero spreads with 1 to avoid division by zero in transform.
        # np.where is used instead of in-place masking because for 1-D input
        # the spread is a numpy scalar, which does not support item assignment.
        self.iqr_ = np.where(spread == 0, 1.0, spread)
        return self

    def transform(self, X):
        """Return ``(X - median_) / iqr_``.

        Raises
        ------
        RuntimeError
            If the transformer has not been fitted.
        """
        if self.median_ is None or self.iqr_ is None:
            raise RuntimeError("The transformer has not been fitted yet.")

        # Plain Python sequences do not support array arithmetic; convert them.
        # Other inputs (ndarray, DataFrame) are left untouched so the result
        # type is the same as before.
        if isinstance(X, (list, tuple)):
            X = np.asarray(X)
        return (X - self.median_) / self.iqr_

In [18]:
# Try the scaler on synthetic blob data; the cluster labels are discarded.
X, _ = make_blobs(n_samples=100, n_features=2, centers=3, random_state=42)

# fit() returns the transformer, so it can be chained onto the constructor.
scaler = Median_IQR_Scaler().fit(X)
X_scaled = scaler.transform(X)

print("Transformed data (first 5 rows) :")
print(X_scaled[:5])

Transformed data (first 5 rows) :
[[-0.49872679 -0.71613207]
 [ 0.78423675 -0.08192868]
 [-0.03656645  0.52987512]
 [ 0.84159877 -0.09379661]
 [-0.3814692  -0.57206564]]


# Feature Union

In [19]:
# Seed numpy's global RNG so the random frame below is reproducible.
np.random.seed(42)
# 10 rows x 4 columns of uniform [0, 1) samples.
data = np.random.rand(10, 4)
# Three feature columns plus a `y` column that is dropped before the feature union is fitted.
df = pd.DataFrame(data, columns=['f1', 'f2', 'f3', 'y'])
df

Unnamed: 0,f1,f2,f3,y
0,0.37454,0.950714,0.731994,0.598658
1,0.156019,0.155995,0.058084,0.866176
2,0.601115,0.708073,0.020584,0.96991
3,0.832443,0.212339,0.181825,0.183405
4,0.304242,0.524756,0.431945,0.291229
5,0.611853,0.139494,0.292145,0.366362
6,0.45607,0.785176,0.199674,0.514234
7,0.592415,0.04645,0.607545,0.170524
8,0.065052,0.948886,0.965632,0.808397
9,0.304614,0.097672,0.684233,0.440152


In [20]:
from sklearn.pipeline import FeatureUnion
from sklearn.decomposition import PCA
# Two transformers combined in one FeatureUnion: a standard scaler and a 2-component PCA.
# The step names become the prefixes of the output feature names.
feature_union = FeatureUnion(
    transformer_list=[
        ("scaler", StandardScaler()),
        ("pca", PCA(n_components=2)),
    ]
)

In [21]:
# Fit the union on the feature columns only (`y` is dropped) and transform them in one step.
X_transformed = feature_union.fit_transform(df.drop(columns=['y']))
# Label the combined output with the union's generated feature names for display.
pd.DataFrame(X_transformed, columns=feature_union.get_feature_names_out())

Unnamed: 0,scaler__f1,scaler__f2,scaler__f3,pca__pca0,pca__pca1
0,-0.24873,1.419472,1.041692,0.566558,0.065238
1,-1.231671,-0.865211,-1.189537,-0.326067,0.00784
2,0.770437,0.721919,-1.313692,-0.080872,0.492691
3,1.810983,-0.70323,-0.779846,-0.456381,0.118928
4,-0.56494,0.194916,0.048269,0.104292,0.000952
5,0.818738,-0.912648,-0.414592,-0.374897,-0.062619
6,0.118004,0.943578,-0.720751,0.125587,0.373463
7,0.731301,-1.180132,0.629657,-0.27032,-0.358406
8,-1.640854,1.414215,1.815237,0.800051,-0.184421
9,-0.563268,-1.032878,0.883562,-0.087953,-0.453664
