In [None]:
import numpy as np
from scipy.interpolate import interp1d
import pandas as pd
from sklearn import metrics
from scipy.signal import savgol_filter


# SNIP algorithm
def snip(y, iterations=20, decreasing=True):
    """Estimate a slowly varying baseline of ``y`` by iterative peak clipping.

    For every half-window width ``w`` each interior sample ``j`` is replaced by
    ``min(y[j], (y[j - w] + y[j + w]) / 2)``.  Samples closer than ``w`` to
    either end are left untouched during that pass, and all samples of a pass
    are computed from the values of the previous pass.

    Parameters
    ----------
    y : array-like
        1-D signal. It is copied, never modified in place.
    iterations : int
        Largest half-window width; widths ``1..iterations`` are all applied.
    decreasing : bool
        If true the widths run from ``iterations`` down to 1, otherwise from
        1 up to ``iterations``.

    Returns
    -------
    numpy.ndarray
        ``float64`` array of the same length as ``y`` holding the clipped signal.
    """
    baseline = np.array(y, dtype=np.float64)
    n = len(baseline)
    max_width = int(iterations)

    if int(decreasing):
        widths = range(max_width, 0, -1)
    else:
        widths = range(1, max_width + 1)

    for w in widths:
        # No sample has neighbours at distance w on both sides: nothing to clip.
        # (Also keeps the slice bounds below from going negative.)
        if 2 * w >= n:
            continue
        centre = baseline[w:n - w]
        # Mean of the two neighbours at distance w, evaluated for the whole
        # pass before anything is written back.
        neighbours = (baseline[:n - 2 * w] + baseline[2 * w:]) / 2
        # np.where (not np.minimum) so a NaN neighbour mean leaves the sample
        # unchanged, exactly like a scalar ``if b < a`` test would.
        baseline[w:n - w] = np.where(neighbours < centre, neighbours, centre)

    return baseline


# intensity calibration
def toc(y_data):
    """Rescale ``y_data`` so that its entries add up to one.

    ``y_data`` must provide ``.sum()`` and element-wise division (e.g. a numpy
    array).  A new array is returned; a zero total is not special-cased.
    """
    total = y_data.sum()
    return y_data / total


def interpolate(data, boundary, mask, kind):
    """Project a spectrum onto the fixed grid described by ``boundary``.

    Parameters
    ----------
    data : sequence of two 1-D arrays
        ``data[0]`` holds the x positions, ``data[1]`` the matching intensities.
    boundary : 1-D array-like
        Grid definition.  With ``kind`` set these are the positions at which
        the spectrum is resampled; with ``kind=None`` consecutive entries are
        the open bin edges ``(boundary[i], boundary[i + 1])``.
    mask : boolean/index array or None
        Optional selector applied to the resulting feature vector.
    kind : str or None
        ``scipy.interpolate.interp1d`` interpolation kind, or ``None`` to take
        the maximum intensity inside each bin instead of interpolating.

    Returns
    -------
    numpy.ndarray
        ``len(boundary)`` values when interpolating, ``len(boundary) - 1``
        values when binning (before ``mask`` is applied).  Negative values are
        clamped to 0; positions outside the data range and empty bins give 0.

    Notes
    -----
    Bin membership uses strict inequalities on both sides, so a point lying
    exactly on a bin edge is not counted in either neighbouring bin.
    """
    positions = np.asarray(data[0])
    intensities = np.asarray(data[1])

    if kind is not None:
        resample = interp1d(positions,
                            intensities,
                            kind=kind,
                            bounds_error=False,
                            fill_value=0,
                            assume_sorted=False)
        # One vectorised call instead of one call per grid position.
        new_data = np.array(resample(boundary))
    else:
        binned = []
        for lower, upper in zip(boundary[:-1], boundary[1:]):
            in_bin = (positions > lower) & (positions < upper)
            binned.append(intensities[in_bin].max() if in_bin.any() else 0)
        new_data = np.array(binned)

    new_data[new_data < 0] = 0
    if mask is not None:
        new_data = new_data[mask]
    return new_data


def SampleGenerator(data, boundary, housekeeping, mask=None, kind=None, shuffle=True,
                    data_dir='./data/'):
    """Build the feature matrix and binary labels for the samples listed in ``data``.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain an ``ID`` column (file stem of each spectrum) and a
        ``Class`` column.
    boundary, housekeeping, mask, kind
        Forwarded unchanged to ``load_mass``.
    shuffle : bool
        If true the rows are randomly permuted (unseeded) before loading.
    data_dir : str
        Directory containing ``<ID>.txt`` spectrum files.
        NOTE(review): the default keeps the previous hard-coded ``'./data/'``,
        but the other cells of this notebook read from ``'../data/'`` -
        confirm which location is intended.

    Returns
    -------
    x : numpy.ndarray
        One row per sample, as returned by ``load_mass``.
    y : numpy.ndarray
        ``float`` vector, 1.0 where ``Class == 'S'`` and 0.0 otherwise.
    """
    import os

    if shuffle:
        data = data.sample(frac=1).reset_index(drop=True)

    x = np.array([
        load_mass(os.path.join(data_dir, sample_id + '.txt'),
                  boundary, mask, kind, housekeeping)
        for sample_id in data['ID']
    ])

    # Vectorised label encoding; independent of the dtype pandas picks for
    # the string column.
    y = (data['Class'] == 'S').to_numpy().astype('float')
    return x, y


def load_mass(path, boundary, mask, kind,housekeeping):
    """Read one spectrum file and turn it into a fixed-length feature vector.

    The file is expected to start with 8 header lines, followed by rows whose
    first two whitespace-separated columns are the x position (presumably m/z)
    and the intensity.  Blank rows are ignored.

    Processing steps, in order: optional alignment to ``housekeeping``,
    square root of the intensities, Savitzky-Golay smoothing (window 21,
    order 3), clamping negatives to 0, subtraction of the ``snip`` baseline,
    normalisation with ``toc`` and finally gridding with ``interpolate``.

    Parameters
    ----------
    path : str
        Spectrum file to read.
    boundary, mask, kind
        Forwarded to ``interpolate``.
    housekeeping : number or None
        If given, the whole x axis is shifted by a constant so that the point
        closest to this value lands exactly on it.

    Returns
    -------
    numpy.ndarray
        The feature vector produced by ``interpolate``.
    """
    header_lines = 8
    positions, intensities = [], []

    with open(path, 'r') as file:
        for line_no, line in enumerate(file):
            if line_no < header_lines:
                continue
            # split() copes with tabs / repeated spaces and is run only once.
            fields = line.split()
            if not fields:
                continue
            positions.append(float(fields[0]))
            intensities.append(float(fields[1]))
    data = np.array([positions, intensities])

    if housekeeping is not None:
        # Signed offset of the point nearest to the housekeeping position
        # (first one wins on ties); no sentinel distance is needed.
        offsets = housekeeping - data[0]
        data[0] = data[0] + offsets[np.argmin(np.abs(offsets))]

    # variance stabilising
    data[1] = np.sqrt(data[1])
    # smoothing
    data[1] = savgol_filter(data[1], window_length=21,
                            polyorder=3)
    data[1][data[1] < 0] = 0
    # baseline removal
    data[1] = data[1] - snip(data[1].copy())
    # intensity calibration
    data[1] = toc(data[1])

    # bin
    new_data = interpolate(data, boundary, mask, kind=kind)
    return new_data

In [None]:
import sklearn
import pandas as pd
from sklearn.svm import LinearSVC
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.model_selection import StratifiedKFold
from warnings import simplefilter
from sklearn.exceptions import ConvergenceWarning

simplefilter("ignore", category=ConvergenceWarning)
data = pd.read_csv('../data/data.csv')

# Hold out every row belonging to a patient drawn in this (unseeded) sample.
# The sample is taken over rows of the Patient column, so the same patient
# can be drawn more than once.
test_ptient = data.Patient.sample(80)
is_held_out = data.Patient.isin(test_ptient)
test_ptient_data = data[is_held_out]
train_data = data.drop(test_ptient_data.index).reset_index(drop=True)
test_data = test_ptient_data.reset_index(drop=True)

# 1801 grid positions: 2000, 2005, ..., 11000.
boundary = 2000 + 5 * np.arange(0, 1801)

In [None]:
def load_mass_point(path):
    """Read the raw two-column spectrum stored in ``path``.

    The first 8 lines of the file are skipped; every following non-blank line
    contributes its first two whitespace-separated fields as floats.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(2, n_points)``: row 0 is the first column of the
        file, row 1 the second.
    """
    header_lines = 8
    first_column, second_column = [], []
    with open(path, 'r') as file:
        for line_no, line in enumerate(file):
            if line_no < header_lines:
                continue
            # split() tolerates tabs and repeated spaces, unlike split(' ').
            fields = line.split()
            if not fields:
                continue
            first_column.append(float(fields[0]))
            second_column.append(float(fields[1]))
    return np.array([first_column, second_column])


# Reference position every spectrum is aligned to.
housekeeping = 4428

# m_z_freq[v] counts, over all spectra, the points whose truncated integer
# position equals v.
m_z_freq = np.zeros(30000, dtype=int)
for sample_id in data.ID:
    spectrum = load_mass_point('../data/' + sample_id + '.txt')
    int_x = np.int32(spectrum[0])

    # Unbuffered add so repeated positions within one spectrum are all counted.
    np.add.at(m_z_freq, int_x, 1)

    # Signed offset from the housekeeping position to the nearest point.
    if int_x.size:
        offsets = housekeeping - int_x
        min_diff = offsets[np.argmin(np.abs(offsets))]
    else:
        min_diff = 1e5

    # Report spectra with no point within 5 units of the housekeeping position.
    if np.abs(min_diff) > 5:
        print(min_diff)
        print(sample_id)

m_z_freq[housekeeping], len(data)

In [None]:
from sklearn.model_selection import GridSearchCV

# Candidate settings explored by the 5-fold cross-validated search.
search_space = {
    "C": [0.1, 0.5, 1, 5, 10],
    "max_iter": [1000, 5000, 10000, 50000, 100000],
}
sparse_svc = LinearSVC(penalty='l1',
                       loss='squared_hinge',
                       dual=False,
                       fit_intercept=True)
grid = GridSearchCV(sparse_svc, param_grid=search_space, cv=5)

x_train, y_train = SampleGenerator(train_data, boundary, housekeeping)
grid.fit(x_train, y_train)
print("The best parameters are %s with a score of %0.2f" %
      (grid.best_params_, grid.best_score_))


In [None]:
# Refit an L1-penalised linear SVM on the full training set with the settings
# chosen by the grid search.
best_model = LinearSVC(penalty='l1',
                       loss='squared_hinge',
                       dual=False,
                       fit_intercept=True,
                       **grid.best_params_)
best_model.fit(x_train, y_train)

# Evaluate on the held-out patients.
x_test, y_test = SampleGenerator(test_data, boundary, housekeeping)
expected = y_test
predicted = best_model.predict(x_test)

print(metrics.classification_report(expected, predicted))
print(metrics.confusion_matrix(expected, predicted))

# ROC AUC is a ranking metric: feed it the continuous decision values rather
# than the thresholded 0/1 predictions, and actually report it.
auc = metrics.roc_auc_score(y_test, best_model.decision_function(x_test))
accuracy = metrics.accuracy_score(y_test, predicted)
print("AUC: %.3f" % auc)
print("Accuracy: %.2f%%" % (accuracy * 100.0))

In [None]:
# Features the L1 penalty did not shrink to exactly zero.
coef_magnitude = np.abs(best_model.coef_[0])
mask = coef_magnitude > 0
print(f'useful feature dim: {mask.sum()}')

In [None]:
# Iterative feature elimination: keep only the features with non-zero L1-SVM
# coefficients, re-tune and refit on them, and repeat until a fit keeps every
# feature it was given.  feature_mask[t] is the mask produced by round t,
# expressed in the feature space that round was trained on.
feature_mask = [mask]
x_train, y_train = SampleGenerator(train_data,boundary,housekeeping)
x_test, y_test = SampleGenerator(test_data, boundary, housekeeping)
while True:
    grid = GridSearchCV(LinearSVC(penalty='l1',
                                  loss='squared_hinge',
                                  dual=False,
                                  fit_intercept=True),
                        param_grid={
                            "C": [0.1, 0.5, 1, 5, 10],
                            "max_iter": [1000, 5000, 10000, 50000, 100000]
                        },
                        cv=5)
    # Shrink the matrices to the features kept by the previous round.
    x_train = x_train[:, feature_mask[-1]]
    grid.fit(x_train, y_train)
    print("The best parameters are %s with a score of %0.2f" %
          (grid.best_params_, grid.best_score_))

    model = LinearSVC(penalty='l1',
                      loss='squared_hinge',
                      dual=False,
                      fit_intercept=True,
                      **grid.best_params_)
    model.fit(x_train, y_train)

    x_test = x_test[:, feature_mask[-1]]
    expected = y_test
    predicted = model.predict(x_test)

    print(metrics.classification_report(expected,
                                        predicted))
    print(metrics.confusion_matrix(expected, predicted))

    # AUC from continuous decision values (not hard labels), and reported.
    auc = metrics.roc_auc_score(y_test, model.decision_function(x_test))
    accuracy = metrics.accuracy_score(y_test, predicted)
    print("AUC: %.3f" % auc)
    print("Accuracy: %.2f%%" % (accuracy * 100.0))
    mask = np.abs(model.coef_[0]) > 0
    print(f'useful feature dim: {mask.sum()}')

    final_model = model

    # Translate the surviving columns back to indices of the original bins by
    # undoing each earlier reduction, newest first: the i-th column of a
    # reduced matrix is the i-th True entry of the mask that produced it.
    kept_bins = np.flatnonzero(mask)
    for earlier_mask in reversed(feature_mask):
        kept_bins = np.flatnonzero(earlier_mask)[kept_bins]

    final_mask = np.zeros_like(feature_mask[0])
    final_mask[kept_bins] = 1
    mask_str = 'feature bin: ' + ''.join(
        f'{2000+5*index}~{2000+5*(index+1)}  ' for index in kept_bins.tolist())
    print(mask_str)
    feature_mask.append(mask)

    if mask.sum() == len(mask):
        break
    if mask.sum() == 0:
        # Another round would try to fit on a matrix with no columns.
        print('no feature kept a non-zero coefficient; stopping')
        break

In [None]:
from sklearn.model_selection import GridSearchCV

# The path previously read '..//datadata.csv' (misplaced slash); every other
# cell of this notebook loads '../data/data.csv'.
data = pd.read_csv('../data/data.csv')

# repeat_mask[b] counts in how many repeats bin b survived feature selection.
# 1800 == len(boundary) - 1 bins produced by SampleGenerator with kind=None.
repeat_mask = np.zeros([1800])


def _fit_tuned_l1_svc(features, labels):
    """Grid-search C / max_iter for an L1 LinearSVC, then refit the best setting."""
    grid = GridSearchCV(LinearSVC(penalty='l1',
                                  loss='squared_hinge',
                                  dual=False,
                                  fit_intercept=True),
                        param_grid={
                            "C": [0.1, 0.5, 1, 5, 10],
                            "max_iter": [1000, 5000, 10000, 50000, 100000]
                        },
                        cv=5)
    # tune hyper-parameters
    grid.fit(features, labels)
    print("The best parameters are %s with a score of %0.2f" %
          (grid.best_params_, grid.best_score_))
    model = LinearSVC(penalty='l1',
                      loss='squared_hinge',
                      dual=False,
                      fit_intercept=True,
                      **grid.best_params_)
    model.fit(features, labels)
    return model


def _report_and_select(model, features, labels):
    """Print hold-out metrics and return the mask of non-zero coefficients."""
    predicted = model.predict(features)
    # print results
    print(metrics.classification_report(labels, predicted))
    print(metrics.confusion_matrix(labels, predicted))
    # AUC from continuous decision values rather than thresholded labels.
    auc = metrics.roc_auc_score(labels, model.decision_function(features))
    accuracy = metrics.accuracy_score(labels, predicted)
    print("AUC: %.3f" % auc)
    print("Accuracy: %.2f%%" % (accuracy * 100.0))
    selected = np.abs(model.coef_[0]) > 0
    print(f'useful feature dim: {selected.sum()}')
    return selected


def _to_original_bins(selected, earlier_masks):
    """Map columns of the most reduced matrix back to original bin indices."""
    kept = np.flatnonzero(selected)
    # The i-th column of a reduced matrix is the i-th True entry of the mask
    # that produced it; undo the reductions newest first.
    for earlier in reversed(earlier_masks):
        kept = np.flatnonzero(earlier)[kept]
    return kept


for repeat in range(100):
    # Fresh random patient-level hold-out split for every repeat.
    test_ptient = data.Patient.sample(80)
    test_ptient_data = data[data.Patient.isin(test_ptient)]
    train_data = data.drop(test_ptient_data.index).reset_index(drop=True)
    test_data = test_ptient_data.reset_index(drop=True)

    # Load the spectra once per repeat; the same matrices feed the first fit
    # and the elimination rounds below (reloading would only reshuffle rows).
    x_train, y_train = SampleGenerator(train_data, boundary, housekeeping)
    x_test, y_test = SampleGenerator(test_data, boundary, housekeeping)

    # Stage 1: fit on all bins.
    best_model = _fit_tuned_l1_svc(x_train, y_train)
    mask = _report_and_select(best_model, x_test, y_test)

    # Stage 2: keep refitting on the surviving bins until a fit keeps every
    # feature it was given.
    feature_mask = [mask]
    while True:
        x_train = x_train[:, feature_mask[-1]]
        x_test = x_test[:, feature_mask[-1]]

        model = _fit_tuned_l1_svc(x_train, y_train)
        mask = _report_and_select(model, x_test, y_test)

        final_model = model
        kept_bins = _to_original_bins(mask, feature_mask)
        final_mask = np.zeros_like(feature_mask[0])
        final_mask[kept_bins] = 1
        mask_str = 'feature bin: ' + ''.join(
            f'{2000+5*index}~{2000+5*(index+1)}  ' for index in kept_bins.tolist())
        print(mask_str)
        feature_mask.append(mask)

        if mask.sum() == len(mask):
            break
        if mask.sum() == 0:
            # Another round would try to fit on a matrix with no columns.
            print('no feature kept a non-zero coefficient; stopping')
            break
    repeat_mask += final_mask

In [None]:
# Bins that were selected in every one of the 100 resampling repeats.
# Computed once so the training and test matrices are guaranteed to use the
# same columns (the threshold used to be spelled both '>= 100' and '== 100').
stable_mask = repeat_mask >= 100
print(stable_mask.sum())

# NOTE(review): repeat_mask was accumulated from splits of this same file, so
# the patients held out below also took part in choosing the bins; the scores
# printed here are therefore likely optimistic.
data = pd.read_csv('../data/data.csv')
test_ptient = data.Patient.sample(80)
test_ptient_data = data[data.Patient.isin(test_ptient)]
train_data = data.drop(test_ptient_data.index).reset_index(drop=True)
test_data = test_ptient_data.reset_index(drop=True)

grid = GridSearchCV(LinearSVC(penalty='l1',
                              loss='squared_hinge',
                              dual=False,
                              fit_intercept=True),
                    param_grid={
                        "C": [0.1, 0.5, 1, 5, 10],
                        "max_iter": [1000, 5000, 10000, 50000, 100000]
                    },
                    cv=5)
x_train, y_train = SampleGenerator(train_data, boundary, housekeeping)
x_train = x_train[:, stable_mask]
grid.fit(x_train, y_train)
print("The best parameters are %s with a score of %0.2f" %
      (grid.best_params_, grid.best_score_))

model = LinearSVC(penalty='l1',
                  loss='squared_hinge',
                  dual=False,
                  fit_intercept=True,
                  **grid.best_params_)
model.fit(x_train, y_train)
x_test, y_test = SampleGenerator(test_data, boundary, housekeeping)
x_test = x_test[:, stable_mask]
expected = y_test
predicted = model.predict(x_test)

print(metrics.classification_report(expected, predicted))
print(metrics.confusion_matrix(expected, predicted))

# AUC from continuous decision values (not hard labels), and reported.
auc = metrics.roc_auc_score(y_test, model.decision_function(x_test))
accuracy = metrics.accuracy_score(y_test, predicted)
print("AUC: %.3f" % auc)
print("Accuracy: %.2f%%" % (accuracy * 100.0))