Principal Files
This data challenge contains one dataset of 2000 training sequences. The main files available are the following ones

* Xtr.csv - the training sequences.
* 
* Xte.csv - the test sequences.
* 
* Ytr.csv - the sequence labels of the training sequences indicating bound (1) or not (0).
* 
Each row of Xtr.csv represents a sequence. Xte.csv contains 1000 test sequences, for which you need to predict. Ytr.csv contains the labels corresponding to the training data, in the same format as a submission file.

In [63]:
!pip install cvxopt


You should consider upgrading via the '/opt/conda/bin/python3.7 -m pip install --upgrade pip' command.


In [64]:
import sklearn
from sklearn.preprocessing import OneHotEncoder
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt

import random
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import scale
import optuna
from sklearn.metrics import accuracy_score
from sklearn.metrics import recall_score, precision_score
from numpy import linalg
import cvxopt
import cvxopt.solvers
import sklearn

import os


In [65]:

for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

/kaggle/input/kernel-methods-ammi-2020/Ytr.csv
/kaggle/input/kernel-methods-ammi-2020/Xte.csv
/kaggle/input/kernel-methods-ammi-2020/Xte_mat100.csv
/kaggle/input/kernel-methods-ammi-2020/Xtr_mat100.csv
/kaggle/input/kernel-methods-ammi-2020/Xtr.csv


In [66]:
train_data=pd.read_csv("/kaggle/input/kernel-methods-ammi-2020/Xtr.csv", sep=',',index_col=0)

labels=pd.read_csv("/kaggle/input/kernel-methods-ammi-2020/Ytr.csv", sep=',',index_col=0)


test_data=pd.read_csv("/kaggle/input/kernel-methods-ammi-2020/Xte.csv",sep=',', index_col=0)


#optional data 

train_op_data=pd.read_csv("/kaggle/input/kernel-methods-ammi-2020/Xtr_mat100.csv", sep=' ',header=None).values
test_op_data=pd.read_csv("/kaggle/input/kernel-methods-ammi-2020/Xte_mat100.csv", sep=' ',header=None).values





# Insert bias

In [67]:
# def insert_intercept(train_op_data):
#     N=train_op_data.shape[0]
   
#     a = np.ones((N,1))
    
#     train_op_data=np.append(train_op_data,a, axis=1)
#     return train_op_data

## Change the labels from 0,1 to -1 ,1

In [68]:
labels= np.where(labels==0,-1,1)


# Split the training dataset in train and validation set

In [69]:
X_train_mat100 = train_op_data
# 
X_train, X_val, y_train, y_val = train_test_split(
    X_train_mat100, labels, test_size=0.33, random_state=42)

print(X_train.shape,X_val.shape,y_train.shape, y_val.shape)

(1340, 100) (660, 100) (1340, 1) (660, 1)


# spectrom data

In [72]:

def getKmers(sequence, size=3):
    return [sequence[x:x+size] for x in range(len(sequence) - size + 1)]
def base2int(c):
    return {'A':0,'C':1,'G':2,'T':3}.get(c,0)

def index(kmer):
    base_idx = np.array([base2int(base) for base in kmer])
    multiplier = 4** np.arange(len(kmer))
    kmer_idx = multiplier.dot(base_idx)
    return kmer_idx
    
    
def spectral_embedding(sequence,kmer_size=3):
    kmers = getKmers(sequence,kmer_size)
    kmer_idxs = [index(kmer) for kmer in kmers]
    one_hot_vector = np.zeros(4**kmer_size)
    
    for kmer_idx in kmer_idxs:
        one_hot_vector[kmer_idx] += 1
    return one_hot_vector

train_data['kmers'] = train_data.seq.apply(lambda x:list(spectral_embedding(x,kmer_size=3)))


kmer_data = []
for i in train_data.seq.values:
    kmer_data.append(spectral_embedding(i,kmer_size=6))
    
np.array(kmer_data).shape


(2000, 4096)

## Define kernels

In [74]:

def linear_kernel(x1, x2):
    return np.dot(x1, x2.T)

def polynomial_kernel(x, y, power):
    return (1 + np.dot(x, y.T)) ** power

def gaussian_kernel(x, y, sigma=5.0):
    return np.exp(-linalg.norm(x-y)**2 / (2 * (sigma ** 2)))

def kernelFuncTrigo(x1, x2, i):
   
    sigma = 0.5
    
    kxx = 1 +(np.dot(sin(k*sigma*x1), sin(k*sigma*x2))  + np.dot(cos(k*sigma*x1), cos(k*sigma*x2))  for k in range(1, i+1))

    return kxx

def rbf_kernel(X1, X2, sigma=10):
    '''
    Returns the kernel matrix K(X1_i, X2_j): size (n1, n2)
    where K is the RBF kernel with parameter sigma
    
    Input:
    ------
    X1: an (n1, p) matrix
    X2: an (n2, p) matrix
    sigma: float
    '''
    # For loop with rbf_kernel_element works but is slow in python
    # Use matrix operations!
    
    X2_norm = np.sum(X2 ** 2)
    X1_norm = np.sum(X1 ** 2)
    gamma = 1 / (2 * sigma ** 2)
    K = np.exp(- gamma * (X1_norm+ X2_norm- 2 * np.dot(X1, X2.T)))
    return K

# soft margin svm with SGD from scratch

In [75]:
class SVM:
    def __init__(self, lr,lamb, epoch):
        self.lr=lr
        self.lamb=lamb
        self.epoch=epoch
    def compute_cost(self,W, X, Y):
        # calculate hinge loss
        N = X.shape[0]
        
        distances = 1 - Y * (np.dot(X, W))
        distances[distances < 0] = 0  # equivalent to max(0, distance)
        hinge_loss = self.lamb * (np.sum(distances) / N)

        # calculate cost
        cost = (1 / 2) * (np.dot(W, W)) + hinge_loss
        return cost
    
    def calculate_cost_gradient(self,W, X_batch, Y_batch):
        # if only one example is passed (eg. in case of SGD)
        if type(Y_batch) == np.float64:
            Y_batch = np.array([Y_batch])
            X_batch = np.array([X_batch])  # gives multidimensional array

        distance = 1 - (Y_batch * np.dot(X_batch, W))
        dw = np.zeros(len(W))

        for ind, d in enumerate(distance):
            if max(0, d) == 0:
                di = W
            else:
                di = W -(self.lamb* Y_batch[ind] * X_batch[ind])
            dw += di

        dw = dw/len(Y_batch)  # average
        return dw
    
    def sgd(self,features, outputs):
        
        weights = np.zeros(features.shape[1])
        nth = 0
        prev_cost = float("inf")
        cost_threshold = 1e-3  # in percent
        # stochastic gradient descent
        for epoc in range(1, self.epoch):
            # shuffle to prevent repeating update cycles
            X, Y = (features, outputs)
            for ind, x in enumerate(X):
                ascent = self.calculate_cost_gradient(weights, x, Y[ind])
                weights = weights +(self.lr * ascent)


            cost = self.compute_cost(weights, features, outputs)
#             print("Epoch is: {} and Cost is: {}".format(epoch, cost))
            # stoppage criterion
#             print(cost)
#             if abs(prev_cost - cost) < cost_threshold:
#                 return weights
            prev_cost = cost
            nth += 1
        return weights
    
    
    
    def train_func(self,x_train,y_train):
        W = self.sgd(x_train, y_train)
#         print("training finished.")
        return W

    def validation(self,x_val,y_val,w):
        y_test_predicted = np.array([])
        for i in range(x_val.shape[0]):
            yp = np.sign(np.dot(w, x_val[i])) #model
            y_test_predicted = np.append(y_test_predicted, yp)
        return accuracy_score(y_val, y_test_predicted)



## Test the model

In [None]:
#train_op_data=insert_intercept(train_op_data)
svm_sgd=SVM(lr=0.2,lamb=50,epoch=100)
w=svm_sgd.train_func(np.array(kmer_data)[:1340,:],labels[:1340])
#svm_sgd.validation(np.array(kmer_data)[1340:2000,:],labels[1340:],w)

                   
                   

In [None]:
def objective(trial):
 
        lr = trial.suggest_float('lr',  1e-3, 1)
        
        epoch=trial.suggest_int('epoch',  20, 200)
        lamb=trial.suggest_int('lamb',  1e-5,20)
        C=trial.suggest_int('C',  1,20)
        sigma=trial.suggest_int('sigma',  1e-7,20)
        svmm=SVM(lr=lr,lamb=lamb, epoch=epoch)
        w=svm_sgd.train_func(X_train,y_train)
        acc=svmm.validation(X_val,y_val,w)
            

        return acc

In [None]:
import optuna

sampler = optuna.samplers.TPESampler()
study = optuna.create_study(sampler=sampler, direction='maximize')
study.optimize(func=objective, n_trials=100,show_progress_bar=True)

## cross validation

In [None]:

def cross_validate(x_data,y_data,model,lr,lamda=0.2,epoch=10,k=5):
    if len(x_data)%k != 0:
        print('cant vsplit',len(x_data),' by ',k)
        return
    
    x_data_splitted = np.vsplit(x_data,k)
    y_data_splitted = np.vsplit(y_data,k)
    
    aggrigate_result = []
    
    for i in range(len(x_data_splitted)):
        train = []
        test = []
        items = [j for j in range(len(x_data_splitted)) if j !=i ]
        x_test = x_data_splitted[i]
        y_test = y_data_splitted[i]
        
        for item in items:
            if len(train) == 0:
                x_train = x_data_splitted[item]
                y_train = y_data_splitted[item]
                
                
            else:
                x_train = np.concatenate((x_train,x_data_splitted[item]), axis=0)
                
                
                y_train = np.concatenate((y_train,y_data_splitted[item]), axis=0)
                
                
       
        w=model.train_func(x_train,y_train)
       
       
        
        result = model.validation(x_test,y_test,w)
        aggrigate_result.append(result)
         
        
        value = sum(aggrigate_result)/len(aggrigate_result)
        
        
    return value 

In [96]:
class SVM(object):

    def __init__(self, kernel=polynomial_kernel, C=0.2, power=2):
        self.kernel = polynomial_kernel
        self.C = C
        self.power=power
        if self.C is not None: self.C = float(self.C)

    def fit(self, X, y):
        n_samples, n_features = X.shape

        # Gram matrix
        K = np.zeros((n_samples, n_samples))
        for i in range(n_samples):
            for j in range(n_samples):
                K[i,j] = self.kernel(X[i], X[j],self.power)
        #y=y.reshape(-1,1) * 1.
        #print(y)

        P = cvxopt.matrix(np.outer(y,y) * K)
        q = cvxopt.matrix(np.ones(n_samples) * -1)
        
        A = y.reshape(1,n_samples)
        A=A.astype('float')
        A=cvxopt.matrix(A)
        
        
        b = cvxopt.matrix(0.0)

        if self.C is None:
            G = cvxopt.matrix(np.diag(np.ones(n_samples) * -1))
            h = cvxopt.matrix(np.zeros(n_samples))
        else:
            tmp1 = np.diag(np.ones(n_samples) * -1)
            tmp2 = np.identity(n_samples)
            G = cvxopt.matrix(np.vstack((tmp1, tmp2)))
            tmp1 = np.zeros(n_samples)
            tmp2 = np.ones(n_samples) * self.C
            h = cvxopt.matrix(np.hstack((tmp1, tmp2)))

        # solve QP problem
        solution = cvxopt.solvers.qp(P, q, G, h, A, b)

        # Lagrange multipliers
        a = np.ravel(solution['x'])

        # Support vectors have non zero lagrange multipliers
        sv = a > 1e-12
        ind = np.arange(len(a))[sv]
        self.a = a[sv]
        self.sv = X[sv]
        self.sv_y = y[sv]
        print(self.sv_y)
        print("%d support vectors out of %d points" % (len(self.a), n_samples))

        # Intercept
        self.b = 0
        for n in range(len(self.a)):
            self.b =self.b+ self.sv_y[n]
            self.b =self.b- np.sum(self.a * self.sv_y * K[ind[n],sv])
        self.b /= len(self.a)

        # Weight vector
        if self.kernel == polynomial_kernel:
            self.w = np.zeros(n_features)
            for n in range(len(self.a)):
                self.w += self.a[n] * self.sv_y[n] * self.sv[n]
        else:
            self.w = None

    def project(self, X):
        if self.w is not None:
            return np.dot(X, self.w) + self.b
        else:
            y_predict = np.zeros(len(X))
            for i in range(len(X)):
                s = 0
                for a, sv_y, sv in zip(self.a, self.sv_y, self.sv):
                    s += a * sv_y * self.kernel(X[i], sv, self.power)
                y_predict[i] = s
            return y_predict + self.b

    def predict(self, X):
        return np.sign(self.project(X))


In [97]:

svmm=SVM(kernel=polynomial_kernel,C=0,power=5)
svmm.fit(X_train,y_train)
y_predict = svmm.predict(X_val)
print(accuracy_score(y_val, y_predict) )


     pcost       dcost       gap    pres   dres
 0: -6.1643e+02 -1.1816e+01  7e+03  9e+01  2e-14
 1: -4.1629e+01 -5.5610e-01  4e+02  5e+00  1e-14
 2: -6.3957e-01 -3.2151e-04  6e+00  8e-02  2e-15
 3: -6.4041e-03 -3.2274e-08  6e-02  8e-04  2e-15
 4: -6.4041e-05 -3.2274e-12  6e-04  8e-06  2e-15
 5: -6.4041e-07 -3.2274e-16  6e-06  8e-08  2e-15
 6: -6.4041e-09 -3.2267e-20  6e-08  8e-10  2e-15
Optimal solution found.
[[ 1]
 [-1]
 [-1]
 ...
 [-1]
 [ 1]
 [ 1]]
1263 support vectors out of 1340 points
0.4712121212121212


In [None]:
def objective(trial):
 
        C = trial.suggest_float('C',  1e-3, 10)
        p = trial.suggest_float('p',  1e-4, 10)
        #gamma = trial.suggest_float('gamma',  0.1, 100)
        #kernels=['polynomial_kernel','gaussian_kernel','rbf_kernel','linear_kernel']
        #sigma=trial.suggest_int('sigma',  1, 20)
        svmm=SVM(kernel=polynomial_kernel,C=C, power=p)
        svmm.fit(np.array(kmer_data)[:1340,:],labels[:1340])
        y_predict = svmm.predict(np.array(kmer_data)[1340:,:])
            

        return accuracy_score(labels[1340:], y_predict)  

        
            

In [None]:
import optuna

sampler = optuna.samplers.TPESampler()
study = optuna.create_study(sampler=sampler, direction='maximize')
study.optimize(func=objective, n_trials=100,show_progress_bar=True)

In [None]:
df = study.trials_dataframe().drop(['state','datetime_start','datetime_complete'], axis=1)
df.sort_values(by=['value'])

In [None]:

def cross_validate(x_data,y_data,model, testData, kernelFunc, powerI, lambdaPara,epoch=10,k=5):
    if len(x_data)%k != 0:
        print('cant vsplit',len(x_data),' by ',k)
        return
    
    x_data_splitted = np.vsplit(x_data,k)
    y_data_splitted = np.vsplit(y_data,k)
    
    aggrigate_result = []
    
    for i in range(len(x_data_splitted)):
        train = []
        test = []
        items = [j for j in range(len(x_data_splitted)) if j !=i ]
        x_test = x_data_splitted[i]
        y_test = y_data_splitted[i]
        
        for item in items:
            if len(train) == 0:
                x_train = x_data_splitted[item]
                y_train = y_data_splitted[item]
                
                
            else:
                x_train = np.concatenate((x_train,x_data_splitted[item]), axis=0)
                
                
                y_train = np.concatenate((y_train,y_data_splitted[item]), axis=0)
                
                
       
        w=model.train_func(x_train,y_train,testData, kernelFunc, powerI, lambdaPara)
       
       
        
        result = model.validation(x_test,y_test,w)
        aggrigate_result.append(result)
         
        
        value = sum(aggrigate_result)/len(aggrigate_result)
        
        
    return value 

## kernel logistic regression


In [None]:
def error(ypred, ytrue):
    e = (ypred != ytrue).mean()
    return e

In [None]:
class KernelMethodBase(object):
    '''
    Base class for kernel methods models
    
    Methods
    ----
    fit
    predict
    '''
    kernels_ = {
        'linear': linear_kernel,
        'polynomial': polynomial_kernel,
        'rbf': rbf_kernel,
        'gaussian':gaussian_kernel
    }
    def __init__(self, kernel='linear', **kwargs):
        self.kernel_name = kernel
        self.kernel_function_ = self.kernels_[kernel]
        self.kernel_parameters = self.get_kernel_parameters(**kwargs)
        
    def get_kernel_parameters(self, **kwargs):
        params = {}
        if self.kernel_name == 'rbf':
            params['sigma'] = kwargs.get('sigma', None)
        if self.kernel_name == 'polynomial':
            params['power'] = kwargs.get('power', None)
        return params

    def fit(self, X, y, **kwargs):
        return self
        
    def decision_function(self, X):
        pass

    def predict(self, X):
        pass

In [None]:
class KernelRidgeRegression(KernelMethodBase):
    '''
    Kernel Ridge Regression
    '''
    def __init__(self, lambd=0.1, **kwargs):
        self.lambd = lambd
        # Python 3: replace the following line by
        # super().__init__(**kwargs)
        super(KernelRidgeRegression, self).__init__(**kwargs)

    def fit(self, X, y, sample_weights=None):
        n, p = X.shape
        assert (n == len(y))
    
        self.X_train = X
        self.y_train = y
        
        if sample_weights is not None:
            w_sqrt = np.sqrt(sample_weights)
            self.X_train = self.X_train * w_sqrt
            self.y_train = self.y_train * w_sqrt
        
        A = self.kernel_function_(X,X,**self.kernel_parameters)
        A[np.diag_indices_from(A)] = np.add(A[np.diag_indices_from(A)],n*self.lambd)
        # self.alpha = (K + n lambda I)^-1 y
        self.alpha = np.linalg.solve(A , self.y_train)

        return self
    
    def decision_function(self, X):
        K_x = self.kernel_function_(X,self.X_train, **self.kernel_parameters)
        return K_x.dot(self.alpha)
    
    def predict(self, X):
        return self.decision_function(X)

In [None]:
def cross_validate(x_data,y_data,kernel=None,lambd=0.2,sigma=0.5,k=5,power=2):
    if len(x_data)%k != 0:
        print('cant vsplit',len(x_data),' by ',k)
        return
    
    x_data_splitted = np.vsplit(x_data,k)
    y_data_splitted = np.vsplit(y_data.reshape(-1,1),k)
    
    aggrigate_result = []
    for i in range(len(x_data_splitted)):
        train = []
        test = []
        items = [j for j in range(len(x_data_splitted)) if j !=i ]
        x_test = x_data_splitted[i]
        y_test = y_data_splitted[i]
        for item in items:
            if len(train) == 0:
                x_train = x_data_splitted[item]
                y_train = y_data_splitted[item]
            else:
                x_train = np.concatenate((x_train,x_data_splitted[item]), axis=0)
                y_train = np.concatenate((y_train,y_data_splitted[item]), axis=0)
            
            
        model = KernelRidgeRegression(
                kernel=kernel,
                lambd=lambd,
                sigma=sigma,
                power=power
            ).fit(x_train, y_train)
        result = sum(np.sign(model.predict(x_test))==y_test)/len(y_test)
        aggrigate_result.append(result)
        
        value = sum(aggrigate_result)/len(aggrigate_result)
    return value

In [None]:
cross_validate(X_train,y_train,kernel='polynomial', lambd=4.023839198201892e-06,k=5,sigma=4.,power=2)

In [None]:
def objective(trial):
    lambd = trial.suggest_loguniform('lambd', 1e-6, 5)
    sigma = trial.suggest_loguniform('sigma', 1e-6, 6)
    k =  trial.suggest_categorical('k', [2,4,5,8,10])
    power =  trial.suggest_int('power', 2,6)
    kernel =  trial.suggest_categorical('kernel', ['linear','rbf','polynomial'])
    
    return cross_validate(X_train,y_train,kernel=kernel,lambd=lambd,k=4,sigma=sigma,power=power)

In [None]:
# cross_validate(X_train_mat100, y,lamda=0.01,k=4)
import optuna

sampler = optuna.samplers.TPESampler()
study = optuna.create_study(sampler=sampler, direction='maximize')
df = study.optimize(func=objective, n_trials=500,show_progress_bar=True)

In [None]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

class KernelLogisticRegression(KernelMethodBase):
    '''
    Kernel Logistic Regression
    '''
    def __init__(self, lambd=0.1, **kwargs):
        self.lambd = lambd
        # Python 3: replace the following line by
        # super().__init__(**kwargs)
        super(KernelLogisticRegression, self).__init__(**kwargs)

    def fit(self, X, y, max_iter=100, tol=1e-5):
        n, p = X.shape
        assert (n == len(y))
    
        self.X_train = X
        self.y_train = y
        
        K = self.kernel_function_(X, X, **self.kernel_parameters)
        
        # IRLS
        KRR = KernelRidgeRegression(
            lambd=2*self.lambd,
            kernel=self.kernel_name,
            **self.kernel_parameters
        )
        # Initialize
        alpha = np.zeros(n)
        # Iterate until convergence or max iterations
        for n_iter in range(max_iter):
            alpha_old = alpha
            m = K.dot(alpha_old)
            w = sigmoid(m) * sigmoid(-m)
            z = m + self.y_train / sigmoid(self.y_train * m)
            alpha = KRR.fit(self.X_train, z, sample_weights=w).alpha
            # Break condition (achieved convergence)
            if np.sum((alpha-alpha_old)**2) < tol:
                break

        self.n_iter = n_iter
        self.alpha = alpha

        return self
            
    def decision_function(self, X_test):
        K_x = self.kernel_function_(X_test, self.X_train, **self.kernel_parameters)
        # Probability of y=1 (between 0 and 1)
        return np.sign(K_x.dot(self.alpha))

    def predict(self, X):
        probas = self.decision_function(X)
        predicted_classes = np.where(probas < 0.5, -1, 1)
        return predicted_classes 

In [None]:
kernel = 'rbf'
sigma = .4
lambd = 1.
fig_title = 'Logistic Regression, {} Kernel'.format(kernel)

model = KernelLogisticRegression(lambd=lambd, kernel=kernel, sigma=sigma)
y_pred = model.fit(X_train, y_train).predict(X_val)
print('Test error: {:.2%}'.format(error(y_pred, y_val)))