# Machine Learning Algorithms - Linear Models

## Imports and Config

In [2]:
import numpy as np
import pandas as pd
import random
import sklearn.linear_model

print("Imports Done!")

Imports Done!


In [3]:
class Config:
    random_seed = 42
    train_size = 0.75

print(Config.random_seed)
print(Config.train_size)

42
0.75


## Regression - Linear Regression

In [4]:
from sklearn.datasets import load_diabetes

data = load_diabetes(as_frame=True)
X, y = data['data'], data['target']

In [4]:
X

Unnamed: 0,age,sex,bmi,bp,s1,s2,s3,s4,s5,s6
0,0.038076,0.050680,0.061696,0.021872,-0.044223,-0.034821,-0.043401,-0.002592,0.019907,-0.017646
1,-0.001882,-0.044642,-0.051474,-0.026328,-0.008449,-0.019163,0.074412,-0.039493,-0.068332,-0.092204
2,0.085299,0.050680,0.044451,-0.005670,-0.045599,-0.034194,-0.032356,-0.002592,0.002861,-0.025930
3,-0.089063,-0.044642,-0.011595,-0.036656,0.012191,0.024991,-0.036038,0.034309,0.022688,-0.009362
4,0.005383,-0.044642,-0.036385,0.021872,0.003935,0.015596,0.008142,-0.002592,-0.031988,-0.046641
...,...,...,...,...,...,...,...,...,...,...
437,0.041708,0.050680,0.019662,0.059744,-0.005697,-0.002566,-0.028674,-0.002592,0.031193,0.007207
438,-0.005515,0.050680,-0.015906,-0.067642,0.049341,0.079165,-0.028674,0.034309,-0.018114,0.044485
439,0.041708,0.050680,-0.015906,0.017293,-0.037344,-0.013840,-0.024993,-0.011080,-0.046883,0.015491
440,-0.045472,-0.044642,0.039062,0.001215,0.016318,0.015283,-0.028674,0.026560,0.044529,-0.025930


In [6]:
class MyLineReg:

    def __init__(self, sgd_sample=None, metric=None, reg=None, l1_coef=0, l2_coef=0, random_state=42, n_iter=100, learning_rate=0.1):
        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self.weights = []
        self.metric = metric
        self.reg = reg
        self.l1_coef = l1_coef
        self.l2_coef = l2_coef
        self.sgd_sample = sgd_sample
        self.random_state = random_state

    def __repr__(self):
        return f'MyLineReg class: n_iter={n_iter}, learning_rate={learning_rate}'

    def compute_loss(self, weights, y, y_pred, reg, l1_coef, l2_coef):
        loss = np.mean((y - y_pred)**2)
        if reg == "l1":
            return loss + l1_coef * np.sum(abs(weights))
        elif reg == "l2":
            return loss + l2_coef * np.sum(weights**2)
        elif reg == "elasticnet":
            return loss + l1_coef * np.sum(abs(weights)) + l2_coef * np.sum(weights**2)
        return loss

    def compute_weights(self, weights, learning_rate, X, y, y_pred, n_samples, num_iter, reg, l1_coef, l2_coef):
        if callable(self.learning_rate):
            if reg == None:
                return weights - learning_rate(num_iter) * (2 / n_samples) * np.dot(X.T, y_pred - y)
            elif reg == "l1":
                return weights - learning_rate(num_iter) * ((2 / n_samples) * np.dot(X.T, y_pred - y) + l1_coef * np.sign(weights))
            elif reg == "l2":
                return weights - learning_rate(num_iter) * ((2 / n_samples) * np.dot(X.T, y_pred - y) + l2_coef * 2 * weights)
            elif reg == "elasticnet":
                return weights - learning_rate(num_iter) * ((2 / n_samples) * np.dot(X.T, y_pred - y) + l1_coef * np.sign(weights) + l2_coef * 2 * weights)
        else:
            if reg == None:
                return weights - learning_rate * (2 / n_samples) * np.dot(X.T, y_pred - y)
            elif reg == "l1":
                return weights - learning_rate * ((2 / n_samples) * np.dot(X.T, y_pred - y) + l1_coef * np.sign(weights))
            elif reg == "l2":
                return weights - learning_rate * ((2 / n_samples) * np.dot(X.T, y_pred - y) + l2_coef * 2 * weights)
            elif reg == "elasticnet":
                return weights - learning_rate * ((2 / n_samples) * np.dot(X.T, y_pred - y) + l1_coef * np.sign(weights) + l2_coef * 2 * weights)
    
    def compute_metric(self, metric_name, y, y_pred):
        if metric_name == None:
            pass
        elif metric_name == "mse":
            return np.mean((y - y_pred)**2)
        elif metric_name == "mae":
            return np.mean(abs(y - y_pred))
        elif metric_name == "rmse":
            return np.sqrt(np.mean((y - y_pred)**2))
        elif metric_name == "mape":
            return 100 * np.mean(abs((y - y_pred) / y))
        elif metric_name == "r2": 
            return 1 - np.mean((y - y_pred)**2)/np.mean((y - np.mean(y))**2)

    def get_coef(self):
        return self.weights[1:]
        
    def get_best_score(self):
        return self.compute_metric(self.metric, y, np.dot(np.hstack((np.ones((X.shape[0], 1)), X)), self.weights))

    def predict(self, X):
        X = np.hstack((np.ones((X.shape[0], 1)), X))
        return np.dot(X, self.weights)

    def fit(self, X, y, verbose=False):
        n_samples, n_features = X.shape
        X = np.hstack((np.ones((n_samples, 1)), X))
        self.weights = np.ones(n_features + 1)    

        random.seed(self.random_state)
     
        for num_iter in range(1, self.n_iter + 1):

            if isinstance(self.sgd_sample, int):
                batch_size = self.sgd_sample
            elif isinstance(self.sgd_sample, float):
                batch_size = int(n_samples * self.sgd_sample)
            else:
                batch_size = n_samples
                
            sample_rows_idx = random.sample(range(X.shape[0]), batch_size)
                
            y_pred = np.dot(X, self.weights)
            loss = self.compute_loss(self.weights, y, y_pred, self.reg, self.l1_coef, self.l2_coef)
            self.weights = self.compute_weights(self.weights, self.learning_rate, X[sample_rows_idx], y.iloc[sample_rows_idx], y_pred[sample_rows_idx], batch_size, num_iter, self.reg, self.l1_coef, self.l2_coef)
                    
            if verbose and self.metric:
                if num_iter == 1:
                    print(f"start | loss: {loss} | {self.metric}: {self.compute_metric(self.metric, y, y_pred)}")
                if num_iter % verbose == 0:
                    print(f"{self.n_iter} | loss: {loss} | {self.metric}: {self.compute_metric(self.metric, y, y_pred)}")

In [7]:
my_linreg_model_test = MyLineReg(sgd_sample=0.1)
my_linreg_model_test.fit(X, y, verbose=100)
my_linreg_model_test.get_coef()

array([ 13.56526458,   1.54787402,  41.77485153,  30.25568854,
        14.34384202,  11.95453304, -26.43872364,  30.17356937,
        39.85424619,  25.47414328])

In [8]:
my_linreg_model = MyLineReg(metric="mae", learning_rate=lambda iter: 0.5 * (0.85 ** iter))
my_linreg_model.fit(X, y, verbose=100)
my_linreg_model.get_coef()

start | loss: 28752.020614660887 | mae: 151.13348416289594
100 | loss: 5700.346616390239 | mae: 64.52760766167967


array([ 4.75147554,  1.78183055, 12.93809891,  9.95537628,  5.19244181,
        4.40902338, -7.00724769,  9.6628404 , 12.47804144,  8.71161724])

In [9]:
my_linreg_model_2 = MyLineReg(metric="r2", n_iter=10**5, learning_rate=0.999)
my_linreg_model_2.fit(X, y)
my_linreg_model_2.get_best_score()

0.5177467808833367

In [10]:
from sklearn.linear_model import LinearRegression

In [11]:
model_lr = LinearRegression()
model_lr.fit(X, y)

model_lr.score(X, y)

0.5177484222203499

In [12]:
my_linreg_model_2.get_best_score() - model_lr.score(X, y)

-1.6413370131918015e-06

---

Красивый код от коллеги, надо посмотреть и научиться

In [None]:
# class MyLineReg:
#     def __init__(self, n_iter: int = 100, learning_rate: float = 0.1, metric: str = None) -> None:
#         self.n_iter = n_iter
#         self.learning_rate = learning_rate
#         self._weights = None
#         self.metric = metric

#     def __str__(self) -> str:
#         params = [f'{key}={value}' for key, value in self.__dict__.items()]
#         return 'MyLineReg class: ' + ', '.join(params)

#     @staticmethod
#     def _mae(y_true: np.array, y_pred: np.array):
#         return (y_true - y_pred).abs().mean()

#     @staticmethod
#     def _mse(y_true: np.array, y_pred: np.array):
#         return (y_true - y_pred).pow(2).mean()

#     @staticmethod
#     def _rmse(y_true: np.array, y_pred: np.array):
#         return np.sqrt((y_true - y_pred).pow(2).mean())

#     @staticmethod
#     def _mape(y_true: np.array, y_pred: np.array):
#         return 100 * ((y_true - y_pred) / y_true).abs().mean()

#     @staticmethod
#     def _r2(y_true: np.array, y_pred: np.array):
#         return 1 - (y_true - y_pred).pow(2).sum() / ((y_true - y_true.mean()).pow(2).sum())

#     def get_best_score(self):
#         return self.score

#     def fit(self, x: pd.DataFrame, y: pd.Series, verbose: int) -> None:
#         x = pd.concat([pd.Series([1] * x.shape[0], index=x.index), x], axis=1).values
#         self._weights = pd.Series([1.] * x.shape[1]).values
#         for i in range(self.n_iter):
#             y_hat = self._weights @ x.T
#             mse = (y_hat - y).pow(2).mean()
#             grad = (2 / y.shape[0]) * (y_hat - y) @ x
#             self._weights -= self.learning_rate * grad
#             if self.metric:
#                 self.score = getattr(self, '_' + self.metric)(y, x @ self._weights)
#             if verbose and i % verbose == 0:
#                 if self.metric:
#                     print(f'{i} | loss: {mse} | {self.metric}: {self.score}')
#                 else:
#                     print(f'{i} | loss: {mse}')

#     def get_coef(self):
#         return self._weights[1:]

#     def predict(self, x: pd.DataFrame):
#         x = pd.concat([pd.Series([1] * x.shape[0], index=x.index), x], axis=1).values
#         return x @ self._weights

---

## Classification - Logistic Regression

In [5]:
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=1000, n_features=14, n_informative=10, random_state=42)
X = pd.DataFrame(X)
y = pd.Series(y)
X.columns = [f'col_{col}' for col in X.columns]

In [29]:
class MyLogReg:

    def __init__(self, metric=None, n_iter=10, learning_rate=0.1, eps=1e-15, reg=None, l1_coef=0, l2_coef=0, sgd_sample=None, random_state=42):
        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self.eps = eps
        self.weights = None
        self.metric = metric
        self.reg = reg
        self.l1_coef = l1_coef
        self.l2_coef = l2_coef
        self.sgd_sample = sgd_sample
        self.random_state = random_state        

    def __repr__(self):
        return f'MyLogReg class: n_iter={self.n_iter}, learning_rate={self.learning_rate}'
    
    def compute_loss(self, weights, y, y_pred, reg, l1_coef, l2_coef):
        loss_result = -1 * np.mean(y * np.log(y_pred + self.eps) + (1 - y) * np.log(1 - y_pred + self.eps))
        if reg == None:
            return loss_result
        elif reg == "l1":
            return loss_result + l1_coef * np.sum(abs(weights))
        elif reg == "l2":
            return loss_result + l2_coef * np.sum(weights**2)
        elif reg == "elasticnet":
            return loss_result + l1_coef * np.sum(abs(weights)) + l2_coef * np.sum(weights**2)

    def compute_weights(self, weights, learning_rate, X, y, y_pred, n_samples, num_iter, reg, l1_coef, l2_coef):
        gradient = (1 / n_samples) * np.dot(X.T, y_pred - y)
        if callable(learning_rate):
            if reg == None:
                return weights - learning_rate(num_iter) * gradient
            elif reg == "l1":
                return weights - learning_rate(num_iter) * (gradient + l1_coef * np.sign(weights))
            elif reg == "l2":
                return weights - learning_rate(num_iter) * (gradient + l2_coef * 2 * weights)
            elif reg == "elasticnet":
                return weights - learning_rate(num_iter) * (gradient + l1_coef * np.sign(weights) + l2_coef * 2 * weights)
        else:
            if reg == None:
                return weights - learning_rate * gradient
            elif reg == "l1":
                return weights - learning_rate * (gradient + l1_coef * np.sign(weights))
            elif reg == "l2":
                return weights - learning_rate * (gradient + l2_coef * 2 * weights)
            elif reg == "elasticnet":
                return weights - learning_rate * (gradient + l1_coef * np.sign(weights) + l2_coef * 2 * weights)

    def compute_metric(self, metric_name, y, y_pred, y_pred_proba):
        item_index = np.where(y_pred == 1)
        precision = np.sum(y.loc[item_index] == 1) / len(item_index[0])
        recall = np.sum(y.loc[item_index] == 1) / np.sum(y)
        
        if metric_name == None:
            pass
        elif metric_name == "accuracy":
            return np.sum(y == y_pred) / y.shape[0]
        elif metric_name == "precision":
            return precision
        elif metric_name == "recall":
            return recall
        elif metric_name == "f1":
            return (2 * precision * recall) / (precision + recall)
        elif metric_name == "roc_auc": 
            labeled_probs = pd.DataFrame(data=np.array([y_pred_proba, y]).T, columns=["probability", "label"])
            labeled_probs = labeled_probs.sort_values(by=["probability"], ascending=False)
            labeled_probs["is_duplicated"] = labeled_probs["probability"].duplicated(keep=False)        
            labeled_probs.reset_index(drop=True, inplace=True)
            
            result = 0
            pos_counter = 0
            
            for index, row in labeled_probs.iterrows():
                duplicated_labels = 0
                
                if row.label:
                    pos_counter += 1
                else:
                    if row.is_duplicated:
                        for sub_index, sub_row in labeled_probs.iterrows():
                            if sub_row.probability == row.probability:
                                duplicated_labels += 1
                                if sub_index >= index:
                                    result += pos_counter + duplicated_labels * 0.5
                                    break
                                else:
                                    result += (pos_counter - 1) + duplicated_labels * 0.5
                                    break
                    else:
                        result += pos_counter
            roc_auc = result / (np.sum(y) * (len(y) - np.sum(y)))
            return roc_auc

    def get_best_score(self):
        return self.compute_metric(self.metric, y, np.where((1 / (1 + np.exp(-1*np.dot(np.hstack((np.ones((X.shape[0], 1)), X)), self.weights)))) > 0.5, 1, 0), 1 / (1 + np.exp(-1*np.dot(np.hstack((np.ones((X.shape[0], 1)), X)), self.weights))))
    
    def predict(self, X):
        return self.predict_proba(X) > 0.5

    def predict_proba(self, X):
        X = np.hstack((np.ones((X.shape[0], 1)), X))
        return 1 / (1 + np.exp(-1*np.dot(X, self.weights)))
                                       
    def get_coef(self):
        return self.weights[1:]

    def fit(self, X, y, verbose=False):
        n_samples, n_features = X.shape
        X = np.hstack((np.ones((n_samples, 1)), X))
        self.weights = np.ones(n_features + 1)

        random.seed(self.random_state)
        
        for num_iter in range(1, self.n_iter + 1):

            if isinstance(self.sgd_sample, int):
                batch_size = self.sgd_sample
            elif isinstance(self.sgd_sample, float):
                batch_size = int(n_samples * self.sgd_sample)
            else:
                batch_size = n_samples
                
            sample_rows_idx = random.sample(range(X.shape[0]), batch_size)
            
            y_pred = 1 / (1 + np.exp(-1*np.dot(X, self.weights)))
            loss = self.compute_loss(self.weights, y, y_pred, self.reg, self.l1_coef, self.l2_coef)
            self.weights = self.compute_weights(self.weights, self.learning_rate, X[sample_rows_idx], y.iloc[sample_rows_idx], y_pred[sample_rows_idx], batch_size, num_iter, self.reg, self.l1_coef, self.l2_coef)
           
            if verbose and self.metric:
                if num_iter == 1:
                    print(f"start | loss: {loss} | {self.metric}: {self.compute_metric(self.metric, y, np.where(y_pred > 0.5, 1, 0), y_pred)}")
                if num_iter % verbose == 0:
                    print(f"{self.n_iter} | loss: {loss} | {self.metric}: {self.compute_metric(self.metric, y, np.where(y_pred > 0.5, 1, 0), y_pred)}")


In [30]:
my_logreg_model = MyLogReg()
my_logreg_model.fit(X, y, verbose=1)

In [31]:
my_logreg_model_2 = MyLogReg(n_iter=100, learning_rate=0.1, metric="roc_auc")
my_logreg_model_2.fit(X, y, verbose=10)
my_logreg_model_2.get_best_score()

start | loss: 3.6736886876615524 | roc_auc: 0.5326141304565218
100 | loss: 1.8729518266222966 | roc_auc: 0.6106664426657706
100 | loss: 1.1626945313705064 | roc_auc: 0.7198908795635183
100 | loss: 0.8288847284427477 | roc_auc: 0.7991111964447858
100 | loss: 0.6548845902226954 | roc_auc: 0.8443873775495102
100 | loss: 0.5606818660628208 | roc_auc: 0.8702874811499246
100 | loss: 0.5054955690290941 | roc_auc: 0.8853355413421654
100 | loss: 0.4701354545266514 | roc_auc: 0.8947515790063161
100 | loss: 0.4456357293312561 | roc_auc: 0.9010436041744166
100 | loss: 0.42761214161843997 | roc_auc: 0.9054516218064872
100 | loss: 0.41378040631663504 | roc_auc: 0.9086556346225385


0.9089076356305426

In [35]:
y_pred_proba = [0.91, 0.86, 0.78, 0.6, 0.6, 0.55, 0.51, 0.46, 0.45, 0.45, 0.42]
y_pred = [1, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0]

labeled_probs = pd.DataFrame(data=np.array([y_pred_proba, y_pred]).T, columns=["probability", "label"])
labeled_probs = labeled_probs.sort_values(by=["probability"], ascending=False)
labeled_probs["is_duplicated"] = labeled_probs["probability"].duplicated(keep=False)        
labeled_probs.reset_index(drop=True, inplace=True)
            
# print(labeled_probs)

result = 0
pos_counter = 0
            
for index, row in labeled_probs.iterrows():
    duplicated_labels = 0
                
    if row.label:
        pos_counter += 1
    else:
        if row.is_duplicated:
            for sub_index, sub_row in labeled_probs.iterrows():
                if sub_row.probability == row.probability:
                    duplicated_labels += 1
                    if sub_index >= index:
                        result += pos_counter + duplicated_labels * 0.5
                        break
                    else:
                        result += (pos_counter - 1) + duplicated_labels * 0.5
                        break
        else:
            result += pos_counter

print(result)
print(result / (np.sum(y_pred) * (len(y_pred) - np.sum(y_pred))))

17.0
0.6071428571428571


In [16]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score

In [17]:
model_logreg = LogisticRegression()
model_logreg.fit(X, y)

model_logreg.score(X, y)
y_pred = model_logreg.predict(X)

In [18]:
roc_auc_score(y, y_pred)

0.8519714078856315

### Classification - Support Vector Machine

In [None]:
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=1000, n_features=14, n_informative=10, random_state=42)
X = pd.DataFrame(X)
y = pd.Series(y)
X.columns = [f'col_{col}' for col in X.columns]

In [36]:
class MySVM:

    def __init__(self, n_iter=10, learning_rate=0.001, weights=None, b=None, C=1):
        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self.weights = weights
        self.b = b
        self.C = C

    def __repr__(self):
        return f'MySVM class: n_iter={self.n_iter}, learning_rate={self.learning_rate}'

    def get_coef(self):
        return (self.weights.tolist(), self.b)
    
    def predict(self, X):
        y_pred = np.sign(np.dot(self.weights, X.T) + self.b)
        y_pred[y_pred == -1] = 0
        return y_pred.astype(int)
    
    def fit(self, X, y, verbose=False):
        y = y.replace({0: -1})
        n_samples, n_features = X.shape
        self.weights = np.ones(n_features)
        self.b = 1

        for num_iter in range(1, self.n_iter + 1):
            for index, row in X.iterrows(): 
                if y[index] * (np.dot(self.weights, row) + self.b) >= 1:
                    grad_weights = 2 * self.weights
                    grad_b = 0
                else:
                    grad_weights = 2 * self.weights - self.C * y[index] * row
                    grad_b = - self.C * y[index]
                    
                self.weights -= self.learning_rate * grad_weights
                self.b -= self.learning_rate * grad_b
    
                svm_loss = np.linalg.norm(self.weights)**2 + self.C * (1 / n_samples) * np.sum(np.max([0, 1 - y[index] * (np.dot(self.weights, row) + self.b)]))
            
            if verbose:
                if num_iter == 1:
                    print(f"start | loss: {svm_loss}")
                if num_iter % verbose == 0:
                    print(f"{self.n_iter} | loss: {svm_loss}")

In [38]:
my_svm_model = MySVM()
my_svm_model.fit(X, y)
my_svm_model.predict(X)
my_svm_model.get_coef()

([-0.04494599106455738,
  -0.01483486164366413,
  0.09182867142612355,
  -0.07651575744023459,
  -0.2266593240042232,
  0.05915742350197085,
  -0.10037344311770476,
  0.041232619182610106,
  0.09646053125696591,
  -0.0009606829156346712,
  0.10519204525333291,
  -0.010278356360164517,
  -0.02125913100479772,
  0.15067818851466888],
 0.01999999999999902)