In [1]:
import pickle
import pandas as pd
import numpy as np
import keras
import tensorflow as tf
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.datasets import reuters
from keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from keras.preprocessing.text import Tokenizer
from keras.layers import Dense, Embedding, LSTM, SpatialDropout1D, Conv1D, MaxPooling1D, Flatten, Dropout, Activation, Layer, SimpleRNN
from keras import backend as K
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.neighbors import KNeighborsClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from copy import deepcopy

Using TensorFlow backend.


In [2]:
# Load the reuters dataset
max_features = 5000  # Number of words to consider as features
max_len = 500  # Cut texts after this number of words (among top max_features most common words)
(X_train, y_train), (X_test, y_test) = reuters.load_data(num_words=max_features)

# Pad sequences (to ensure equal length of all sequences)
X_train = pad_sequences(X_train, maxlen=max_len)
X_test = pad_sequences(X_test, maxlen=max_len)

print('X_train shape:', X_train.shape)
print(X_train.shape[0], 'train samples')
print(X_test.shape[0], 'test samples')

# One-hot encode the labels
num_classes = max(y_train) + 1
oh_y_train = to_categorical(y_train, num_classes)
oh_y_test = to_categorical(y_test, num_classes)

X_train shape: (8982, 500)
8982 train samples
2246 test samples


  x_train, y_train = np.array(xs[:idx]), np.array(labels[:idx])
  x_test, y_test = np.array(xs[idx:]), np.array(labels[idx:])


In [3]:
num_classes

46

In [4]:
class LeeOscillator:
    def __init__(self, a=[1, 1, 1, 1, -1, -1, -1, -1], b=[0.6, 0.6, -0.5, 0.5, -0.6, -0.6, -0.5, 0.5], K=500, N=100):
        self.a = a
        self.b = b
        self.K = K
        self.N = N

    def _initialize_tensors(self, x, N):
        shape = tf.shape(x)
        rank = tf.rank(x)

        if rank == 2:
            x = tf.expand_dims(tf.expand_dims(x, axis=1), axis=1)
            shape = tf.shape(x)
        elif rank == 4 and shape[1] > 1:
            x = tf.transpose(x, [0, 2, 3, 1])
            shape = tf.shape(x)

        init_shape = tf.concat([[N], shape], axis=0)
        u = tf.zeros(init_shape, dtype=tf.float32)
        v = tf.zeros(init_shape, dtype=tf.float32)
        z = tf.zeros(init_shape, dtype=tf.float32)
        u = tf.tensor_scatter_nd_update(u, [[0]], [u[0] + 0.2])
        z = tf.tensor_scatter_nd_update(z, [[0]], [z[0] + 0.2])

        return u, v, z, x

    def Tanh(self, x):
        N = np.random.randint(1, self.N + 1)
        u, v, z, x = self._initialize_tensors(x, N)

        for t in range(N - 1):
            u = tf.tensor_scatter_nd_update(u, [[t + 1]], [
                tf.math.tanh(self.a[0] * u[t] - self.a[1] * v[t] + self.a[2] * z[t] + self.a[3] * x)])
            v = tf.tensor_scatter_nd_update(v, [[t + 1]], [
                tf.math.tanh(self.a[6] * z[t] - self.a[4] * u[t] - self.a[5] * v[t] + self.a[7] * x)])
            w = tf.math.tanh(x)
            z = tf.tensor_scatter_nd_update(z, [[t + 1]], [(v[t + 1] - u[t + 1]) * tf.math.exp(-self.K * tf.math.pow(x, 2)) + w])

        if tf.rank(x) == 4 and tf.shape(x)[1] > 1:
            z = tf.transpose(z[-1], [0, 3, 1, 2])

        return z[-1]

    def Sigmoid(self, x):
        N = np.random.randint(1, self.N + 1)
        u, v, z, x = self._initialize_tensors(x, N)

        for t in range(N - 1):
            u = tf.tensor_scatter_nd_update(u, [[t + 1]], [
                tf.math.sigmoid(self.b[0] * u[t] - self.b[1] * v[t] + self.b[2] * z[t] + self.b[3] * x)])
            v = tf.tensor_scatter_nd_update(v, [[t + 1]], [
                tf.math.sigmoid(self.b[6] * z[t] - self.b[4] * u[t] - self.b[5] * v[t] + self.b[7] * x)])
            w = tf.math.sigmoid(x)
            z = tf.tensor_scatter_nd_update(z, [[t + 1]], [(v[t + 1] - u[t + 1]) * tf.math.exp(-self.K * tf.math.pow(x, 2)) + w])

        if tf.rank(x) == 4 and tf.shape(x)[1] > 1:
            z = tf.transpose(z[-1], [0, 3, 1, 2])

        return z[-1]

    def ReLU(self, x):
        N = np.random.randint(1, self.N + 1)
        u, v, z, x = self._initialize_tensors(x, N)

        for t in range(N - 1):
            u = tf.tensor_scatter_nd_update(u, [[t + 1]], [
                tf.nn.relu(self.a[0] * u[t] - self.a[1] * v[t] + self.a[2] * z[t] + self.a[3] * x)])
            v = tf.tensor_scatter_nd_update(v, [[t + 1]], [
                tf.nn.relu(self.a[6] * z[t] - self.a[4] * u[t] - self.a[5] * v[t] + self.a[7] * x)])
            w = tf.nn.relu(x)
            z = tf.tensor_scatter_nd_update(z, [[t + 1]], [(v[t + 1] - u[t + 1]) * tf.math.exp(-self.K * tf.math.pow(x, 2)) + w])

        if tf.rank(x) == 4 and tf.shape(x)[1] > 1:
            z = tf.transpose(z[-1], [0, 3, 1, 2])

        return z[-1]

    def LeakyReLU(self, x, alpha=0.1):
        N = np.random.randint(1, self.N + 1)
        u, v, z, x = self._initialize_tensors(x, N)

        for t in range(N - 1):
            u = tf.tensor_scatter_nd_update(u, [[t + 1]], [
                tf.nn.leaky_relu(self.a[0] * u[t] - self.a[1] * v[t] + self.a[2] * z[t] + self.a[3] * x, alpha=alpha)])
            v = tf.tensor_scatter_nd_update(v, [[t + 1]], [
                tf.nn.leaky_relu(self.a[6] * z[t] - self.a[4] * u[t] - self.a[5] * v[t] + self.a[7] * x, alpha=alpha)])
            w = tf.nn.leaky_relu(x, alpha=alpha)
            z = tf.tensor_scatter_nd_update(z, [[t + 1]], [(v[t + 1] - u[t + 1]) * tf.math.exp(-self.K * tf.math.pow(x, 2)) + w])

        if tf.rank(x) == 4 and tf.shape(x)[1] > 1:
            z = tf.transpose(z[-1], [0, 3, 1, 2])

        return z[-1]

In [5]:
# 自定义激活层
class LeeOscillatorTanhLayer(Layer):
    def __init__(self, **kwargs):
        super(LeeOscillatorTanhLayer, self).__init__(**kwargs)
        self.oscillator = LeeOscillator()

    def call(self, inputs):
        return self.oscillator.Tanh(inputs)

class LeeOscillatorSigmoidLayer(Layer):
    def __init__(self, **kwargs):
        super(LeeOscillatorSigmoidLayer, self).__init__(**kwargs)
        self.oscillator = LeeOscillator()

    def call(self, inputs):
        return self.oscillator.Sigmoid(inputs)

class LeeOscillatorReLULayer(Layer):
    def __init__(self, **kwargs):
        super(LeeOscillatorReLULayer, self).__init__(**kwargs)
        self.oscillator = LeeOscillator()

    def call(self, inputs):
        return self.oscillator.ReLU(inputs)

class LeeOscillatorLeakyReLULayer(Layer):
    def __init__(self, **kwargs):
        super(LeeOscillatorLeakyReLULayer, self).__init__(**kwargs)
        self.oscillator = LeeOscillator()

    def call(self, inputs):
        return self.oscillator.LeakyReLU(inputs)

In [13]:
# Define the model with Embedding layer
model = Sequential()

model.add(Embedding(max_features, 128, input_length=max_len, name='embedding_1'))
model.add(SpatialDropout1D(0.2, name='spatial_dropout1d_1'))

model.add(Conv1D(32, 5, padding='same', name='conv1d_1'))
#model.add(LeeOscillatorReLULayer())
model.add(Activation("relu", name='activation_1'))
model.add(MaxPooling1D(pool_size=2, name='max_pooling1d_1'))

model.add(Conv1D(64, 3, padding='same', name='conv1d_2'))
#model.add(LeeOscillatorReLULayer())
model.add(Activation("relu", name='activation_2'))
model.add(MaxPooling1D(pool_size=2, name='max_pooling1d_2'))

model.add(Flatten(name='flatten_1'))
model.add(Dense(128, name='dense_1'))
#model.add(Activation("relu", name='activation_3'))
model.add(LeeOscillatorReLULayer())
model.add(Dropout(0.2, name='dropout_1'))
model.add(Dense(50, name='dense_2'))
#model.add(LeeOscillatorReLULayer())
model.add(Activation("relu", name='activation_4'))
model.add(Dropout(0.2, name='dropout_2'))
model.add(Dense(46, name='dense_3'))  # Output layer with 2 units (binary classification)
model.add(Activation("softmax", name='activation_5'))

model.compile(loss=keras.losses.categorical_crossentropy,
              optimizer=keras.optimizers.Adadelta(),
              metrics=['accuracy'])

# Train the model
model.fit(X_train, oh_y_train,
          batch_size=64,
          epochs=20,
          verbose=1,
          validation_data=(X_test, oh_y_test))

# Evaluate the model
scores = model.evaluate(X_train, oh_y_train)
print("Training Set:", "\n%s: %.2f%%" % (model.metrics_names[1], scores[1]*100))

scores = model.evaluate(X_test, oh_y_test)
print("Test Set:", "\n%s: %.2f%%" % (model.metrics_names[1], scores[1]*100))


Train on 8982 samples, validate on 2246 samples
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Training Set: 
accuracy: 85.34%
Test Set: 
accuracy: 70.70%


In [6]:
# Decode the sequences back to words
word_index = reuters.get_word_index()
reverse_word_index = {v: k for k, v in word_index.items()}
decoded_X_train = [' '.join([reverse_word_index.get(i - 3, '?') for i in seq]) for seq in X_train]
decoded_X_test = [' '.join([reverse_word_index.get(i - 3, '?') for i in seq]) for seq in X_test]

In [7]:
# Vectorize the text data using TF-IDF
vectorizer = TfidfVectorizer(max_features=max_features)
knn_X_train = vectorizer.fit_transform(decoded_X_train)
knn_X_test = vectorizer.transform(decoded_X_test)

In [8]:
# Train final k-NN
knn_clf = KNeighborsClassifier(n_neighbors=1, algorithm="brute")
knn_clf.fit(knn_X_train, y_train)

KNeighborsClassifier(algorithm='brute', n_neighbors=1)

In [9]:
# Check the accuracy on this particular split to make sure that it is not too far removed from k-fold.
knn_predictions_test = knn_clf.predict(knn_X_test)
print("k-NN Accuracy Test:", accuracy_score(y_test, knn_predictions_test))

k-NN Accuracy Test: 0.7537845057880677


In [11]:
# Check confusion matrix kNN
confusion_matrix(y_test, knn_predictions_test, labels=None, sample_weight=None)

array([[ 8,  1,  0, ...,  0,  0,  0],
       [ 0, 77,  1, ...,  0,  0,  0],
       [ 0,  1, 13, ...,  0,  0,  0],
       ...,
       [ 0,  0,  0, ...,  6,  0,  0],
       [ 0,  0,  0, ...,  0,  4,  0],
       [ 0,  0,  0, ...,  0,  0,  1]], dtype=int64)

In [12]:
# Check confusion matrix NN
confusion_matrix(y_test, model.predict_classes(X_test), labels=None, sample_weight=None)

array([[ 5,  1,  0, ...,  0,  0,  0],
       [ 0, 74,  0, ...,  0,  0,  0],
       [ 0,  7,  4, ...,  0,  0,  0],
       ...,
       [ 0,  0,  0, ...,  0,  0,  0],
       [ 0,  0,  3, ...,  0,  0,  0],
       [ 0,  0,  0, ...,  0,  0,  0]], dtype=int64)

In [13]:
# Save the CBR model to disk
pickle.dump(knn_clf, open('k-nn_model.sav', 'wb'))

In [14]:
# Save Keras Models to disk
model.save("NN.h5")

In [15]:
# Save Dataframes
np.save("X_train", X_train)
np.save("X_test", X_test)
np.save("y_train", y_train)
np.save("y_test", y_test)

np.save("knn_X_train", knn_X_train)
np.save("knn_X_test", knn_X_test)

np.save("oh_y_train", oh_y_train)
np.save("oh_y_test", oh_y_test)