In [None]:
import sqlite3
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.utils import Sequence
from tensorflow.keras.models import Sequential
from tensorflow.keras import layers, models
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout



# Kết nối database SQLite
DB_PATH = "C:/Users/hoang/Data_Train_2025/database_labeled_all.db"
TABLE_NAME = "one_hot_encoded_data"
BATCH_SIZE = 64
TRAIN_RATIO = 0.8 

# conn = sqlite3.connect(DB_PATH)
# cursor = conn.cursor()
# query = "select * from  limit 5;"

# df1= pd.read_sql_query(query, conn)
# print (df1)

#Dùng generator
class SQLiteDataGenerator(Sequence):
    def __init__(self, db_path, table_name, batch_size=32, mode="train"):
        self.db_path = db_path
        self.table_name = table_name
        self.batch_size = batch_size
        self.mode = mode

        # Lấy số lượng mẫu và chia train/test
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}")
        self.total_samples = cursor.fetchone()[0]
        self.train_samples = int(self.total_samples * TRAIN_RATIO)
        conn.close()

    def __len__(self):
        if self.mode == "train":
            return int(np.ceil(self.train_samples / self.batch_size))
        else:
            return int(np.ceil((self.total_samples - self.train_samples) / self.batch_size))

    def __getitem__(self, index):
        offset = index * self.batch_size
        limit = self.batch_size

        if self.mode == "train":
            query = f"SELECT * FROM {self.table_name} LIMIT {limit} OFFSET {offset}"
        else:
            query = f"SELECT * FROM {self.table_name} LIMIT {limit} OFFSET {self.train_samples + offset}"

        conn = sqlite3.connect(self.db_path)
        batch_df = pd.read_sql_query(query, conn)
        conn.close()

        # Chuẩn bị dữ liệu
        X = batch_df.drop(columns=["label"]).values.reshape(-1, 46, 1)
        y = tf.keras.utils.to_categorical(batch_df["label"], num_classes=10)  # One-hot encode labels
        return X, y


In [None]:
train_generator = SQLiteDataGenerator(DB_PATH, TABLE_NAME, batch_size=BATCH_SIZE, mode="train")
test_generator = SQLiteDataGenerator(DB_PATH, TABLE_NAME, batch_size=BATCH_SIZE, mode="test")

from tensorflow import keras
# Định nghĩa mô hình CNN

model = keras.Sequential([
    layers.Conv1D(filters=32, kernel_size=3, padding="same",activation="relu",input_shape = (46, 1)),
    layers.MaxPooling1D(pool_size=2),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0,5),
    layers.BatchNormalization(),
    layers.Dense(10, activation='softmax')
])

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.fit(train_generator, validation_data=test_generator, epochs=10, batch_size=64)

# Lưu mô hình
model.save("cnn_model.h5")

Epoch 1/10
   2451/1165872 [..............................] - ETA: 961:50:18 - loss: 0.7396 - accuracy: 0.7605

In [None]:

# #Chia dữ liệu thành tập huấn luyện và tập kiểm tra, ví dụ chia tỉ lệ 80-20
# X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.2, random_state=42)

# # Ở đây:
# # - X là ma trận hoặc DataFrame chứa các đặc trưng/hồ sơ
# # - y là mảng hoặc Series chứa nhãn hoặc biến mục tiêu
# # - test_size là tỷ lệ dữ liệu sẽ được chia thành tập kiểm tra (ở đây là 20%)
# # - random_state là seed để tạo số ngẫu nhiên, giúp kết quả có thể tái tạo được

# # In ra kích thước của các tập dữ liệu
# print("Kích thước tập huấn luyện X:", X_train.shape)
# print("Kích thước tập kiểm tra X:", X_test.shape)
# print("Kích thước tập huấn luyện y:", y_train.shape)
# print("Kích thước tập kiểm tra y:", y_test.shape)


# import tensorflow as tf
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout

# # Số lượng lớp (dựa vào y_encoded)
# num_classes = y_encoded.shape[1]  

# # Xây dựng mô hình CNN cho dữ liệu số
# model = Sequential([
#     Conv1D(32, kernel_size=3, activation='relu', input_shape=(X_train.shape[1], 1)),  
#     MaxPooling1D(pool_size=2),
#     Conv1D(64, kernel_size=3, activation='relu'),
#     MaxPooling1D(pool_size=2),
#     Flatten(),
#     Dense(128, activation='relu'),
#     Dropout(0.5),
#     Dense(num_classes, activation='softmax')
# ])

# # Compile model
# model.compile(optimizer='adam', 
#               loss='categorical_crossentropy',  
#               metrics=['accuracy'])

# # Hiển thị mô hình
# model.summary()


model.fit(X_train, y_train, epochs=10, batch_size=32, validation_split=0.2)


pred = model.predict(X_test)
y_pred= np.argmax(pred, axis = 1)
# y_pred = np.array(y_pred)

# num_classes = len(label_mapping)
# y_pred = to_categorical(y_pred, num_classes)

print(y_pred)
y_test_to_argmax = np.argmax(y_test, axis=1)
from sklearn.metrics import confusion_matrix,accuracy_score
from sklearn.metrics import (precision_score, recall_score,
                             f1_score, accuracy_score,mean_squared_error,mean_absolute_error)
confusion_matrix(y_test_to_argmax, y_pred)


accuracy =accuracy_score(y_test_to_argmax, y_pred)*100
print(accuracy)

recall = recall_score(y_test_to_argmax, y_pred , average="weighted")
precision = precision_score(y_test_to_argmax, y_pred , average="weighted")
f1 = f1_score(y_test_to_argmax, y_pred, average="weighted")

print("accuracy")
print("%.3f" %accuracy)
print("racall")
print("%.3f" %recall)
print("precision")
print("%.3f" %precision)
print("f1score")
print(f1)