In [2]:
import numpy as np
import pandas as pd # reading data from csv
import statsmodels.api as sm # finding the p-value
from scipy.stats import pearsonr # correlation and p-value
from sklearn.preprocessing import MinMaxScaler # normalization
from sklearn.model_selection import train_test_split # splitting dataframes
from sklearn.metrics import accuracy_score, recall_score, precision_score # result estimation
from sklearn.model_selection import ShuffleSplit
from sklearn import svm # comparing to sklearn SVM


from scipy.optimize import Bounds, BFGS
from scipy.optimize import LinearConstraint, minimize

In [31]:
def Lagrange_dual(c, X, y):
    significant_ind = np.where(c > 1e-7)[0]
    f = 0
    for i in significant_ind:
        for j in significant_ind:
            f -= y[i] * y[j] * c[i] * c[j] * np.dot(X[i], X[j])
    f = sum(c) + f / 2
    return f

In [44]:
class SVM:
    def fit(self, X_train, y_train, lambda_=0.1, learning_rate=0.1, print_info=False):
        self.X_train = X_train
        self.y_train = y_train

        self.samples = self.X_train.shape[0]
        self.features = self.X_train.shape[1]

        self.w = np.zeros(self.features).reshape((self.features, 1))
        self.lambda_ = lambda_
        self.learning_rate = learning_rate
        
        self.sgd(print_info)
        # self.optimize_dual_with_w()
        return self.w

    # MODEL TRAINING
    def compute_cost(self, X, y):
        hinge_loss = 0
        for i in range(len(y)):
            hinge_loss += max(0, 1 - y[i] * (X[i] @ self.w))
        hinge_loss /= self.features

        # print(hinge_loss, hinge_loss_)

        return 0.5 * self.lambda_ * (self.w.T @ self.w) + hinge_loss


    def calculate_cost_gradient(self, X_batch, y_batch):
        if type(y_batch) is type(np.float64()):
            y_batch = np.array([y_batch]).reshape((1, 1))
            X_batch = np.array([X_batch]).reshape((self.features, 1))
            # print("reshaped batches shapes:", X_batch.shape, y_batch.shape)

        distances = 1 - y_batch * (X_batch.T @ self.w)
        dw = np.zeros(self.features).reshape((self.features, 1))

        v = X_batch @ y_batch
        for i in range(len(distances)):
            dw += self.lambda_ * self.w - ((distances[i] > 0) * v).reshape((self.features, 1))

        dw /= self.features
        return dw
    
        
    def sgd(self, print_info=False):
        iterations = 1000
        cost_threshold_multiplier = 0.01

        cur_pow = 0
        prev_cost = 0
        for iteration in range(iterations):
            delta = 0
            for i in np.random.permutation(self.X_train.shape[0]):
                delta = self.calculate_cost_gradient(self.X_train[i].T, self.y_train[i])
                self.w = self.w - self.learning_rate * delta
                
            if print_info and (iteration == 2 ** cur_pow or iteration == iterations - 1):
                cost = self.compute_cost(self.X_train, self.y_train)
                print("iteration no. %.6f, cost = %.6f, delta[0] = %.6f, w[0] = %.6f" % 
                    (iteration, cost, self.learning_rate * delta[0], self.w[0]))

                if abs(cost - prev_cost) < cost_threshold_multiplier * prev_cost:
                    break
                cur_pow += 1
                prev_cost = cost
    

    def optimize_dual_coeffs(self):
        c_0 = np.random.rand(self.samples) / (2 * self.features * self.lambda_)

        linear_constraint = LinearConstraint(y, [0], [0])
        bounds_for_c = Bounds(np.zeros(self.samples), np.full(self.samples, 1 / (2 * self.features * self.lambda_)))

        optimized = minimize(Lagrange_dual, c_0, args=(self.X_train, self.y_train), method='trust-constr',
            hess=BFGS(), constraints=[linear_constraint], bounds=bounds_for_c)
        print(1)
        self.c = optimized.x
        return self.c
    
    def optimize_dual_with_w(self):
        self.optimize_dual_coeffs()
        self.w = self.X_train @ (self.c * self.y_train)
        return self.w

            
    def test_classifier(self, X_test, y_test):
        y_test_predicted = np.sign(X_test @ self.w)

        print("accuracy on test dataset: %.6f" % accuracy_score(y_test, y_test_predicted))
        print("recall on test dataset: %.6f" % recall_score(y_test, y_test_predicted))
        print("precision on test dataset: %.6f" % precision_score(y_test, y_test_predicted))

    def score(self, X_test, y_test):
        y_test_predicted = np.sign(X_test @ self.w)
        return accuracy_score(y_test, y_test_predicted)
        

In [45]:
# REMOVE REDUNDANT FEATURES

def remove_correlated_features(X_df, correlation_threshold):
    features = X_df.shape[1]
    # correlations between X_df columns (features)
    correlations = X_df.corr().abs().to_numpy()
    columns_dropped = np.zeros(features)

    for i in range(features):
        for j in range(i + 1, features):
            if correlations[i, j] >= correlation_threshold:
                columns_dropped[i] = 1
                break
    
    features_dropped = X_df.columns[columns_dropped]
    X_df.drop(columns=features_dropped, inplace=True)


def remove_less_significant_features(X_df, y_df):
    sl = 0.05
    regression_ols = None
    columns_dropped = np.array([])
    for itr in range(0, len(X_df.columns)):
        regression_ols = sm.OLS(y_df, X_df).fit()
        max_col = regression_ols.pvalues.idxmax()
        max_val = regression_ols.pvalues.max()
        if max_val > sl:
            X_df.drop(max_col, axis='columns', inplace=True)
            columns_dropped = np.append(columns_dropped, [max_col])
        else:
            break
    regression_ols.summary()
    return columns_dropped

def remove_correlated_and_insignificant_features(X_df, correlation_threshold, p_value_threshold):
    features = X_df.shape[1]

    columns_dropped = np.full(features, False, dtype=bool)

    for i in range(features):
        for j in range(i + 1, features):
            corr, p_value = pearsonr(X_df.iloc[:, i], X_df.iloc[:, j])
            if corr >= correlation_threshold or p_value >= p_value_threshold:
                columns_dropped[i] = True
    
    print(sum(columns_dropped))
    features_dropped = X_df.columns[columns_dropped]
    X_df.drop(columns=features_dropped, inplace=True)


In [46]:

data = pd.read_csv('./data.csv') 

diagnosis_map = {'M':1, 'B':-1}
data['diagnosis'] = data['diagnosis'].map(diagnosis_map)

data.drop(data.columns[[-1, 0]], axis=1, inplace=True)

X_df_ = data.iloc[:, 1:].dropna() # features
y_df = data.loc[:, 'diagnosis'].dropna() # labels

# remove_correlated_features(X_df_, 0.95)
# remove_less_significant_features(X_df_, y_df)
remove_correlated_and_insignificant_features(X_df_, 0.95, 0.1)

X_df = pd.DataFrame(MinMaxScaler().fit_transform(X_df_.values)) # normalized features
X_df.insert(loc=X_df.shape[1], column='for w0', value=1)

X = X_df.to_numpy(dtype=np.float64())
y = y_df.to_numpy(dtype=np.float64())

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        
clf = SVM()
# w = clf.fit(X_train, y_train, lambda_=0.0001, learning_rate=0.01, print_info=False)
# clf.test_classifier(X_test, y_test)

16


In [47]:
# from sklearn import svm
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# clf_sklearn_svm = svm.SVC(kernel="linear", C=1).fit(X_train, y_train)
# clf_sklearn_svm.score(X_test, y_test)

def cross_validation(clf, X, y, n_splits=5, test_size=0.2, custom_classifier=True):
    cv = ShuffleSplit(n_splits=n_splits, test_size=test_size, random_state=4)
    
    avg_score = 0
    for train_indices, test_indices in cv.split(X):
        X_train, y_train = X[train_indices, :], y[train_indices]
        X_test, y_test = X[test_indices, :], y[test_indices]

        if custom_classifier:
            clf.fit(X_train, y_train, lambda_=0.0001, learning_rate=0.01)
        else:
            clf.fit(X_train, y_train)
        cur_score = clf.score(X_test, y_test)
        print(cur_score)
        avg_score += cur_score

    avg_score /= n_splits
    print("average score:", avg_score)
    return avg_score

cross_validation(clf, X, y, n_splits=5, test_size=0.2, custom_classifier=True)

0.9736842105263158
0.9649122807017544
0.9473684210526315
0.9385964912280702
0.956140350877193
average score: 0.956140350877193


0.956140350877193

In [260]:

clf_sklearn_svm = svm.SVC(kernel="linear", C=1).fit(X_train, y_train)
cross_validation(clf_sklearn_svm, X, y, n_splits=5, test_size=0.2, custom_classifier=False)

0.9912280701754386
0.9649122807017544
0.9473684210526315
0.9385964912280702
0.956140350877193
average score: 0.9596491228070174


0.9596491228070174