In [None]:
import cv2
import os
import numpy as np
import pandas as pd
from scipy.fftpack import dct  # For Discrete Cosine Transform

# Input folders, one per spectrogram representation. Each is scanned by
# extract_dct_features() for image files.
mfcc_folder = "../Covid_19 Project/mfcc"
chroma_folder = "../Covid_19 Project/chroma"
mel_folder = "../Covid_19 Project/mel"

# Output CSV files (relative paths, so they are written to the current working directory)
mfcc_csv = "mfcc.csv"
chroma_csv = "chroma.csv"
mel_csv = "mel.csv"

# Every image is resized to this size before the DCT so that all feature
# vectors come from transforms of identical dimensions.
fixed_size = (128, 128)  # passed to cv2.resize as its target size

# Number of leading values of the flattened DCT kept per image
# (also the number of coef_* columns in each CSV).
num_coefficients = 40  # Adjust based on your needs


def extract_dct_features(folder, fixed_size, num_coefficients):
    """Extract 2D-DCT coefficients from every image found in ``folder``.

    Each ``.png``/``.jpg``/``.jpeg`` file (extension matched case-insensitively)
    is read as grayscale, resized to ``fixed_size``, transformed with an
    orthonormal 2D DCT, flattened in row-major order and truncated to its
    first ``num_coefficients`` values.

    Files are visited in sorted name order, so repeated runs (and runs over
    sibling folders that hold identically named files) produce rows in the
    same order. Files that OpenCV cannot decode are skipped with a printed
    warning instead of aborting the whole run.

    Args:
        folder: Directory containing the images.
        fixed_size: Target size handed to ``cv2.resize``.
        num_coefficients: Number of leading flattened DCT values to keep.

    Returns:
        ``(data, filenames)`` where ``data`` is a 2-D array with one row per
        processed image (shape ``(0, num_coefficients)`` when no image was
        processed) and ``filenames`` lists the file names in row order.
    """
    image_extensions = ('.png', '.jpg', '.jpeg')
    data = []
    filenames = []
    # sorted(): os.listdir returns entries in arbitrary order
    for filename in sorted(os.listdir(folder)):
        file_path = os.path.join(folder, filename)
        if not (os.path.isfile(file_path) and filename.lower().endswith(image_extensions)):
            continue
        # Read image as grayscale; imread returns None when decoding fails
        img = cv2.imread(file_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            print(f"Warning: could not read image, skipping: {file_path}")
            continue
        # Resize to the common size and make the float conversion explicit
        img_resized = cv2.resize(img, fixed_size).astype(np.float64)
        # 2D DCT = 1D DCT along one axis, then along the other
        img_dct = dct(dct(img_resized.T, norm='ortho').T, norm='ortho')
        # NOTE(review): this keeps the first values in row-major order (i.e. it
        # starts with the first row of the DCT matrix), not a zig-zag scan of
        # the low-frequency corner -- confirm this is the intended selection.
        data.append(img_dct.flatten()[:num_coefficients])
        filenames.append(filename)  # Keep the filename aligned with its row
    if not data:
        # Keep the result 2-D so it can still be framed with num_coefficients columns
        return np.empty((0, num_coefficients), dtype=np.float64), filenames
    return np.array(data), filenames


# Run the DCT extraction once per spectrogram folder
mfcc_data, mfcc_filenames = extract_dct_features(
    folder=mfcc_folder, fixed_size=fixed_size, num_coefficients=num_coefficients)
chroma_data, chroma_filenames = extract_dct_features(
    folder=chroma_folder, fixed_size=fixed_size, num_coefficients=num_coefficients)
mel_data, mel_filenames = extract_dct_features(
    folder=mel_folder, fixed_size=fixed_size, num_coefficients=num_coefficients)

# Column headers shared by all three CSV files: coef_1 ... coef_<num_coefficients>
coef_columns = [f"coef_{i+1}" for i in range(num_coefficients)]

# Build and write one table per representation, filename first, then the coefficients
saved_frames = {}
for feature_name, features, names, csv_path in (
    ("mfcc", mfcc_data, mfcc_filenames, mfcc_csv),
    ("chroma", chroma_data, chroma_filenames, chroma_csv),
    ("mel", mel_data, mel_filenames, mel_csv),
):
    frame = pd.DataFrame(features, columns=coef_columns)
    frame.insert(0, "filename", names)
    frame.to_csv(csv_path, index=False)
    saved_frames[feature_name] = frame

mfcc_df = saved_frames["mfcc"]
chroma_df = saved_frames["chroma"]
mel_df = saved_frames["mel"]

print("Coefficient data with filenames saved to CSV files successfully!")

In [None]:
from PIL import Image

# Inspect the pixel dimensions of one sample mel image.
# The context manager closes the underlying file as soon as the size has been read.
with Image.open('../Covid_19 Project/mel/0b2f75d7-f116-4f35-ae4c-f2018eab2794.png') as image:
    # PIL reports size as (width, height); the channel count is not part of it
    width, height = image.size

# Print the size
print(f"Width: {width}, Height: {height}")

In [None]:
import pandas as pd

# Step 1: Read feature files
mel_mean_feature = pd.read_csv('../Covid_19 Project/mel.csv')
mfcc_mean_feature = pd.read_csv('../Covid_19 Project/mfcc.csv')
chroma_mean_feature = pd.read_csv('../Covid_19 Project/chroma.csv')

# The three tables are joined purely by row position below, so they must describe
# the same samples in the same order. Refuse to continue on a row-count mismatch
# and warn when the first (identifier) columns disagree.
feature_tables = (mfcc_mean_feature, chroma_mean_feature, mel_mean_feature)
row_counts = [len(table) for table in feature_tables]
if len(set(row_counts)) != 1:
    raise ValueError(
        f"Feature files have different numbers of rows (mfcc, chroma, mel): {row_counts}"
    )
id_columns = [table.iloc[:, 0].tolist() for table in feature_tables]
if not (id_columns[0] == id_columns[1] == id_columns[2]):
    print("WARNING: the first columns of mfcc/chroma/mel differ; "
          "rows may not refer to the same samples.")

# Step 2: Concatenate the feature columns side by side.
# The first column of every file is dropped (presumably the filename/identifier
# column -- verify against the CSVs). For mel.csv the last column is dropped as
# well, because Step 4 treats it as the label.
# Prefixes keep column names unique when the files share names such as coef_1.
data2 = pd.concat([
    mfcc_mean_feature.iloc[:, 1:].add_prefix('mfcc_'),
    chroma_mean_feature.iloc[:, 1:].add_prefix('chroma_'),
    mel_mean_feature.iloc[:, 1:-1].add_prefix('mel_'),
], axis=1)

# Step 3: Print the shape and preview of the concatenated features
print(f"Data shape after concatenation: {data2.shape}")
print(f"First 4 rows of data:\n {data2.head(4)}")

# Step 4: Take the labels from mel.csv (already loaded above; no second read needed)
metadata_df = mel_mean_feature

# The label column is assumed to be the last column
labels = metadata_df.iloc[:, -1]

# Convert labels to binary format ('healthy' -> 0, 'COVID-19' -> 1)
labels_binary = labels.map({'healthy': 0, 'COVID-19': 1})

# Series.map turns every value missing from the dict into NaN; stop here with a
# clear message rather than writing NaN labels to disk.
unmapped_values = labels[labels_binary.isna()].unique()
if len(unmapped_values) > 0:
    raise ValueError(
        "Last column of mel.csv contains values that are neither 'healthy' nor "
        f"'COVID-19' (showing up to 5): {list(unmapped_values[:5])}"
    )
labels_binary = labels_binary.astype(int)

# Step 5: Add binary labels to the concatenated data
data2['label'] = labels_binary.values

# Step 6: Save the combined data with binary labels
output_file = '../Covid_19 Project/combined_features_with_binary_labels.csv'
data2.to_csv(output_file, index=False)

# Final confirmation
print(f"Combined data with binary labels has been saved to {output_file}.")
print(f"Final data shape: {data2.shape}")


In [None]:
from sklearn.preprocessing import LabelEncoder, StandardScaler,MinMaxScaler,scale
from sklearn.model_selection import GridSearchCV, train_test_split, RepeatedStratifiedKFold, cross_val_score, KFold,StratifiedKFold 
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score,roc_curve,roc_auc_score, auc
from sklearn.decomposition import PCA
from sklearn import metrics
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC


# The last column of data2 holds the class label
genre_list = data2.iloc[:, -1]

# Label encoding (not one-hot): each distinct class value becomes an integer id
encoder = LabelEncoder()
y = encoder.fit_transform(genre_list)

# Class balance; unpacking into two names requires exactly two classes
class_counts = np.bincount(y)
neg, pos = class_counts
total = neg + pos
positive_pct = 100 * pos/total
print ('positive: {} ({:.2f}% of total) \nnegative cases: {}'.format(pos, positive_pct, neg))

In [None]:
# Every column except the last one (the label) is a feature
raw_features = data2.iloc[:, :-1]

scaler = StandardScaler()
print ('X before scaling:\n', np.array(raw_features))

# Standardise the features; the label column is excluded from scaling.
# NOTE(review): the scaler is fitted on all rows of data2 here.
X = scaler.fit_transform(np.array(raw_features, dtype = float))
print ('\nX after scaling:\n',X,'\nX.shape', X.shape)

In [None]:
# Split the *unscaled* features first and fit the scaler on the training rows only,
# so that test-set statistics never influence the training data.
X_unscaled = np.array(data2.iloc[:, :-1], dtype=float)
X_train, X_test, y_train, y_test = train_test_split(X_unscaled, y, test_size=0.2, shuffle=True,
                                                    random_state=42,  # fixed seed: reproducible split
                                                    stratify=y)
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)
#print (y_test)
print (len(y_test))

In [None]:
# Sanity check: shape of the training features, number of input features
# (second dimension of X_train), and shape of the training labels.
print('X_train.shape:',X_train.shape)
print('\nX_train.shape[1]:',X_train.shape[1])
print ('\ny_train.shape:',y_train.shape)

In [None]:
#Keras
import keras
from keras import models
from keras import layers
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Flatten, Dense, MaxPool2D, Dropout
from tensorflow.keras.utils import to_categorical 


from imblearn.pipeline import Pipeline
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler

import xgboost as xgb
import joblib

import seaborn as sns

import warnings
warnings.filterwarnings('ignore')

def get_model(input_dim=None):
    """Build and compile the fully connected two-class classifier.

    The network is a stack of ReLU ``Dense`` layers (512, 256, 128, 64, 10
    units) with ``Dropout(0.2)`` after the 256- and 64-unit layers, followed
    by a 2-unit softmax output. It is compiled with Adam (learning rate
    0.001), sparse categorical cross-entropy and accuracy as the metric.

    Args:
        input_dim: Number of input features per sample. When omitted, the
            width of the notebook-level ``X_train`` is used, which keeps
            existing ``get_model()`` calls working unchanged.

    Returns:
        The compiled, untrained model.
    """
    if input_dim is None:
        input_dim = X_train.shape[1]

    model = models.Sequential()
    # Input layer (original note: the input has already been transposed)
    model.add(layers.Dense(512, activation='relu', input_shape=(input_dim,)))
    model.add(layers.Dense(256, activation='relu'))
    # Every layer is taken from the same `layers` module for consistency
    model.add(layers.Dropout(0.2))
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dropout(0.2))
    model.add(layers.Dense(10, activation='relu'))
    # Two softmax outputs, used together with integer class labels (sparse loss)
    model.add(layers.Dense(2, activation='softmax'))

    model.compile(optimizer= keras.optimizers.Adam(learning_rate=0.001),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

    return model
# This model showed signs of overfitting, hence the dropout layers

# Build the model and print its layer-by-layer summary
model = get_model()
model.summary()

In [None]:
batch_size = 16
early_stopping_patience = 10

# Callbacks used during training:
#  * ModelCheckpoint writes ./model_<epoch>.h5 at the end of an epoch, but only
#    when the monitored metric improved (save_best_only=True).
#    The former `period=10` argument was dropped: it is a deprecated alias that
#    conflicts with `save_freq='epoch'` and is rejected by newer Keras releases.
#  * EarlyStopping stops once val_loss has not improved for
#    `early_stopping_patience` epochs and restores the best weights seen.
my_callbacks = [
    tf.keras.callbacks.ModelCheckpoint(filepath='./model_{epoch:02d}.h5',
                                       save_freq='epoch',
                                       save_best_only=True),
    tf.keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=early_stopping_patience, restore_best_weights=True
    )
]

# Train on the original (non-oversampled) training split; validation_split makes
# Keras hold out 15% of it and supplies the val_loss the callbacks monitor.
history = model.fit(X_train, y_train,
                    epochs=100,
                    batch_size=batch_size,
                    callbacks = my_callbacks,
                    validation_split=0.15)

In [None]:
import librosa
import librosa, librosa.display, os, csv
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import pyplot
%matplotlib inline
import os
from PIL import Image
import pathlib
import csv

def history_loss_acc(history,name):
    """Plot per-epoch accuracy and loss curves of one training run.

    One figure is shown per metric. The training curve is always drawn; the
    validation curve is drawn only when the run recorded it, so histories
    produced without validation data can be plotted too.

    Args:
        history: Object returned by ``model.fit``; its ``history`` dict must
            contain ``'accuracy'`` and ``'loss'`` and may contain
            ``'val_accuracy'`` / ``'val_loss'``.
        name: Suffix appended to the figure titles.
    """
    recorded = history.history
    # list all data in history
    print(recorded.keys())

    for metric in ('accuracy', 'loss'):
        plt.plot(recorded[metric])
        legend_labels = ['train']
        val_key = 'val_' + metric
        if val_key in recorded:
            plt.plot(recorded[val_key])
            # These values come from the validation data used in fit, not from a test set
            legend_labels.append('validation')
        plt.title('model ' + metric + '_' + name)
        plt.ylabel(metric)
        plt.xlabel('epoch')
        plt.legend(legend_labels, loc='upper left')
        plt.show()

# Learning curves of the first training run (original, non-oversampled data)
history_loss_acc(history, 'Original data')

In [None]:
# Persist the trained model in two parts.
# 1) Architecture only, serialized to JSON
model_json = model.to_json()
with open("model.json", "w") as json_file:
    json_file.write(model_json)
# 2) Weights only, serialized to HDF5
# NOTE(review): newer Keras releases expect weight files to end in ".weights.h5" --
# confirm against the installed version.
model.save_weights("model.h5")
print("Saved model to disk")

In [None]:
# Loss and accuracy of the current model on the held-out test split
test_loss, test_acc = model.evaluate(X_test,y_test) 

In [None]:
# Report the test accuracy computed by model.evaluate
print('test_acc: ',test_acc)

In [None]:
# Per-class scores for every test sample (one row per sample)
predictions = model.predict(X_test)

# Predicted class = index of the highest score in each row
y_predict = [np.argmax(row) for row in predictions]

In [None]:
def evaluate_matrix(y_test, y_predict, name):
    """Show the confusion-matrix heatmap and print the classification report.

    Class ids are pinned to ``[0, 1]`` so the matrix is always 2x2 and the
    "Negative"/"Positive" names always line up, even when one of the classes
    is absent from both ``y_test`` and ``y_predict``.

    Args:
        y_test: True class ids (0 = negative, 1 = positive).
        y_predict: Predicted class ids, same length as ``y_test``.
        name: Title of the heatmap.
    """
    class_ids = [0, 1]
    class_names = ["Negative", "Positive"]

    cm = confusion_matrix(y_test, y_predict, labels=class_ids)
    cm_df = pd.DataFrame(cm, index=class_names, columns=class_names)

    plt.figure(figsize=(10, 10))

    sns.set(font_scale=1)

    ax = sns.heatmap(cm_df, annot=True, square=True, fmt='d', linewidths=.2, cbar=False, cmap=plt.cm.Blues)
    ax.set_xticklabels(ax.get_xticklabels(), rotation=0)

    plt.ylabel("True labels")
    plt.xlabel("Predicted labels")
    plt.title(name)
    # Lay out only after the title exists so it is taken into account
    plt.tight_layout()

    plt.show()

    print(classification_report(y_test, y_predict, labels=class_ids, target_names=class_names))
    
# Confusion matrix and classification report for the first model's test predictions
evaluate_matrix(y_test, y_predict, 'Original model')

In [None]:
# summarize score
#print(predictions[:,1], '\n',predictions[:,1].shape )
def ROC_curve(y_test,predictions,name):
    """Print the AUC and plot the ROC curve of the positive class.

    Args:
        y_test: True binary labels.
        predictions: 2-D score array; column 1 is used as the positive-class score.
        name: Model name used in the printed line and as the plot title.
    """
    # ROC points from the positive-class scores, and the area under them
    fpr, tpr, _ = roc_curve(y_test, predictions[:,1])
    area = auc(fpr, tpr)
    print ('model: {} \nAUC = {}'. format(name, area))

    line_width = 2
    # Model curve
    plt.plot(fpr, tpr, color="darkorange",
             lw=line_width, label="ROC curve (area = %0.2f)" % area)
    # Chance-level diagonal for reference
    plt.plot([0, 1], [0, 1], color="navy", lw=line_width, linestyle="--")
    plt.xlim([-0.02, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(name)
    plt.legend()
    plt.show()

# ROC curve of the first model on the test split
ROC_curve(y_test,predictions,'Original data')

In [None]:
# `pos` and `total` were counted on the full label vector, before the train/test split
print ('original positive cases: {}  and  total cases: {}'.format(pos, total) )

# Oversample the minority class of the training split with SMOTE.
# sampling_strategy=0.8 asks for a minority/majority ratio of 0.8 after
# resampling (the minority class ends up at 80% of the majority size).
# A fixed random_state makes the synthetic samples reproducible.
oversample = SMOTE(sampling_strategy=0.8, k_neighbors=5, random_state=42)
X_os, y_os = oversample.fit_resample(X_train, y_train)

# Shuffle the resampled rows (seeded) so that later position-based slices,
# such as Keras' validation_split, are not dominated by one kind of row.
order = np.random.default_rng(42).permutation(len(y_os))
X_os = X_os[order]
y_os = y_os[order]

neg_os, pos_os = np.bincount(y_os)
total_os = neg_os + pos_os
print ('\nAfter oversampling \nnegative cases: {}  \npositive cases: {} ({:.2f}% of total)'.format(neg_os, pos_os, 100 * pos_os/total_os ))

In [None]:
# Train on the SMOTE-oversampled training data, holding out 15% for validation.
# NOTE(review): `model` is not re-created here, so this call continues training
# whatever model the `model` variable currently holds -- confirm that a fresh
# get_model() was not intended for a clean comparison with the first run.
history_os = model.fit(X_os, y_os,
                    epochs=100,
                    batch_size=batch_size,
                    callbacks = my_callbacks,
                    validation_split=0.15)

In [None]:
# Learning curves of both training runs, one after the other, for comparison
history_loss_acc(history,'Original Data')
history_loss_acc(history_os,'Oversampling Data')

In [None]:
# Test-set loss/accuracy after training on the oversampled data
test_loss_os, test_acc_os = model.evaluate(X_test,y_test)
print ('test_acc_os',test_acc_os)

# Per-class scores on the test set, reduced to the highest-scoring class per row
predictions_os = model.predict(X_test)
y_predict_os = [np.argmax(row) for row in predictions_os]

In [None]:
# Confusion matrices and reports: first run vs. run on oversampled data
evaluate_matrix(y_test, y_predict,'Original data')
evaluate_matrix(y_test, y_predict_os, 'Oversampling Data')

In [None]:
# ROC curves: first run vs. run on oversampled data
ROC_curve(y_test, predictions,'Original data')
ROC_curve(y_test, predictions_os, 'Oversampling Data')

In [None]:
accuracy_list = []
loss_list = []

# Stratified K-fold keeps the class ratio in every fold, which is what we want
# for binary classification with class imbalance. Seeded so folds are reproducible.
# NOTE(review): the folds are drawn from the already oversampled X_os/y_os, so
# synthetic samples can land in a validation fold -- the scores may be optimistic.
kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# K-fold cross-validation model evaluation
fold_idx = 1

for train_ids, val_ids in kfold.split(X_os, y_os):

    # Fresh, untrained model for every fold
    model = get_model()

    print("\nBắt đầu train Fold ", fold_idx)

    # Train the model on this fold's training part.
    # No validation data is passed to fit(), so callbacks that monitor val_loss
    # could never fire; they are therefore not attached here.
    model.fit(X_os[train_ids], y_os[train_ids],
              batch_size=16,
              epochs=25,
              verbose=1)

    # Evaluate on the held-out part of this fold and print the result
    scores = model.evaluate(X_os[val_ids], y_os[val_ids], verbose=0)
    print("Đã train xong Fold ", fold_idx)
    print(f'> Fold {fold_idx} - Loss: {scores[0]} - Accuracy: {100* scores[1]}%')

    # Record this fold's accuracy (in percent) and loss
    accuracy_list.append(scores[1] * 100)
    loss_list.append(scores[0])

    # To the next fold
    fold_idx = fold_idx + 1

In [None]:
# Print the overall results: first every fold, then the aggregate
print('* Chi tiết các fold')
for i, _ in enumerate(accuracy_list):
    print(f'> Fold {i+1} - Loss: {loss_list[i]} - Accuracy: {accuracy_list[i]}%')

# Mean accuracy with its standard deviation, and mean loss across folds
print('* Đánh giá tổng thể các folds:')
print(f'> Accuracy: {np.mean(accuracy_list)} (Độ lệch +- {np.std(accuracy_list)})')
print(f'> Loss: {np.mean(loss_list)}')

In [None]:
# Train the final model from scratch on the whole oversampled training set.
# A new model is built explicitly so the result does not depend on whatever
# model an earlier cell happened to leave in `model`.
# No validation data is passed, so the val_loss-based callbacks are not attached
# (they could never trigger) and training runs for the full 100 epochs.
model = get_model()
history_cv = model.fit(X_os, y_os,
          batch_size=16,
          epochs=100,
          verbose=1)
model.save('./kfold.h5')

# Reload the saved weights into a new model and predict on the test split;
# this also checks that the file written above is usable.
loaded_model = get_model()
loaded_model.load_weights('./kfold.h5')
predictions_cv = loaded_model.predict(X_test)

# Predicted class = index of the highest score in each row
y_predict_cv = [np.argmax(row) for row in predictions_cv]

In [None]:
# Side-by-side evaluation of the three sets of test predictions:
# first run, run on oversampled data, and the model trained after K-fold CV.
evaluate_matrix(y_test, y_predict,'Original data')
evaluate_matrix(y_test, y_predict_os, 'Oversampling Data')
evaluate_matrix(y_test, y_predict_cv, 'Oversmap & K_fold')
# ROC curve and AUC for the same three sets of predictions
ROC_curve(y_test, predictions,'Original data')
ROC_curve(y_test, predictions_os, 'Oversampling Data')
ROC_curve(y_test, predictions_cv, 'Oversamp & K_fold')

# Debug prints, left disabled:
#print('y_test ',y_test)
#print('y_predict_origin_data ',y_predict)
#print('y_predict_os ',y_predict_os)
#print('y_predict_cv ',y_predict_cv)