In [1]:
import numpy as np 
import math
import random as rd
from sklearn.model_selection import train_test_split
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint, LearningRateScheduler
def create_data(n_rows, n_cols,choice):
    x_sample=[]
    col=[]
    while len(x_sample)<n_rows:
        if choice:
            col=[rd.randint(-1000,1000) for _ in range(n_cols)]
        else:
            col=[rd.choice([0,1]) for _ in range(n_cols)]
        x_sample.append(col)
    x_sample= np.array(x_sample)
    return x_sample

x = create_data(1000, 42,True)
y= create_data(1000, 1,False)


In [2]:
x_train = x[:700]
x_test=x[700:]
y_train=y[:700]
y_test=y[700:]
print(x_train.shape,y_train.shape,x_test.shape,y_test.shape)

(700, 42) (700, 1) (300, 42) (300, 1)


In [3]:
from keras.layers import Input, Conv1D, BatchNormalization, Activation, Dropout, Reshape, GlobalAveragePooling1D
from keras.models import Model
from keras.layers import Layer
from keras import backend as K


class DepthwiseConv1D(Layer):
    def __init__(self, kernel_size, depth_multiplier=1, strides=1, padding='valid', **kwargs):
        super(DepthwiseConv1D, self).__init__(**kwargs)
        self.kernel_size = kernel_size
        self.depth_multiplier = depth_multiplier
        self.strides = strides
        self.padding = padding

    def build(self, input_shape):
        if len(input_shape) != 3:
            raise ValueError('Input shape should be a tuple of 3 dimensions.')
        input_dim = input_shape[2]
        depthwise_kernel_shape = (self.kernel_size, input_dim, self.depth_multiplier)

        self.depthwise_kernel = self.add_weight(shape=depthwise_kernel_shape,
                                                initializer='glorot_uniform',
                                                name='depthwise_kernel')

        self.built = True

    def compute_output_shape(self, input_shape):
        length = input_shape[1]
        if self.padding == 'valid':
            length -= self.kernel_size - 1
        length = (length + self.strides - 1) // self.strides

        return (input_shape[0], length, input_shape[2] * self.depth_multiplier)

def relu6(x):
    return K.relu(x, max_value=6)

def preprocess_input(x):
    x /= 255.
    x -= 0.5
    x *= 2.
    return x

def MobileNet(input_shape=None, alpha=1.0, depth_multiplier=1, dropout=1e-3, classes=1):
    input_shape = (42,1)
    x_input = Input(shape=input_shape)
    print(x_input.shape)
    x = _conv_block(x_input, 32, alpha, strides=2)
    x = _depthwise_conv_block(x, 64, alpha, depth_multiplier, block_id=1)

    x = _depthwise_conv_block(x, 128, alpha, depth_multiplier, strides=2, block_id=2)
    x = _depthwise_conv_block(x, 128, alpha, depth_multiplier, block_id=3)

    x = _depthwise_conv_block(x, 256, alpha, depth_multiplier, strides=2, block_id=4)
    x = _depthwise_conv_block(x, 256, alpha, depth_multiplier, block_id=5)

    x = _depthwise_conv_block(x, 512, alpha, depth_multiplier, strides=2, block_id=6)
    x = _depthwise_conv_block(x, 512, alpha, depth_multiplier, block_id=7)
    x = _depthwise_conv_block(x, 512, alpha, depth_multiplier, block_id=8)
    x = _depthwise_conv_block(x, 512, alpha, depth_multiplier, block_id=9)
    x = _depthwise_conv_block(x, 512, alpha, depth_multiplier, block_id=10)
    x = _depthwise_conv_block(x, 512, alpha, depth_multiplier, block_id=11)

    x = _depthwise_conv_block(x, 1024, alpha, depth_multiplier, strides=2, block_id=12)
    x = _depthwise_conv_block(x, 1024, alpha, depth_multiplier, block_id=13)

    x = GlobalAveragePooling1D()(x)
    x = Dropout(dropout, name='dropout')(x)
    x = Reshape((1, 1024))(x)
    print(x.shape)
    x = Conv1D(classes, 1, padding='same', name='conv_preds')(x)
    print(x.shape)
    x = Reshape((1,1), name='reshape_2')(x)
    x = Activation('softmax', name='act_softmax')(x)
    model = Model(inputs=x_input, outputs=x, name='mobilenet')
    return model

def _conv_block(inputs, filters, alpha, kernel=3, strides=1):
    channel_axis = 1
    filters = int(filters * alpha)
    x = Conv1D(filters, kernel,
               padding='same',
               use_bias=False,
               strides=strides,
               name='conv1')(inputs)
    x = BatchNormalization(axis=channel_axis, name='conv1_bn')(x)
    return Activation('relu', name='conv1_relu')(x)

def _depthwise_conv_block(inputs, pointwise_conv_filters, alpha,
                          depth_multiplier=1, strides=1, block_id=1):
    channel_axis = 1
    pointwise_conv_filters = int(pointwise_conv_filters * alpha)

    x = DepthwiseConv1D(3,
                        padding='same',
                        depth_multiplier=depth_multiplier,
                        strides=strides,
                        name='conv_dw_%d' % block_id)(inputs)
    x = BatchNormalization(axis=channel_axis, name='conv_dw_%d_bn' % block_id)(x)
    x = Activation(relu6, name='conv_dw_%d_relu' % block_id)(x)

    x = Conv1D(pointwise_conv_filters, 1,
               padding='same',
               use_bias=False,
               strides=1,
               name='conv_pw_%d' % block_id)(x)
    x = BatchNormalization(axis=channel_axis, name='conv_pw_%d_bn' % block_id)(x)
    return Activation('relu', name='conv_pw_%d_relu' % block_id)(x)

model = MobileNet()
lr=0.0001
model.compile(optimizer=Adam(lr=lr), loss='binary_crossentropy', metrics=['accuracy'])
model.summary()


(None, 42, 1)
(None, 1, 1024)
(None, 1, 1)
Model: "mobilenet"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 42, 1)]           0         
                                                                 
 conv1 (Conv1D)              (None, 21, 32)            96        
                                                                 
 conv1_bn (BatchNormalizatio  (None, 21, 32)           84        
 n)                                                              
                                                                 
 conv1_relu (Activation)     (None, 21, 32)            0         
                                                                 
 conv_dw_1 (DepthwiseConv1D)  (None, 21, 32)           96        
                                                                 
 conv_dw_1_bn (BatchNormaliz  (None, 21, 32)           84        
 ation)       

 conv_dw_9 (DepthwiseConv1D)  (None, 21, 512)          1536      
                                                                 
 conv_dw_9_bn (BatchNormaliz  (None, 21, 512)          84        
 ation)                                                          
                                                                 
 conv_dw_9_relu (Activation)  (None, 21, 512)          0         
                                                                 
 conv_pw_9 (Conv1D)          (None, 21, 512)           262144    
                                                                 
 conv_pw_9_bn (BatchNormaliz  (None, 21, 512)          84        
 ation)                                                          
                                                                 
 conv_pw_9_relu (Activation)  (None, 21, 512)          0         
                                                                 
 conv_dw_10 (DepthwiseConv1D  (None, 21, 512)          1536      
 )        

  super().__init__(name, **kwargs)


In [4]:
earlystopping = EarlyStopping(monitor = 'val_loss', mode = 'min', verbose = 1, patience = 30)

early_stopping = EarlyStopping(monitor='val_accuracy',
    min_delta=0.00005,
    patience=11,
    verbose=1,
    restore_best_weights=True,
)

lr_scheduler = ReduceLROnPlateau(
    monitor='val_accuracy',
    factor=0.5,
    patience=30,
    min_lr=0.000001,
    verbose=1,
)
from tensorflow.keras.callbacks import Callback
from sklearn.metrics import confusion_matrix

class ConfusionMatrixCallback(Callback):
    def __init__(self, validation_data):
        self.validation_data = validation_data
        self.relist={}
        self.acc=[]

    def on_epoch_end(self, epoch, logs=None):
        X_val, y_val = self.validation_data
        y_pred = self.model.predict(X_val)
        y_pred_binary = (y_pred > 0.5).astype(int)

        cm = confusion_matrix(y_val, y_pred_binary)
        print(f"Confusion Matrix after Epoch {epoch + 1}:\n{cm}")
        print(classification_report(y_test,y_pred_binary))
        acc = float(classification_report(y_test,y_pred_binary).split('\n')[3].split()[2])
        self.acc.append(acc)
        self.relist[acc]=y_pred_binary

# ... Định nghĩa mô hình ...

# Tạo đối tượng callback
confusion_matrix_callback = ConfusionMatrixCallback(validation_data=(x_test,y_test))

callbacks = [early_stopping,lr_scheduler]

checkpointer = ModelCheckpoint(filepath = "Emotion_weights.hdf5", verbose = 1, save_best_only=True)

history = model.fit(x=x_train, y=y_train)



