## Said Saterih

In [4]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.special import expit
from scipy.optimize import minimize
from collections import Counter
from itertools import product
from itertools import combinations
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
from joblib import Parallel, delayed

# Load the dataset

## Numeric data

In [5]:
X0 = np.genfromtxt("./Xtr0_mat100.csv", delimiter=" ")
y0 = np.genfromtxt("./Ytr0.csv", delimiter = ",",  skip_header = 1, usecols = 1)
Xte0 = np.genfromtxt("./Xte0_mat100.csv", delimiter=" ")

In [6]:
X1 = np.genfromtxt("./Xtr1_mat100.csv", delimiter=" ")
y1 = np.genfromtxt("./Ytr1.csv", delimiter = ",",  skip_header = 1, usecols = 1)
Xte1 = np.genfromtxt("./Xte1_mat100.csv", delimiter=" ")

In [7]:
X2 = np.genfromtxt("./Xtr2_mat100.csv", delimiter=" ")
y2 = np.genfromtxt("./Ytr2.csv", delimiter = ",",  skip_header = 1, usecols = 1)
Xte2 = np.genfromtxt("./Xte2_mat100.csv", delimiter=" ")

## DNA Sequence

In [8]:
X = pd.read_csv("./Xtr0.csv")
X.set_index('Id', inplace=True)
y = pd.read_csv("./Ytr0.csv")
y.set_index('Id', inplace=True)
Xte =  pd.read_csv("./Xte0.csv")
Xte.set_index('Id', inplace=True)

In [9]:
X3 = pd.read_csv("./Xtr1.csv")
X3.set_index('Id', inplace=True)
y3 = pd.read_csv("./Ytr1.csv")
y3.set_index('Id', inplace=True)
Xte3 =  pd.read_csv("./Xte1.csv")
Xte3.set_index('Id', inplace=True)

In [10]:
X4 = pd.read_csv("./Xtr2.csv")
X4.set_index('Id', inplace=True)
y4 = pd.read_csv("./Ytr2.csv")
y4.set_index('Id', inplace=True)
Xte4 =  pd.read_csv("./Xte2.csv")
Xte4.set_index('Id', inplace=True)

In [11]:
## Kernel matrices : polynomial ,  RBF, Spectrum
class kernels :
    def __init__(self, Xtrain):
        self.Xtr = Xtrain
        
    def linear_kernel(self, X):
        """
        
        Implement the linear kernel
        
        """
        return np.dot(X, self.Xtr.T)
    
    def polynomial_kernel(self, X,s):
        """
        
        Implement polynomial kernel 
        
        """
        return (np.dot(X, self.Xtr.T))**s
        
    def gaussian_kernel(self, X):
        """
        
        Implement gaussian kernel
        
        """
        # Bandwidth
        n = self.Xtr.shape[0]
        sigma2 = (1/n**(2/5)) * np.sqrt(np.trace(np.cov(self.Xtr, rowvar = False)))
        
        # Compute the pairwise distances
        D = np.sum((X[:, np.newaxis] - self.Xtr[np.newaxis, :])**2, axis=2)

        # Compute the RBF kernel matrix
        kernel_matrix = np.exp(-D / (2 * self.sigma**2))
     
    ## Implement the spectrum kernel
    def spectrum_kernel1(self,sequence1, sequence2, k):
        """
        Implement the spectrum kernel
        """
        kmer_counts1 = Counter(sequence1[i:i+k] for i in range(len(sequence1) - k + 1))
        kmer_counts2 = Counter(sequence2[i:i+k] for i in range(len(sequence2) - k + 1))
        
        common_kmers = list(set(kmer_counts1.keys()) & set(kmer_counts2.keys()))
        
        res = 0
        for kmer in common_kmers:
            res = res + kmer_counts1[kmer] * kmer_counts2[kmer] 
            
        return res
        
    def spectrum_kernel2(self,X,k):
        """
        Assemble the kernel matrix given by spectrum method
        """
        n = self.Xtr.shape[0]
        K = np.zeros((X.shape[0], n))
        
        if (K.shape[0] == K.shape[1]):
            rows, cols = np.tril_indices(n, -1)
        else : 
            rows, cols = np.indices((K.shape[0],K.shape[1]))
            rows = rows.reshape(-1)
            cols = cols.reshape(-1)
        
        values = Parallel(n_jobs=8)(
            delayed(self.spectrum_kernel1)(X.iloc[i]["seq"], self.Xtr.iloc[j]["seq"], k) for i, j in zip(rows, cols)
        )

        K[rows, cols] = values
        
        if (K.shape[0] == K.shape[1]):
            
            K += K.T
        
            diag_values = Parallel(n_jobs=8)(
                delayed(self.spectrum_kernel1)(X.iloc[i]["seq"], self.Xtr.iloc[i]["seq"], k) for i in range(n)
            )
            K[np.diag_indices(n)] = diag_values

        return K
    
    
## Training algorithms

class training_algo:
    
    def __init__(self, kernel , label ):
        self.n = kernel.shape[0] # size training set
        self.K = kernel # kernel matrix
        self.y = label # training label
    
    # Logistic regression
    def obj_fun_log(self, alpha, lbda):
        """
        Objective function for kernel logistic regression.

        Parameters:
        alpha (numpy.ndarray): Coefficient vector.
        lbda (float): Regularization parameter.

        Returns:
        float: Value of the objective function.
        """
        
        Ka = self.K @ alpha
        data_fit = -np.mean(np.log(expit(self.y * Ka) + 1e-10))
        reg = lbda * alpha.T.dot(Ka) / 2
        return data_fit + reg
    
    # SVM (Primal form)
    def SVM(self, alpha, lbda):
        """
        Objective function for kernel SVM.

        Parameters:
        alpha (numpy.ndarray): Coefficient vector.
        lbda (float): Regularization parameter.

        Returns:
        float: Value of the objective function.
        """
        
        Ka = self.K.dot(alpha)  # Compute the kernel product
        
        # Compute element-wise hinge loss
        hinge_loss = np.mean(np.maximum(0, 1 - self.y * Ka))

        # Regularization term
        reg = lbda * alpha.T.dot(Ka) / 2

        return hinge_loss + reg
    
    def SVM2(self, alpha, lbda):
        """
        Objective function for SVM2.

        Parameters:
        alpha (numpy.ndarray): Coefficient vector.
        lbda (float): Regularization parameter.

        Returns:
        float: Value of the objective function.
        """
        
        Ka = self.K.dot(alpha)  # Compute the kernel product
        
        # Compute element-wise hinge loss
        hinge_loss = np.mean(np.maximum(0, 1 - self.y * Ka)**2)

        # Regularization term
        reg = lbda * alpha.T.dot(Ka) / 2

        return hinge_loss + reg
    
    
    def minimize_obj_fun(self,pb, alpha0):
        """
        Minimize the objective function using L-BFGS-B.

        Parameters:
        pb (function): Objective function to minimize.
        alpha0 (numpy.ndarray): Initial coefficient vector.
        lbda (float): Regularization parameter.

        Returns:
        scipy.optimize.OptimizeResult: Result of the optimization.
        """
        lbda = np.sqrt(1/self.n)
        result = minimize(
            fun=pb,
            x0=alpha0,
            args=(lbda,),
            method='L-BFGS-B',
            options={'disp': True}
        )
        return result  

## Prediction

class prediction:
    def __init__(self, coeff,  Xtest, ker):
        self.alpha = coeff # Coefficient learned
        self.Xte = Xtest # testing set
        self.kernel = ker # kernel
        
    def linear_prediction(self):
        K_test = self.kernel.linear_kernel(self.Xte)
        f = K_test.dot(self.alpha)
        return (np.sign(f) + 1.) / 2
    
    def polynomial_prediction(self,s):
        K_test = self.kernel.polynomial_kernel(self.Xte,s)
        f = K_test.dot(self.alpha)
        return (np.sign(f) + 1.) / 2
    
    def spectrum_prediction(self,k):
        K_test = self.kernel.spectrum_kernel2(self.Xte, k)
        f = K_test.dot(self.alpha)
        return (np.where(f >= 0, 1, -1) + 1) / 2
    

# Prediction for spectrum kernel using logistic classifier

# Prediction with spectrum kernel with k = 5 using logistic regression

In [15]:
## First dataset

ker= kernels(X)
Kspec = ker.spectrum_kernel2(X,k=5)
Kspec = Kspec + 10**(-10) * np.eye(X.shape[0])
train = training_algo(Kspec, 2*(y['Bound'].to_numpy(float) - 0.5))
alpha = train.minimize_obj_fun(train.obj_fun_log, np.zeros(Kspec.shape[0]))
pred = prediction(alpha.x, Xte, ker)
Yte = pred.spectrum_prediction(5)

## Second dataset

ker3= kernels(X3)
Kspec3 = ker3.spectrum_kernel2(X3,k=5)
Kspec3 = Kspec3 + 10**(-10) * np.eye(X3.shape[0])
train3 = training_algo(Kspec3, 2*(y3['Bound'].to_numpy(float) - 0.5))
alpha3 = train.minimize_obj_fun(train3.obj_fun_log, np.zeros(Kspec3.shape[0]))
pred3 = prediction(alpha3.x, Xte3, ker3)
Yte3 = pred3.spectrum_prediction(5)


## Third dataset


ker4= kernels(X4)
Kspec4 = ker4.spectrum_kernel2(X4,k=5) + 10**(-10)*np.eye(X4.shape[0])
train4 = training_algo(Kspec4, 2*(y4['Bound'].to_numpy(float) - 0.5))
alpha4 = train.minimize_obj_fun(train4.obj_fun_log ,np.zeros(Kspec4.shape[0]))
pred4 = prediction(alpha4.x, Xte4, ker4)
Yte4 = pred4.spectrum_prediction(5)


## Create a CSV for predictions

In [16]:
liste_concatenee = []
liste_concatenee.extend(Yte)
liste_concatenee.extend(Yte3)
liste_concatenee.extend(Yte4)
                    
# Créer un DataFrame
df = pd.DataFrame({
    'Id': [int(x) for x in list(range(3000))],
    'Bound': [int(x) for x in liste_concatenee] 
})
df['Bound'] = df['Bound'].astype('int')

# Sauvegarder en CSV
df.to_csv('Ypred_spec.csv', index=False, header=True)

## Cross validation for polynomial kernel (on the degrees) and learn on the best hyperparameters using SVM (primal form)

In [17]:
def cross_validation(X, y, s_values, n_splits=5):
    best_score = float('inf')
    best_params = None

    kf = KFold(n_splits=n_splits)

    for s in s_values:
        scores = []
        for train_index, val_index in kf.split(X):
            X_train, X_val = X[train_index], X[val_index]
            y_train, y_val = y[train_index], y[val_index]

            ker = kernels(X_train)
            K = ker.polynomial_kernel(X_train, s=s)
            K = K + 10**(-10) * np.eye(X_train.shape[0])

            train = training_algo(K, 2*(y_train - 0.5))
            alpha = train.minimize_obj_fun(train.SVM, np.ones(K.shape[0]))

            pred = prediction(alpha.x, X_val, ker)
            Yte0 = pred.polynomial_prediction(s)
            
            y_val_np = y_val.values.reshape(Yte0.shape) if isinstance(y_val, pd.Series) else y_val

            if (y_val_np.shape == Yte0.shape) : 
                score = mean_squared_error(Yte0, y_val_np)
                scores.append(score)
            else : 
                print(f"Error size : y_val -> {y_val.shape} ,  Yte0 -> {Yte0.shape}")
                y_val_np = y_val.values.reshape(Yte0.shape) if isinstance(y_val, pd.Series) else y_val
                score = mean_squared_error(Yte0, y_val_np)
                scores.append(score)

        avg_score = np.mean(scores)
        if avg_score < best_score:
            best_score = avg_score
            best_params = s

    return best_params, best_score

In [18]:
s_values = [1,2,3,4,5,6,7,8,9,10]  
best_params, best_score = cross_validation(X0, y0, s_values)
best_params1, best_score1 = cross_validation(X1, y1, s_values)
best_params2 , best_score2 = cross_validation(X2, y2, s_values)

In [19]:
ker0 = kernels(X0)
Kpol0 = ker0.polynomial_kernel(X0,best_params)
Kpol0 = Kpol0 + 10**(-10) * np.eye(X0.shape[0])
train = training_algo(Kpol0, 2*(y0 - 0.5))
alpha = train.minimize_obj_fun(train.SVM, np.ones(Kpol0.shape[0]))
pred0 = prediction(alpha.x, Xte0, ker0)
Yte0 = pred0.polynomial_prediction(best_params)

## Second dataset

ker1 = kernels(X1)
Kpol1 = ker1.polynomial_kernel(X1,best_params1)
Kpol1 = Kpol1 + 10**(-10) * np.eye(X1.shape[0]) # to avoid 
train = training_algo(Kpol1, 2*(y1 - 0.5))
alpha = train.minimize_obj_fun(train.SVM, np.ones(Kpol1.shape[0]))
pred1 = prediction(alpha.x, Xte1, ker1)
Yte1 = pred1.polynomial_prediction(best_params1)

## Third Dataset

ker2 = kernels(X2)
Kpol2 = ker2.polynomial_kernel(X2,best_params2)
Kpol2 = Kpol2 + 10**(-10) * np.eye(X2.shape[0])
train = training_algo(Kpol2, 2*(y2 - 0.5))
alpha = train.minimize_obj_fun(train.SVM, np.ones(Kpol2.shape[0]))
pred2 = prediction(alpha.x, Xte2, ker2)
Yte2 = pred2.polynomial_prediction(best_params2)


## Create a CSV for predictions

In [20]:
liste_concatenee = []
liste_concatenee.extend(Yte0)
liste_concatenee.extend(Yte1)
liste_concatenee.extend(Yte2)
                    
# Créer un DataFrame
df = pd.DataFrame({
    'Id': [int(x) for x in list(range(3000))],
    'Bound': [int(x) for x in liste_concatenee] 
})
df['Bound'] = df['Bound'].astype('int')

# Sauvegarder en CSV
df.to_csv('Ypred_poly.csv', index=False, header=True)

# Prediction for polynomial using SVM2 with best parameters by SVM

In [23]:
## First dataset

ker02 = kernels(X0)
Kpol02 = ker02.polynomial_kernel(X0, best_params)
Kpol02 = Kpol02 + 10**(-10) * np.eye(X0.shape[0])
train02 = training_algo(Kpol02, 2*(y0 - 0.5))
alpha02 = train02.minimize_obj_fun(train02.SVM2, np.ones(Kpol02.shape[0]))
pred02 = prediction(alpha02.x, Xte0, ker02)
Yte02 = pred02.polynomial_prediction(best_params)

## Second dataset

ker12 = kernels(X1)
Kpol12 = ker12.polynomial_kernel(X1, best_params1)
Kpol12 = Kpol12 + 10**(-10) * np.eye(X1.shape[0])  # To avoid numerical instability
train12 = training_algo(Kpol12, 2*(y1 - 0.5))
alpha12 = train12.minimize_obj_fun(train12.SVM2, np.ones(Kpol12.shape[0]))
pred12 = prediction(alpha12.x, Xte1, ker12)
Yte12 = pred12.polynomial_prediction(best_params1)

## Third dataset

ker22 = kernels(X2)
Kpol22 = ker22.polynomial_kernel(X2, best_params2)
Kpol22 = Kpol22 + 10**(-10) * np.eye(X2.shape[0])
train22 = training_algo(Kpol22, 2*(y2 - 0.5))
alpha22 = train22.minimize_obj_fun(train22.SVM2, np.ones(Kpol22.shape[0]))
pred22 = prediction(alpha22.x, Xte2, ker22)
Yte22 = pred22.polynomial_prediction(best_params2)


## Create a CSV for predictions (SVM2)

In [24]:
liste_concatenee = []
liste_concatenee.extend(Yte02)
liste_concatenee.extend(Yte12)
liste_concatenee.extend(Yte22)
                    
# Créer un DataFrame
df = pd.DataFrame({
    'Id': [int(x) for x in list(range(3000))],
    'Bound': [int(x) for x in liste_concatenee] 
})
df['Bound'] = df['Bound'].astype('int')

# Sauvegarder en CSV
df.to_csv('Ypred_poly2.csv', index=False, header=True)