# CNN XGB

Author: Tiankai Yan <br>
Date: 11/07/2024

In [1]:
# system
import os
import pickle
import gzip

# data manipulation
import pandas as pd
import numpy as np

# deep learning
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt
from tensorflow.keras import metrics


import xgboost as xgb
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import numpy as np




## Import Dataset

In [None]:
# Load the gzip-compressed, pickled dataset into `data`.
# NOTE(review): `file_path` is not defined in any visible cell; it must be set
# before this cell runs, otherwise a fresh-kernel run fails with NameError.
# NOTE(security): pickle.load can execute arbitrary code while deserializing;
# only open files from a trusted source.
with gzip.open(file_path, 'rb') as f:
    data = pickle.load(f)

## Train-test Split

In [3]:
# Assemble the feature tensor and label vector, then hold out 20% as the dev set.
X = np.stack(data['upstream_region_encoded'].values)
Y = data['DE'].values
X_train, X_dev, Y_train, Y_dev = train_test_split(
    X, Y,
    test_size=0.2,
    random_state=123,
    shuffle=True,
)

# One row per example with every remaining axis flattened (consumed by the XGBoost cells).
X_train_flat, X_dev_flat = (
    split.reshape(len(split), -1) for split in (X_train, X_dev)
)

CNN PART

In [13]:
# Append a trailing channel axis so the arrays match the Conv2D input shape
# (2000, 4, 1) used by the models below.
# The ndim guard keeps this cell idempotent: re-running it would otherwise
# append a second trailing axis and break every downstream shape.
if X_train.ndim == 3:
    X_train = np.expand_dims(X_train, axis=-1)
if X_dev.ndim == 3:
    X_dev = np.expand_dims(X_dev, axis=-1)

Hyperparameters:

In [15]:
# Shuffle-buffer size passed to Dataset.shuffle() when building the input pipelines.
BUFFER_SIZE = 50000
# Number of examples per mini-batch.
BATCH_SIZE = 256
# Number of training epochs passed to model.fit() (an epoch count, despite the name).
EPOCH_SIZE = 10

Shuffle the training set and split both the training and dev sets into mini-batches of 256 examples:

In [16]:
# Training pipeline: shuffle, batch, and prefetch.
train = (
    tf.data.Dataset.from_tensor_slices((X_train, Y_train))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

# Dev pipeline: same batching, but no shuffling.
dev = (
    tf.data.Dataset.from_tensor_slices((X_dev, Y_dev))
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

In [17]:
# Sanity check: print the shapes of the first training mini-batch.
for X_batch, Y_batch in train.take(1):
    print("X_batch:", X_batch.shape)
    print("Y_batch:", Y_batch.shape)

X_batch: (256, 2000, 4, 1)
Y_batch: (256,)


In [18]:
# Count mini-batches by making one full pass over the training pipeline.
num_batches = sum(1 for _ in train)
print("Number of mini-batches:", num_batches)

Number of mini-batches: 173


In [None]:
# Baseline CNN classifier: three Conv -> BatchNorm -> MaxPool stages (dropout
# after the 2nd and 3rd), global max pooling, then a dense head ending in a
# single sigmoid unit.
# NOTE(review): the Input layer pins the batch dimension to BATCH_SIZE; confirm
# that is intended, since the batched datasets do not drop a smaller final batch.
model = models.Sequential([
    layers.Input(shape=(2000, 4, 1), batch_size=BATCH_SIZE),

    # Stage 1
    layers.Conv2D(32, (2, 2), activation='relu', padding='same'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2), padding='same'),

    # Stage 2
    layers.Conv2D(64, (2, 2), activation='relu', padding='same'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2), padding='same'),
    layers.Dropout(0.5),

    # Stage 3
    layers.Conv2D(128, (2, 2), activation='relu', padding='same'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2), padding='same'),
    layers.Dropout(0.5),

    # Classification head
    layers.GlobalMaxPooling2D(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(1, activation='sigmoid'),
])





In [43]:
import tensorflow_addons as tfa
from tensorflow.keras.optimizers import Adam

# Compile the baseline CNN with Adam and the focal loss from TensorFlow Addons,
# tracking accuracy, AUC, precision and recall during training.
optimizer = Adam(learning_rate=0.0001)
focal_loss = tfa.losses.SigmoidFocalCrossEntropy()
tracked_metrics = [
    'accuracy',
    tf.keras.metrics.AUC(name='auc'),
    tf.keras.metrics.Precision(name='precision'),
    tf.keras.metrics.Recall(name='recall'),
]

model.compile(optimizer=optimizer, loss=focal_loss, metrics=tracked_metrics)

In [None]:
# Train on the batched training pipeline and validate on the dev pipeline
# after every epoch; the returned History object is kept in `history`.
history = model.fit(train, epochs=EPOCH_SIZE, validation_data=dev)

CNN+XGB

In [None]:
import tensorflow_addons as tfa

# CNN used as a feature extractor for the downstream classifier.
# Five Conv -> BatchNorm stages (pooling after the first four), global max
# pooling, a 512-unit dense "feature" layer with dropout, and a sigmoid head.
cnn_model = models.Sequential([
    layers.Input(shape=(2000, 4, 1)),
    layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2), padding='same'),
    layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 2), padding='same'),
    layers.Conv2D(128, (3, 3), activation='relu', padding='same'),
    layers.BatchNormalization(),
    # The two (2, 2) 'same' pools above already reduced the width 4 -> 2 -> 1,
    # so from here on only the sequence axis is pooled.
    layers.MaxPooling2D((2, 1), padding='same'),
    layers.Conv2D(256, (3, 3), activation='relu', padding='same'),
    layers.BatchNormalization(),
    layers.MaxPooling2D((2, 1), padding='same'),
    layers.Conv2D(512, (3, 3), activation='relu', padding='same'),
    layers.BatchNormalization(),
    layers.GlobalMaxPooling2D(),
    layers.Dense(512, activation='relu'),
    layers.Dropout(0.3),
    # Binary classification head. Without it the focal loss and the accuracy
    # metric were computed on 512 unbounded ReLU activations instead of a
    # probability, so the network was never trained on the actual task.
    # With this head, cnn_model.layers[-2] is the Dropout layer, which is the
    # identity at inference time, so the feature-extraction cell below still
    # receives the 512-d dense features.
    layers.Dense(1, activation='sigmoid'),
])

cnn_model.compile(
    optimizer=tfa.optimizers.AdamW(learning_rate=1e-4, weight_decay=1e-5),
    loss=tfa.losses.SigmoidFocalCrossEntropy(),
    metrics=['accuracy'],
)
cnn_model.fit(X_train, Y_train, epochs=3, batch_size=64, validation_data=(X_dev, Y_dev))


In [None]:
# Sub-model that maps the CNN input to the output of cnn_model's
# second-to-last layer; its predictions are used as learned features.
penultimate_output = cnn_model.layers[-2].output
feature_extractor = models.Model(inputs=cnn_model.input, outputs=penultimate_output)

# Extract features for the training split first, then the dev split.
X_train_features, X_dev_features = (
    feature_extractor.predict(split) for split in (X_train, X_dev)
)



In [None]:
from sklearn.linear_model import LogisticRegression

# Fit a logistic-regression classifier on the CNN-derived features.
classifier = LogisticRegression(max_iter=100)
classifier.fit(X_train_features, Y_train)

# Hard labels for the threshold metrics; positive-class probabilities for AUC.
Y_pred = classifier.predict(X_dev_features)
Y_pred_proba = classifier.predict_proba(X_dev_features)[:, 1]

accuracy, precision, recall, f1 = (
    scorer(Y_dev, Y_pred)
    for scorer in (accuracy_score, precision_score, recall_score, f1_score)
)
auc = roc_auc_score(Y_dev, Y_pred_proba)

print("\nEvaluation Metrics (Logistic Regression on CNN Features):")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"AUC: {auc:.4f}")

Hyperparameter Tuning

In [None]:
# Stage-wise hyperparameter search for XGBoost on the flattened inputs.
# Each stage cross-validates a small grid (3 folds, AUC-PR, early stopping),
# writes the winning values into `params`, and the next stage builds on them.

initial_num_boost_round = 100
dtrain = xgb.DMatrix(X_train_flat, label=Y_train)
ddev = xgb.DMatrix(X_dev_flat, label=Y_dev)

params = {
    'objective': 'binary:logistic',
    'learning_rate': 0.05,
    # Set dynamically from the data: twice the negative/positive count ratio.
    'scale_pos_weight': np.sum(Y_train == 0) / np.sum(Y_train == 1) * 2,
    'eval_metric': 'aucpr'
}


def _cv_best_aucpr(cv_params, dtrain, num_boost_round):
    """Run 3-fold xgb.cv and return the highest mean test AUC-PR it reports.

    Args:
        cv_params: XGBoost parameter dict to evaluate.
        dtrain: xgb.DMatrix holding the training data.
        num_boost_round: Maximum number of boosting rounds for this run.

    Returns:
        The maximum of the 'test-aucpr-mean' column of the CV results.
    """
    cv_results = xgb.cv(
        params=cv_params,
        dtrain=dtrain,
        num_boost_round=num_boost_round,
        nfold=3,
        metrics='aucpr',
        early_stopping_rounds=10,
        seed=42
    )
    return cv_results['test-aucpr-mean'].max()


def _search_stage(base_params, candidates, dtrain, num_boost_round):
    """Cross-validate each candidate override on top of `base_params`.

    Args:
        base_params: Parameter dict shared by every candidate (not mutated).
        candidates: Iterable of dicts with the parameter values to try, in order.
        dtrain: xgb.DMatrix holding the training data.
        num_boost_round: Maximum number of boosting rounds per CV run.

    Returns:
        (best_overrides, best_score). Ties keep the earliest candidate;
        best_overrides is an empty dict if no candidate beat -inf.
    """
    best_overrides, best_score = {}, float('-inf')
    for overrides in candidates:
        score = _cv_best_aucpr({**base_params, **overrides}, dtrain, num_boost_round)
        if score > best_score:
            best_overrides, best_score = overrides, score
    return best_overrides, best_score


# Stage 1: tree complexity.
max_depth_range = [3, 5, 7, 9]
min_child_weight_range = [1, 2, 3, 5]
best_stage, best_cv_score = _search_stage(
    params,
    [
        {'max_depth': max_depth, 'min_child_weight': min_child_weight}
        for max_depth in max_depth_range
        for min_child_weight in min_child_weight_range
    ],
    dtrain,
    initial_num_boost_round,
)
best_max_depth = best_stage.get('max_depth')
best_min_child_weight = best_stage.get('min_child_weight')
params['max_depth'] = best_max_depth
params['min_child_weight'] = best_min_child_weight

# Stage 2: minimum split loss.
gamma_range = [0, 0.1, 0.2, 0.3, 0.4]
best_stage, best_cv_score = _search_stage(
    params,
    [{'gamma': gamma} for gamma in gamma_range],
    dtrain,
    initial_num_boost_round,
)
best_gamma = best_stage.get('gamma')
params['gamma'] = best_gamma

# Stage 3: row and column subsampling.
subsample_range = [0.6, 0.8, 1.0]
colsample_bytree_range = [0.6, 0.8, 1.0]
best_stage, best_cv_score = _search_stage(
    params,
    [
        {'subsample': subsample, 'colsample_bytree': colsample_bytree}
        for subsample in subsample_range
        for colsample_bytree in colsample_bytree_range
    ],
    dtrain,
    initial_num_boost_round,
)
best_subsample = best_stage.get('subsample')
best_colsample_bytree = best_stage.get('colsample_bytree')
params['subsample'] = best_subsample
params['colsample_bytree'] = best_colsample_bytree

# Stage 4: L1 / L2 regularization.
reg_alpha_range = [0, 0.1, 0.5, 0.8, 1.0]
reg_lambda_range = [0.5, 1.0, 1.5, 2.0]
best_stage, best_cv_score = _search_stage(
    params,
    [
        {'reg_alpha': reg_alpha, 'reg_lambda': reg_lambda}
        for reg_alpha in reg_alpha_range
        for reg_lambda in reg_lambda_range
    ],
    dtrain,
    initial_num_boost_round,
)
best_reg_alpha = best_stage.get('reg_alpha')
best_reg_lambda = best_stage.get('reg_lambda')
params['reg_alpha'] = best_reg_alpha
params['reg_lambda'] = best_reg_lambda

# Stage 5: learning rate together with the boosting-round budget.
# num_boost_round is an argument of xgb.cv rather than a model parameter,
# so this stage varies it explicitly instead of going through _search_stage.
# NOTE(review): best_num_boost_round is recorded here, but the final training
# cell passes a hard-coded num_boost_round.
learning_rate_range = [0.001, 0.01, 0.1]
num_boost_round_range = [50, 100, 250]
best_learning_rate, best_num_boost_round = None, None
best_cv_score = float('-inf')

for learning_rate in learning_rate_range:
    for num_boost_round in num_boost_round_range:
        mean_aucpr = _cv_best_aucpr(
            {**params, 'learning_rate': learning_rate}, dtrain, num_boost_round
        )
        if mean_aucpr > best_cv_score:
            best_cv_score = mean_aucpr
            best_learning_rate = learning_rate
            best_num_boost_round = num_boost_round

params['learning_rate'] = best_learning_rate

In [None]:
# Final XGBoost configuration. Every value except scale_pos_weight is written
# out as a literal rather than read from the search variables above, so it
# stays fixed even if the search is re-run. The tuned values depend on the
# dataset and on how it was down-sampled.
params = dict(
    objective='binary:logistic',
    learning_rate=0.01,
    max_depth=7,
    min_child_weight=2,
    gamma=0.16666666666666666,
    subsample=0.8,
    colsample_bytree=0.6,
    reg_alpha=0.5,
    reg_lambda=1.0,
    # Still computed from the current training labels.
    scale_pos_weight=sum(Y_train == 0) / sum(Y_train == 1)*2,
    eval_metric='aucpr',
)

In [None]:
# Probability cut-off used to turn predicted scores into hard 0/1 labels.
DECISION_THRESHOLD = 0.6

# Train the final booster, monitoring AUC-PR on the dev set for early stopping.
# NOTE(review): the dev set drives early stopping AND is the set the metrics
# are reported on, so the reported numbers are likely optimistic.
evals = [(ddev, 'eval')]
best_xgb_model = xgb.train(
    params=params,
    dtrain=dtrain,
    num_boost_round=100,
    evals=evals,
    early_stopping_rounds=10,
    verbose_eval=True
)

# xgb.train returns the booster from the last round, not the best one, so
# restrict prediction to the trees up to and including the best iteration.
Y_pred_proba = best_xgb_model.predict(
    ddev,
    iteration_range=(0, best_xgb_model.best_iteration + 1),
)
Y_pred = (Y_pred_proba > DECISION_THRESHOLD).astype(int)

In [None]:
# Score the dev-set predictions: threshold metrics use the hard labels,
# AUC uses the predicted probabilities.
accuracy, precision, recall, f1 = (
    scorer(Y_dev, Y_pred)
    for scorer in (accuracy_score, precision_score, recall_score, f1_score)
)
auc = roc_auc_score(Y_dev, Y_pred_proba)

print("\nEvaluation Metrics:")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"AUC: {auc:.4f}")

"""
Evaluation Metrics:
Accuracy: 0.6786
Precision: 0.6809
Recall: 0.7273
F1 Score: 0.7033
AUC: 0.7955
"""



Evaluation Metrics:
Accuracy: 0.6786
Precision: 0.6809
Recall: 0.7273
F1 Score: 0.7033
AUC: 0.7955


GAN (also covered in other code files)

In [None]:
def make_generator_model():
    """Build the GAN generator.

    The network takes a (2000, 4, 1) input and applies three stride-1,
    'same'-padded transposed convolutions (128 -> 64 -> 1 filters), with
    batch normalization and ReLU after the first two and a tanh activation
    on the last, so the output has the same spatial shape as the input.

    Returns:
        An uncompiled Keras Sequential model.
    """
    # Settings shared by every transposed-convolution layer.
    deconv_kwargs = dict(strides=(1, 1), padding='same', use_bias=False)

    return models.Sequential([
        # Input shape should match the real data shape.
        layers.Input(shape=(2000, 4, 1)),

        layers.Conv2DTranspose(128, (2, 2), **deconv_kwargs),
        layers.BatchNormalization(),
        layers.ReLU(),

        layers.Conv2DTranspose(64, (2, 2), **deconv_kwargs),
        layers.BatchNormalization(),
        layers.ReLU(),

        layers.Conv2DTranspose(1, (2, 2), activation='tanh', **deconv_kwargs),
    ])

def make_discriminator_model():
    """Build the GAN discriminator.

    Two stride-2 convolutions (64 then 128 filters), each followed by
    LeakyReLU(0.2) and 30% dropout, then global max pooling and a single
    linear output unit.

    Returns:
        An uncompiled Keras Sequential model whose output is an unbounded
        real/fake score (no sigmoid is applied).
    """
    # Settings shared by both down-sampling convolutions.
    conv_kwargs = dict(strides=(2, 2), padding='same')

    return models.Sequential([
        layers.Input(shape=(2000, 4, 1)),

        layers.Conv2D(64, (2, 2), **conv_kwargs),
        layers.LeakyReLU(alpha=0.2),
        layers.Dropout(0.3),

        layers.Conv2D(128, (2, 2), **conv_kwargs),
        layers.LeakyReLU(alpha=0.2),
        layers.Dropout(0.3),

        layers.GlobalMaxPooling2D(),
        # Single score per example (real vs. fake).
        layers.Dense(1),
    ])

# Instantiate both networks.
generator = make_generator_model()
discriminator = make_discriminator_model()

# One Adam optimizer per network, both with a 1e-4 learning rate.
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
generator_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

# from_logits=True because the discriminator's last layer has no activation.
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def discriminator_loss(real_output, fake_output):
    """Return the discriminator loss for one batch.

    Args:
        real_output: Discriminator scores for real examples.
        fake_output: Discriminator scores for generated examples.

    Returns:
        Cross-entropy of the real scores against all-ones targets plus
        cross-entropy of the fake scores against all-zeros targets.
    """
    loss_on_real = cross_entropy(tf.ones_like(real_output), real_output)
    loss_on_fake = cross_entropy(tf.zeros_like(fake_output), fake_output)
    return loss_on_real + loss_on_fake

def generator_loss(fake_output):
    """Return the non-saturating generator loss for one batch.

    Args:
        fake_output: Discriminator scores for generated examples.

    Returns:
        Cross-entropy of the fake scores against all-ones targets.
    """
    wants_real = tf.ones_like(fake_output)
    return cross_entropy(wants_real, fake_output)

In [None]:
# Training step

# Shuffled, batched pipeline over the training split for the GAN loop.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((X_train, Y_train))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)

@tf.function
def train_step(real_data, real_labels):
    """Run one simultaneous generator/discriminator update on a batch.

    Args:
        real_data: Batch of real examples fed to the discriminator.
        real_labels: Class labels for the batch. Unused; kept so the function
            can be called with the (data, label) pairs the dataset yields.

    Returns:
        (gen_loss, disc_loss) for this step.
    """
    # Draw noise with the same shape as the real batch (the generator's input
    # shape equals the data shape). Sizing it from the batch, rather than from
    # BATCH_SIZE, keeps real and fake counts equal on a smaller final batch.
    noise = tf.random.normal(tf.shape(real_data))

    # Two tapes, because each network is differentiated against its own loss.
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_data = generator(noise, training=True)

        real_output = discriminator(real_data, training=True)
        fake_output = discriminator(generated_data, training=True)

        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

    return gen_loss, disc_loss

def train(dataset, epochs):
    """Run `train_step` over every batch of `dataset`, `epochs` times.

    Prints a completion line after each full pass.

    NOTE(review): this definition rebinds the name `train`, which an earlier
    cell assigns to the batched tf.data training pipeline; re-running that
    earlier model.fit cell after this one would pass a function instead.

    Args:
        dataset: Iterable yielding (data_batch, label_batch) pairs.
        epochs: Number of full passes over `dataset`.
    """
    for epoch in range(epochs):
        for batch in dataset:
            # Each element is a (data, labels) pair.
            train_step(*batch)
        print(f'Epoch {epoch + 1}/{epochs} completed')

train(train_dataset, 100) # 100 epochs

# Sample one example from the trained generator. The generator's input layer
# is declared with shape (2000, 4, 1), so the noise must be (1, 2000, 4, 1);
# the previous [1, 100] vector is rejected by the model's input check.
noise = tf.random.normal([1, 2000, 4, 1])
generated_data = generator(noise, training=False)

print('Generated Data:', np.squeeze(generated_data))
