In [1]:
from sklearn.metrics import accuracy_score
from sklearn.svm import SVC
from sklearn.datasets import make_classification
import pandas as pd
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
import glob
import io
import numpy as np
import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, recall_score, precision_score
from sklearn.utils import shuffle
from sklearn.kernel_approximation import RBFSampler
from sklearn.kernel_approximation import PolynomialCountSketch
from sklearn.kernel_approximation import AdditiveChi2Sampler


In [2]:
def remove_correlated_features(X, corr_threshold=0.9):
    """Drop, in place, every column that is highly correlated with an earlier one.

    For each column ``j`` the function checks the correlation with every
    column ``i`` that appears before it; if any of those correlations is
    greater than or equal to ``corr_threshold`` the column ``j`` is dropped.
    Only the signed correlation is compared, so strongly *negative*
    correlations do not trigger a drop.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature frame. It is modified in place (columns are removed).
    corr_threshold : float, default 0.9
        Correlation value at or above which the later column is removed.

    Returns
    -------
    pandas.Index
        Labels of the columns that were removed from ``X``.
    """
    corr = X.corr()
    # Strict upper triangle (k=1) pairs every column j with the columns i < j
    # only, mirroring a nested "for i / for j in range(i + 1, n)" scan.
    above_threshold = np.triu(corr.to_numpy() >= corr_threshold, k=1)
    drop_mask = above_threshold.any(axis=0)
    # Index the correlation matrix's own labels so the boolean mask always
    # has the matching length.
    columns_dropped = corr.columns[drop_mask]
    X.drop(columns_dropped, axis=1, inplace=True)
    return columns_dropped


def remove_less_significant_features(X, Y, sl=0.05):
    """Backward elimination of features using OLS p-values.

    Repeatedly fits ``sm.OLS(Y, X)`` and removes the column with the largest
    p-value, until the largest p-value is no longer greater than ``sl`` or no
    columns remain. ``X`` is passed to OLS as-is; no constant column is added
    here.

    Parameters
    ----------
    X : pandas.DataFrame
        Feature frame. It is modified in place (columns are removed).
    Y : array-like
        Target values passed to ``sm.OLS``.
    sl : float, default 0.05
        Significance level; a column is removed while its p-value exceeds it.

    Returns
    -------
    numpy.ndarray
        Labels of the removed columns in removal order (empty if none).
    """
    columns_dropped = []
    # Each pass removes exactly one column or stops, so the loop runs at most
    # once per original column. A frame without columns skips the loop
    # entirely instead of failing on a model that was never fitted.
    while len(X.columns) > 0:
        regression_ols = sm.OLS(Y, X).fit()
        max_col = regression_ols.pvalues.idxmax()
        max_val = regression_ols.pvalues.max()
        if not (max_val > sl):
            break
        X.drop(max_col, axis='columns', inplace=True)
        columns_dropped.append(max_col)

    return np.array(columns_dropped)

def split_normolize_data(kernel=None, data_path='data.csv'):
    """Load the CSV, select and scale features, apply a kernel map, and split.

    Steps, in order: read ``data_path``; drop the last and first columns by
    position; map the ``diagnosis`` column from ``'M'``/``'B'`` to
    ``1.0``/``-1.0``; treat every column after the first as a feature; remove
    correlated and statistically insignificant features; min-max scale;
    append a constant ``1`` column; apply ``kernel.fit_transform``; split
    80/20 with ``random_state=0``.

    Parameters
    ----------
    kernel : transformer with ``fit_transform``, optional
        Kernel approximation applied to the scaled features. Defaults to
        ``RBFSampler(gamma=1, random_state=1)``, created per call so no
        fitted estimator is shared between calls.
    data_path : str, default 'data.csv'
        Path of the CSV file to read.

    Returns
    -------
    tuple
        ``(X_train, X_test, y_train, y_test)`` as returned by
        ``train_test_split``.
    """
    if kernel is None:
        kernel = RBFSampler(gamma=1, random_state=1)

    data = pd.read_csv(data_path)
    data = data.drop(columns=data.columns[[-1, 0]])
    print("applying feature engineering...")
    diag_map = {'M': 1.0, 'B': -1.0}
    data['diagnosis'] = data['diagnosis'].map(diag_map)
    Y = data['diagnosis']
    # Explicit copy: the helpers below drop columns in place, which must not
    # operate on a slice/view of `data`.
    X = data.iloc[:, 1:].copy()
    print("splitting dataset into train and test sets...")
    remove_correlated_features(X)
    remove_less_significant_features(X, Y)
    # normalize data for better convergence and to prevent overflow
    # NOTE(review): the scaler is fit on the full dataset before the split,
    # so test rows influence the scaling. Kept as-is to preserve results.
    X_normalized = MinMaxScaler().fit_transform(X.values)
    # append a constant 1 to every row for the intercept b; a plain ndarray is
    # handed to the kernel so no mixed int/str column labels reach sklearn
    X_with_intercept = np.column_stack(
        [X_normalized, np.ones(X_normalized.shape[0])]
    )
    X_mapped = kernel.fit_transform(X_with_intercept)
    X_train, X_test, y_train, y_test = train_test_split(
        X_mapped, Y, test_size=0.2, random_state=0
    )
    return X_train, X_test, y_train, y_test

In [3]:
# >> HAND-WRITTEN LINEAR SVM (SGD) << #
class SVC_hand:
    """Linear soft-margin SVM trained with per-sample stochastic gradient descent.

    The minimised cost is ``0.5 * W.W + C * mean(max(0, 1 - y * (X.W)))`` where
    ``C`` is ``regularization_strength``. Labels are expected to be ``+1`` /
    ``-1``. There is no separate bias term; any intercept must already be a
    column of ``X``.

    Usage: construct, call :meth:`fit` to attach the data, then :meth:`init`
    to train and obtain test-set predictions.

    Parameters
    ----------
    regularization_strength : float, default 10000
        Weight ``C`` of the hinge-loss term.
    learning_rate : float, default 0.000001
        Step size of every SGD update.
    kernel : object, optional
        Stored and printed only; the class itself never applies it. Defaults
        to ``RBFSampler(gamma=1, random_state=1)`` created per instance.
    max_epochs : int, default 5000
        Upper bound of the epoch counter; ``max_epochs - 1`` passes are run
        at most.
    cost_threshold : float, default 0.01
        Relative cost change below which training stops early.
    random_state : int or None, default None
        Seed for the per-epoch shuffling. ``None`` keeps the unseeded
        behaviour of ``sklearn.utils.shuffle``.
    """

    def __init__(self, regularization_strength=10000, learning_rate=0.000001,
                 kernel=None, max_epochs=5000, cost_threshold=0.01,
                 random_state=None):
        # Build the default lazily so no estimator instance is shared between
        # objects through the function signature.
        if kernel is None:
            kernel = RBFSampler(gamma=1, random_state=1)
        print('kernel=', kernel)
        self.regularization_strength = regularization_strength
        self.learning_rate = learning_rate
        self.kernel = kernel
        self.max_epochs = max_epochs
        self.cost_threshold = cost_threshold
        self.random_state = random_state

    def fit(self, X_train, X_test, y_train, y_test):
        """Attach the train/test data; training itself happens in :meth:`init`."""
        self.X_train = X_train
        self.y_train = y_train
        self.X_test = X_test
        self.y_test = y_test

    # >> MODEL TRAINING << #
    def compute_cost(self, W, X, Y):
        """Return the regularised hinge-loss cost of weights ``W`` on ``(X, Y)``."""
        X = np.asarray(X)
        # hinge loss: only samples with margin y * (x.W) below 1 contribute
        distances = np.maximum(0, 1 - np.asarray(Y) * np.dot(X, W))
        hinge_loss = self.regularization_strength * (np.sum(distances) / X.shape[0])
        return 1 / 2 * np.dot(W, W) + hinge_loss

    def calculate_cost_gradient(self, W, X_batch, Y_batch):
        """Return the cost gradient w.r.t. ``W`` averaged over the batch.

        ``Y_batch`` may be a scalar label of any numeric type (with
        ``X_batch`` a single feature vector) or a 1-D array of labels (with
        ``X_batch`` a 2-D array whose rows are samples).
        """
        W = np.asarray(W, dtype=float)
        # Promote a single (x, y) pair to a batch of size one.
        Y_batch = np.atleast_1d(np.asarray(Y_batch, dtype=float))
        X_batch = np.atleast_2d(np.asarray(X_batch, dtype=float))

        distance = 1 - (Y_batch * np.dot(X_batch, W))
        violating = distance > 0
        # Every sample contributes W; samples inside the margin additionally
        # contribute -C * y * x. Summing and dividing by the batch size gives
        # W - C * sum(y * x over violators) / n.
        hinge_part = np.dot(Y_batch[violating], X_batch[violating])
        return W - self.regularization_strength * hinge_part / len(Y_batch)

    def sgd(self):
        """Run stochastic gradient descent on the attached training data.

        The cost is evaluated at epochs 1, 2, 4, 8, ... and at the final
        epoch; training stops early when the cost changed by less than
        ``cost_threshold`` relative to the previously evaluated cost.

        Returns
        -------
        numpy.ndarray
            The learned weight vector.
        """
        features = np.asarray(self.X_train)
        outputs = np.asarray(self.y_train, dtype=float)
        weights = np.zeros(features.shape[1])
        # One generator for the whole run so a seed yields a different
        # permutation every epoch while staying reproducible.
        rng = None
        if self.random_state is not None:
            rng = np.random.RandomState(self.random_state)

        next_check = 1  # epoch of the next scheduled convergence check
        prev_cost = float("inf")
        last_epoch = self.max_epochs - 1
        for epoch in range(1, self.max_epochs):
            X, Y = shuffle(features, outputs, random_state=rng)
            for x, y in zip(X, Y):
                gradient = self.calculate_cost_gradient(weights, x, y)
                weights = weights - (self.learning_rate * gradient)

            # convergence check on 2^nth epoch
            if epoch == next_check or epoch == last_epoch:
                cost = self.compute_cost(weights, features, outputs)
                print("Epoch is: {} and Cost is: {}".format(epoch, cost))
                # stoppage criterion
                if abs(prev_cost - cost) < self.cost_threshold * prev_cost:
                    return weights
                prev_cost = cost
                next_check *= 2
        return weights

    def predict(self, X):
        """Return ``sign(X.W)`` for every row of ``X`` using the trained weights.

        Requires :meth:`init` to have run (it sets ``self.weights``). A margin
        of exactly zero yields ``0`` because ``np.sign`` is used.
        """
        return np.sign(np.dot(np.asarray(X), self.weights))

    def init(self):
        """Train on the attached data and predict the attached test set.

        Returns
        -------
        tuple
            ``(y_test, y_test_predicted)`` — the stored true test labels and
            the predicted labels as a 1-D float array.
        """
        print("training started...")
        W = self.sgd()
        self.weights = W
        print("training finished.")
        print("weights are: {}".format(W))

        # testing the model
        print("testing the model...")
        y_test_predicted = self.predict(self.X_test)
        return self.y_test, y_test_predicted

        


    # set hyper-parameters and call init

In [4]:
# Train and evaluate the hand-written SVM once per kernel approximation.
# The data is re-loaded and re-transformed for every kernel because the
# kernel map is applied inside split_normolize_data.
kernels = [PolynomialCountSketch(), AdditiveChi2Sampler(), RBFSampler()]
for kernel in kernels:
    X_train, X_test, y_train, y_test = split_normolize_data(kernel)
    svc = SVC_hand(kernel=kernel)
    svc.fit(X_train, X_test, y_train, y_test)
    y_test, y_test_predicted = svc.init()
    print("accuracy on test dataset: {}".format(accuracy_score(y_test, y_test_predicted)))
    print("recall on test dataset: {}".format(recall_score(y_test, y_test_predicted)))
    # precision must come from precision_score, not recall_score
    print("precision on test dataset: {}".format(precision_score(y_test, y_test_predicted)))

applying feature engineering...
splitting dataset into train and test sets...
kernel= PolynomialCountSketch()
training started...
Epoch is: 1 and Cost is: 6117.00472651649
Epoch is: 2 and Cost is: 4686.255137898888
Epoch is: 4 and Cost is: 3404.85507601705
Epoch is: 8 and Cost is: 2543.9814780456977
Epoch is: 16 and Cost is: 1919.0505549096847
Epoch is: 32 and Cost is: 1583.1062440642409
Epoch is: 64 and Cost is: 1309.3672873850132
Epoch is: 128 and Cost is: 1143.837960353479
Epoch is: 256 and Cost is: 999.2443949232363
Epoch is: 512 and Cost is: 920.0717172370312
Epoch is: 1024 and Cost is: 883.8024162762517
Epoch is: 2048 and Cost is: 861.5759992709595
Epoch is: 4096 and Cost is: 854.496211499569
training finished.
weights are: [-2.77423921e+00 -6.29527933e-01 -5.56911758e-01  2.32233046e+00
  4.70077438e+00 -2.08618833e+00  1.24718036e+00  3.31316441e+00
 -2.33637016e-15  7.55679809e-01 -1.21364301e-01 -1.03937273e+00
 -3.59151431e-01 -8.72675584e-15 -7.38572695e-01 -1.39890166e-16


In [5]:
# Reload the raw CSV for the scikit-learn SVC comparison; here the features
# are used unscaled and without any feature selection.
data = pd.read_csv('data.csv')
# Remove the last and the first column, addressed by position.
data = data.drop(columns=data.columns[[-1, 0]])
print("applying feature engineering...")
# Encode the diagnosis label: 'M' -> 1.0, 'B' -> -1.0.
diag_map = {'M': 1.0, 'B': -1.0}
data = data.assign(diagnosis=data['diagnosis'].map(diag_map))
Y = data['diagnosis']
# Every column after the first one is treated as a feature.
X = data.iloc[:, 1:]
# Fixed 80/20 split.
X_train, X_test, y_train, y_test = train_test_split(
    X, Y, random_state=0, test_size=0.2
)
def test_different_kernel():
    """Fit one ``SVC`` per kernel name and collect its test-set predictions.

    Uses the module-level ``X_train``, ``y_train`` and ``X_test``. Every model
    is created with ``degree=2`` and ``gamma=20``.
    NOTE(review): none of the listed kernels is ``'poly'``, so ``degree``
    presumably has no effect here — confirm against the SVC documentation.

    Returns
    -------
    tuple
        ``(y_pred, kernels)`` — a list with one prediction array per kernel
        and the list of kernel names in the same order.
    """
    kernels = ['rbf', 'linear', 'sigmoid']
    y_pred = []
    for kernel_name in kernels:
        print(1)  # one line of progress output per fitted model
        classifier = SVC(kernel=kernel_name, degree=2, gamma=20)
        classifier.fit(X_train, y_train)
        predictions = classifier.predict(X_test)
        y_pred.append(predictions)
    return y_pred, kernels

applying feature engineering...


In [6]:
# Report the test accuracy of each scikit-learn SVC kernel, followed by the
# author's notes from earlier experiments.
pred, kernels = test_different_kernel()
for kernel_name, kernel_pred in zip(kernels, pred):
    # Pass the name itself: wrapping it in braces builds a one-element set,
    # which is rendered as "{'rbf'}" instead of "rbf".
    print("Точность модели c kernel {}: {}".format(kernel_name, accuracy_score(y_test, kernel_pred)))
print("precomputed kernel работает только с квадратными матрицами")
print("poly kernel при степени=2 дает точность 0.8535(по умолчанию степень равна 3) ")
print("при gamma=20 rbf дает точность= 0.878787878, poly дает точность=0.92424(по умолчанию gamma= scale) ")

1
1
1
Точность модели c kernel {'rbf'}: 0.5877192982456141
Точность модели c kernel {'linear'}: 0.956140350877193
Точность модели c kernel {'sigmoid'}: 0.5877192982456141
precomputed kernel работает только с квадратными матрицами
poly kernel при степени=2 дает точность 0.8535(по умолчанию степень равна 3) 
при gamma=20 rbf дает точность= 0.878787878, poly дает точность=0.92424(по умолчанию gamma= scale) 
