In [None]:
import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.metrics import (
    accuracy_score,
    auc,
    average_precision_score,
    balanced_accuracy_score,
    classification_report,
    confusion_matrix,
    precision_recall_curve,
    roc_auc_score,
    roc_curve,
)
from sklearn.model_selection import (
    cross_val_predict,
    cross_val_score,
    train_test_split,
    KFold,
)
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer, RobustScaler, StandardScaler

In [None]:
# Notebook configuration.
# The project root defaults to the original author's checkout, but it can be
# overridden through the ECE597_PROJECT_ROOT environment variable so the
# notebook also runs on machines where that absolute path does not exist.
project_root = os.environ.get(
    "ECE597_PROJECT_ROOT",
    "/Users/feiyixie/Projects/Summer-2024-ECE-597-Group8",
)
# Input CSV read by the data-loading cell.
data_path = os.path.join(project_root, "data", "processed", "features_bow_labels.csv")
# Destination the trained Keras model is saved to and later reloaded from.
model_path = os.path.join(project_root, 'data', 'models', 'best_cnn_model_bow.keras')
# Seed passed to train_test_split.
random_state = 42
# Fraction of rows held out as the test set.
test_size = 0.1

In [None]:
# Read the CSV at `data_path` and discard its "js_code" column in a single
# chained expression.
df = pd.read_csv(data_path).drop(columns=["js_code"])

In [None]:
def log_transform(x):
    """Return ``log(1 + x)`` evaluated element-wise with ``numpy.log1p``.

    Args:
        x: Any input accepted by ``numpy.log1p`` (scalar or array-like).

    Returns:
        The ``numpy.log1p`` result for ``x``.
    """
    transformed = np.log1p(x)
    return transformed

In [None]:
def _log_then_scale(scaler_step_name, scaler):
    """Build a two-step pipeline: ``log_transform`` followed by ``scaler``.

    Args:
        scaler_step_name: Name given to the scaling step inside the pipeline.
        scaler: The scaler instance used as the second step.

    Returns:
        A ``Pipeline`` with steps ``"log_transform"`` and ``scaler_step_name``.
    """
    return Pipeline(
        steps=[
            ("log_transform", FunctionTransformer(log_transform)),
            (scaler_step_name, scaler),
        ]
    )


pipeline_log_transform_StandardScaler = _log_then_scale(
    "StandardScaler", StandardScaler()
)
pipeline_log_transform_RobustScaler = _log_then_scale(
    "RobustScaler", RobustScaler()
)

# Columns routed through the log + RobustScaler pipeline.
robust_scaled_columns = [
    "Word_Count",
    "Homoglyphs",
    "Total_Abnormal_Count",
    "html_tags",
]
# Every column whose name starts with "BoW_" goes through log + StandardScaler.
bow_columns = [name for name in df.columns if name.startswith('BoW_')]

# Columns not listed in either group are passed through unchanged.
features_preprocessor = ColumnTransformer(
    transformers=[
        ("features_processer", pipeline_log_transform_RobustScaler, robust_scaled_columns),
        ("bow_processer", pipeline_log_transform_StandardScaler, bow_columns),
    ],
    remainder="passthrough",
)

In [None]:
# Separate the raw feature columns from the target. `X` holds the *untransformed*
# features here; the preprocessor is fitted further down, on the training rows only.
X = df.drop(columns=["Label"])
y = df["Label"].to_numpy()

# Split BEFORE fitting any preprocessing. Row assignment depends only on the
# number of rows, `test_size` and `random_state`, so the train/test membership
# is the same as when splitting an already-transformed matrix.
X_train_raw, X_test_raw, y_train, y_test = train_test_split(
    X, y, test_size=test_size, random_state=random_state
)

# Fit the scalers on the training rows only, then apply the fitted transform to
# the held-out rows. Fitting on the full dataset would let test-set statistics
# leak into the features the model is trained on.
X_train = features_preprocessor.fit_transform(X_train_raw)
X_test = features_preprocessor.transform(X_test_raw)

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, Input
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score
import optuna
from tensorflow.keras import backend as K

# Append a trailing length-1 axis so the training matrix has the
# (rows, columns, 1) layout the Conv1D model is built for.
X_train_reshaped = np.expand_dims(X_train, axis=-1)

# Define a custom F1-score metric
def f1_score_metric(y_true, y_pred):
    """Batch-wise F1 score for use as a Keras metric.

    Predictions are rounded to 0/1, then true positives, false positives and
    false negatives are summed over the batch axis (axis 0). Precision, recall
    and F1 are computed per output column and the mean F1 is returned.
    ``K.epsilon()`` is added to every denominator to avoid division by zero.

    Args:
        y_true: Ground-truth labels.
        y_pred: Model outputs (probabilities); rounded to the nearest integer.

    Returns:
        A scalar tensor holding the mean F1 over output columns.
    """
    y_pred = K.round(K.cast(y_pred, 'float32'))
    # Give the labels the same shape as the predictions. Without this, labels
    # arriving as (batch,) would broadcast against (batch, 1) predictions into
    # a (batch, batch) product and corrupt the counts below.
    # NOTE(review): whether Keras hands labels over as (batch,) or (batch, 1)
    # depends on the Keras version — the reshape is a no-op when they match.
    y_true = K.reshape(K.cast(y_true, 'float32'), K.shape(y_pred))

    tp = K.sum(y_true * y_pred, axis=0)
    fp = K.sum((1 - y_true) * y_pred, axis=0)
    fn = K.sum(y_true * (1 - y_pred), axis=0)

    precision = tp / (tp + fp + K.epsilon())
    recall = tp / (tp + fn + K.epsilon())

    f1 = 2 * precision * recall / (precision + recall + K.epsilon())
    return K.mean(f1)

# Define the model creation function
def create_model(trial):
    """Build and compile a 1-D CNN whose hyperparameters come from ``trial``.

    The network is: input -> Conv1D -> MaxPooling1D -> Conv1D -> MaxPooling1D
    -> Flatten -> Dense(relu) -> Dropout -> Dense(1, sigmoid). The input length
    is taken from the module-level ``X_train_reshaped``.

    Args:
        trial: Object exposing Optuna's ``suggest_int`` / ``suggest_float`` /
            ``suggest_categorical`` methods; it supplies the filter counts,
            kernel sizes, pool sizes, dense width, dropout rate and optimizer.

    Returns:
        The compiled ``Sequential`` model (binary cross-entropy loss, tracked
        with ``f1_score_metric``).
    """
    network = Sequential()

    # Layers are created in the same order as they are stacked, so the
    # hyperparameters are requested from the trial in a fixed sequence.
    layer_stack = [
        Input(shape=(X_train_reshaped.shape[1], 1)),
        Conv1D(
            filters=trial.suggest_int('filters', 16, 128),
            kernel_size=trial.suggest_int('kernel_size', 2, 5),
            activation='relu',
        ),
        MaxPooling1D(pool_size=trial.suggest_int('pool_size', 2, 4)),
        Conv1D(
            filters=trial.suggest_int('filters2', 16, 128),
            kernel_size=trial.suggest_int('kernel_size2', 2, 5),
            activation='relu',
        ),
        MaxPooling1D(pool_size=trial.suggest_int('pool_size2', 2, 4)),
        Flatten(),
        Dense(units=trial.suggest_int('units', 32, 256), activation='relu'),
        Dropout(rate=trial.suggest_float('dropout_rate', 0.2, 0.5)),
        Dense(1, activation='sigmoid'),
    ]
    for layer in layer_stack:
        network.add(layer)

    chosen_optimizer = trial.suggest_categorical('optimizer', ['adam', 'rmsprop'])
    network.compile(
        optimizer=chosen_optimizer,
        loss='binary_crossentropy',
        metrics=[f1_score_metric],
    )
    return network

# Define the objective function for Optuna
def objective(trial):
    """Score one Optuna trial as the mean F1 over 3 stratified CV folds.

    For every fold a fresh model is built with ``create_model(trial)``, trained
    with early stopping on the fold's validation loss, and evaluated with
    scikit-learn's ``f1_score`` (positive label 1) on predictions thresholded
    at 0.5.

    Args:
        trial: The Optuna trial supplying the hyperparameters.

    Returns:
        The mean of the per-fold F1 scores.
    """
    # Training-loop hyperparameters are per-trial, not per-fold, so request
    # them once up front instead of on every fold iteration.
    epochs = trial.suggest_int('epochs', 10, 50)
    batch_size = trial.suggest_int('batch_size', 32, 256)

    # Use the notebook-wide seed rather than a separate hard-coded literal.
    skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=random_state)
    f1_scores = []

    for train_index, valid_index in skf.split(X_train_reshaped, y_train):
        X_t, X_v = X_train_reshaped[train_index], X_train_reshaped[valid_index]
        y_t, y_v = y_train[train_index], y_train[valid_index]

        model = create_model(trial)
        early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
        model.fit(X_t, y_t,
                  epochs=epochs,
                  batch_size=batch_size,
                  validation_data=(X_v, y_v),
                  callbacks=[early_stopping],
                  verbose=0)

        # verbose=0: otherwise every fold of every trial prints a progress bar.
        # ravel(): the model emits one column per sample; flatten it so the
        # predictions have the same 1-D layout as the labels.
        y_pred = (model.predict(X_v, verbose=0) > 0.5).astype("int32").ravel()
        f1_scores.append(f1_score(y_v, y_pred, pos_label=1))

    return np.mean(f1_scores)

# Create an Optuna study and optimize the objective function.
# The sampler is seeded with the notebook-wide `random_state` so hyperparameter
# proposals are tied to the same seed as the data split.
# NOTE(review): with n_jobs > 1 trials run concurrently and finish in a
# non-deterministic order, so runs are still not exactly repeatable; set
# n_jobs=1 if strict reproducibility is required.
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=random_state),
)
study.optimize(objective, n_trials=50, n_jobs=6)

# Print the best parameters
print('Best trial:')
trial = study.best_trial
print(f'Value: {trial.value}')
print('Params:')
for key, value in trial.params.items():
    print(f'    {key}: {value}')

In [None]:
# Ensure the directory exists
os.makedirs(os.path.dirname(model_path), exist_ok=True)

# Hyperparameters of the best trial found by the Optuna study.
best_params = study.best_trial.params

# Rebuild the network through the same `create_model` used during tuning
# instead of a hand-copied duplicate of the architecture. A FixedTrial replays
# the stored values for every `suggest_*` call, so the tuned and final
# architectures cannot drift apart.
model = create_model(optuna.trial.FixedTrial(best_params))

# Every tuning run was trained with this early-stopping rule, so `epochs` was
# only ever an upper bound. Apply the same rule here so the final model is
# trained under the regime that produced the best score.
early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

# Fit on the training set; Keras holds out 20% of it (validation_split) to
# compute the validation loss monitored by early stopping.
history = model.fit(X_train_reshaped, y_train,
                    epochs=best_params['epochs'],
                    batch_size=best_params['batch_size'],
                    validation_split=0.2,
                    callbacks=[early_stopping])

# Save the trained model to a file
model.save(model_path)
print(f"Model saved to {model_path}")

In [None]:
# Give the test matrix the same trailing length-1 axis as the training data.
X_test_reshaped = np.expand_dims(X_test, axis=-1)

# Reload the model from disk; the custom metric has to be registered by name
# for deserialization.
model = tf.keras.models.load_model(
    model_path,
    custom_objects=dict(f1_score_metric=f1_score_metric),
)
print(f"Model loaded from {model_path}")

# Predict probabilities for the test rows and threshold them at 0.5.
y_pred_probs = model.predict(X_test_reshaped)
y_pred = np.greater(y_pred_probs, 0.5).astype(int)

# Build and show the per-class precision / recall / F1 summary.
report = classification_report(
    y_test,
    y_pred,
    target_names=['Class 0', 'Class 1'],
)
print("Classification Report:\n", report, sep="\n")