In [1]:
import numpy as np
from sklearn.base import clone
from sklearn.datasets import make_regression
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split

In [2]:
class RANSAC():
    """Minimal RANSAC (RANdom SAmple Consensus) regressor.

    Every iteration fits a clone of ``base_estimator`` on a small random
    sample, marks the held-out points whose loss is below
    ``error_threshold`` as inliers and, when enough inliers were found,
    fits a fresh clone on the sample plus those inliers.  The refitted model
    with the highest score is kept.

    Parameters
    ----------
    base_estimator : estimator, default=LinearRegression()
        Template estimator.  It is only ever cloned, never fitted in place.
    min_sample : int or float, default=None
        Size of the random sample, forwarded to ``train_test_split`` as
        ``train_size``.  A falsy value (``None`` or ``0``) means
        ``n_features + 1``.
    error_threshold : float, default=None
        Losses strictly below this value count as inliers.  ``None`` means
        the median absolute deviation of ``y``.
    fit_ratio : float, default=0.6
        A candidate is only considered when the number of held-out inliers
        is at least ``fit_ratio * n_samples``.
    loss : str, default='absolute_loss'
        ``'absolute_loss'`` selects the absolute error; any other value
        selects the squared error.
    model_score : callable or str, default='r2_score'
        ``callable(y_true, y_pred)`` returning a score where larger is
        better.  Any non-callable value falls back to ``r2_score``.
    good_enough_score : float, default=0.9
        The search stops as soon as a candidate reaches this score.
    max_iteration : int, default=100
        Maximum number of random samples to draw.
    random_state : None, int or numpy.random.RandomState, default=None
        Seed or generator for the random sampling.  ``None`` leaves
        ``train_test_split`` on its default (unseeded) behaviour.

    Attributes
    ----------
    estimator_ : estimator or None
        Best refitted model, ``None`` until ``fit`` succeeds.
    inliers_ : tuple or None
        ``(X_inliers, y_inliers)`` the best model was fitted on.
    """

    def __init__(self,
                 base_estimator=LinearRegression(),
                 min_sample=None,
                 error_threshold=None,
                 fit_ratio=0.6,
                 loss='absolute_loss',
                 model_score='r2_score',
                 good_enough_score=0.9,
                 max_iteration=100,
                 random_state=None
                ):
        self.base_estimator = base_estimator
        self.min_sample = min_sample
        self.error_threshold = error_threshold
        self.fit_ratio = fit_ratio
        self.loss = loss
        self.model_score = model_score
        self.good_enough_score = good_enough_score
        self.max_iteration = max_iteration
        self.random_state = random_state

        self.estimator_ = None
        self.inliers_ = None

    def fit(self, X, y):
        """Search for the best consensus model on ``(X, y)``.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
        y : array-like with one target per row of ``X``

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If no iteration produced at least ``fit_ratio * n_samples``
            inliers.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        n, m = X.shape
        min_sample = self.min_sample if self.min_sample else m + 1
        fit_ratio = self.fit_ratio
        good_enough_score = self.good_enough_score

        if self.error_threshold is None:
            # Median absolute deviation of the targets.
            error_threshold = np.median(np.abs(y - np.median(y)))
        else:
            error_threshold = self.error_threshold

        if self.loss == 'absolute_loss':
            def loss_function(y_trues, y_preds):
                return np.abs(y_trues - y_preds)
        else:
            def loss_function(y_trues, y_preds):
                return (y_trues - y_preds) ** 2

        score_function = self.model_score if callable(self.model_score) else r2_score

        # Wrap a seed in ONE generator shared by all iterations; handing the
        # bare seed to train_test_split would draw the same sample every time.
        rng = self.random_state
        if rng is not None and not isinstance(rng, np.random.RandomState):
            rng = np.random.RandomState(rng)

        # Scratch estimator for the minimal-sample fits; it is refitted on
        # every iteration and never stored as a result.
        sample_estimator = clone(self.base_estimator)

        best_model, best_score, best_size = None, None, None
        best_X, best_y = None, None

        for _ in range(self.max_iteration):
            # train_size (not test_size): the model is fitted on the SMALL
            # sample and the remaining points are tested for consensus.
            X_subset, X_left, y_subset, y_left = train_test_split(
                X, y, train_size=min_sample, random_state=rng)

            sample_estimator.fit(X_subset, y_subset)

            errors = loss_function(y_left, sample_estimator.predict(X_left))
            mask = errors < error_threshold
            set_size = np.count_nonzero(mask)

            if set_size < fit_ratio * n:
                continue

            X_set = np.concatenate((X_subset, X_left[mask]))
            y_set = np.concatenate((y_subset, y_left[mask]))

            # A fresh clone per candidate, so a stored best model is not
            # overwritten by fits performed in later iterations.
            model = clone(self.base_estimator)
            model.fit(X_set, y_set)

            score = score_function(y_set, model.predict(X_set))
            good_enough = score >= good_enough_score

            # Larger scores are better; ties go to the larger inlier count.
            if (good_enough
                    or (best_model is None)
                    or (score > best_score)
                    or ((score == best_score) and (set_size > best_size))):
                best_model = model
                best_score = score
                best_size = set_size

                best_X = X_set
                best_y = y_set

            if good_enough:
                break

        if best_model is None:
            raise ValueError('RANSAC could not find a valid consensus set.')

        self.estimator_ = best_model
        self.inliers_ = (best_X, best_y)
        return self

    def predict(self, X):
        """Predict with the best model found by ``fit``."""
        return self.estimator_.predict(X)

    def score(self, X, y):
        """Score the fitted model on ``(X, y)``.

        Delegates to the fitted estimator's own ``score`` method when it has
        one; otherwise returns ``r2_score`` of the predictions.
        """
        if hasattr(self.estimator_, 'score'):
            return self.estimator_.score(X, y)
        else:
            y_preds = self.predict(X)
            return r2_score(y, y_preds)

In [3]:
SEED = 42

# RANSAC draws random samples while fitting; seeding NumPy's global RNG makes
# the run (and the score shown below) repeatable under Restart & Run All.
np.random.seed(SEED)

X, y = make_regression(n_samples=2000, n_features=10, noise=4.0, random_state=SEED)

# A candidate model is only considered when the number of held-out inliers
# reaches fit_ratio * n_samples.
reg = RANSAC(fit_ratio=0.005)
reg.fit(X, y)

# In-sample score: evaluated on the same data the model was fitted on.
reg.score(X, y)

0.9995963528827464