In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from imblearn.combine import SMOTEENN
from imblearn.combine import SMOTETomek
from imblearn.over_sampling import SMOTE
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, matthews_corrcoef, roc_auc_score
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.layers import Add
from tensorflow.keras.layers import Conv1D, Activation, Multiply

In [None]:
# Raw churn table, fetched over HTTP from the ChurnNet GitHub repository.
CHURN_DATA_URL = "https://raw.githubusercontent.com/mahidul5130/ChurnNet_Deep_Learning_Enhanced_Customer_Churn-Prediction_in_Telecommunication_Industry/refs/heads/main/Churn-data-UCI%20Dataset(5000).csv"
df_raw = pd.read_csv(CHURN_DATA_URL)

In [None]:
# --- Experiment configuration ---------------------------------------------

# Encoding applied to the two plan columns.
# Options: "Label Encoder" | "One-hot Encoder"
Encoder = "Label Encoder"

# Resampling applied to the training data inside each fold.
# Options: "" (none) | "SMOTE" | "SMOTE-Tomek" | "SMOTE-Enn"
# Plain SMOTE is being tried here in place of SMOTE-Enn.
OverSamplingTecnique = "SMOTE"

# Convolution hyper-parameters handed to kfold_fixed.
filter_size = 5
number_of_filter = 128

# Handed to kfold_fixed; the code that would read it there is commented out,
# so it currently has no effect on the model.
flatten_layer_exist = True

# Channel-attention variant: "SE Block" selects ChannelAttention, any other
# value (e.g. "Basic Channel Attention") selects Basic_ChannelAttention.
Model_Name = "SE Block"

**Label Encoding**

In [None]:
if Encoder == "Label Encoder":
    print("Applying Label Encoder")

    # Work on a copy so df_raw stays untouched for the other encoder path.
    df_final = df_raw.copy()
    le = LabelEncoder()

    text_data_features = ['internationalplan', 'voicemailplan']

    print('Label Encoder Transformation')
    for column in text_data_features:
        # The same encoder object is re-fitted for every column.
        df_final[column] = le.fit_transform(df_final[column])
        codes = df_final[column].unique()
        # Show each integer code next to the original value it stands for.
        print(column, ' : ', codes, ' = ', le.inverse_transform(codes))

    # Features are every column except the target; the target is cast to int.
    X = df_final.drop(columns=['churn']).copy()
    Y = df_final['churn'].copy().astype(int)

Applying Label Encoder
Label Encoder Transformation
internationalplan  :  [0 1]  =  [False  True]
voicemailplan  :  [1 0]  =  [ True False]


**One-hot Encoding**

In [None]:
if Encoder == "One-hot Encoder":
    print("Applying One-hot Encoder")

    categorical_columns = ['internationalplan', 'voicemailplan']

    # Expand the categorical columns into dense indicator columns.
    encoder = OneHotEncoder()
    encoded_features = encoder.fit_transform(df_raw[categorical_columns]).toarray()

    # Every remaining column except the target is passed through unchanged.
    numerical_features = df_raw.drop(columns=categorical_columns + ['churn'])

    # Indicator columns first, then the remaining features, all as float.
    X = np.hstack((encoded_features, numerical_features)).astype(float)

    # Target vector as an integer NumPy array.
    Y = df_raw['churn'].values.astype(int)

**Squeeze-and-Excitation**

In [None]:
# Define the channel attention layer
class ChannelAttention(tf.keras.layers.Layer):
    """Squeeze-and-excitation style channel gate for (batch, steps, channels) inputs.

    The input is averaged over axis 1, passed through a ReLU bottleneck
    ``Dense`` layer and a sigmoid ``Dense`` layer with one unit per channel,
    and the resulting per-channel weights rescale the input.

    Args:
        reduction_ratio: Divisor applied to the channel count to size the
            bottleneck layer.
        **kwargs: Standard ``tf.keras.layers.Layer`` arguments (``name``,
            ``dtype``, ``trainable``, ...). Accepting them is what lets
            ``from_config`` rebuild the layer when a saved model is reloaded.
    """

    def __init__(self, reduction_ratio=8, **kwargs):
        super().__init__(**kwargs)
        self.reduction_ratio = reduction_ratio

    def build(self, input_shape):
        channels = input_shape[-1]
        # Floor division yields 0 when channels < reduction_ratio, and a Dense
        # layer with zero units is invalid, so keep at least one unit.
        bottleneck_units = max(1, channels // self.reduction_ratio)
        self.fc = tf.keras.layers.Dense(bottleneck_units, activation='relu')
        self.attention = tf.keras.layers.Dense(channels, activation='sigmoid')

    def call(self, inputs):
        x = tf.reduce_mean(inputs, axis=[1])  # Global average pooling across time dimension
        x = self.fc(x)
        x = self.attention(x)
        x = tf.expand_dims(x, axis=1)  # Re-insert axis 1 so the weights broadcast over it
        return inputs * x

    def get_config(self):
        """Return the layer config, including ``reduction_ratio``, for serialisation."""
        config = super().get_config()
        config.update({"reduction_ratio": self.reduction_ratio})
        return config

# Define the spatial attention layer
class SpatialAttention(tf.keras.layers.Layer):
    """Position-wise attention gate for (batch, steps, channels) inputs.

    The input is max-pooled and average-pooled along the step axis (window 3,
    stride 1, ``same`` padding, so the length is kept). The two results are
    concatenated on the last axis and reduced by a single-filter sigmoid
    ``Conv1D`` to one weight per step, which then rescales the input.

    Args:
        **kwargs: Standard ``tf.keras.layers.Layer`` arguments (``name``,
            ``dtype``, ``trainable``, ...). The base ``get_config`` emits
            these, so ``__init__`` must accept them for ``from_config`` to
            work when a saved model is reloaded.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.max_pool = tf.keras.layers.MaxPooling1D(pool_size=3, strides=1, padding='same')
        self.avg_pool = tf.keras.layers.AveragePooling1D(pool_size=3, strides=1, padding='same')
        self.concat = tf.keras.layers.Concatenate(axis=-1)
        self.conv1d = tf.keras.layers.Conv1D(filters=1, kernel_size=3, padding='same', activation='sigmoid')

    def call(self, inputs):
        max_pool_out = self.max_pool(inputs)
        avg_pool_out = self.avg_pool(inputs)
        concat_out = self.concat([max_pool_out, avg_pool_out])
        # One sigmoid weight per step; the trailing size-1 axis broadcasts
        # across all channels in the multiplication below.
        attention_weights = self.conv1d(concat_out)
        return inputs * attention_weights


# Define the residual block
def residual_block(x, filters, kernel_size):
    """Apply two ReLU ``Conv1D`` layers and add a skip connection.

    Args:
        x: Keras tensor fed to the block (the ``Conv1D`` layers expect
            (batch, steps, channels)).
        filters: Number of filters in both convolutions, and therefore the
            channel count of the output.
        kernel_size: Kernel width of both convolutions. ``same`` padding
            keeps the step axis length unchanged.

    Returns:
        The Keras tensor ``relu(conv(conv(x)) + shortcut)``.
    """
    # Keep a handle on the block input for the skip connection.
    shortcut = x

    # Two stacked convolutions form the residual branch.
    x = tf.keras.layers.Conv1D(filters, kernel_size, activation='relu', padding='same')(x)
    x = tf.keras.layers.Conv1D(filters, kernel_size, activation='relu', padding='same')(x)

    # Add() needs both branches to have the same channel count. When the input
    # has a different number of channels, project it with a 1x1 convolution;
    # when the counts already match, the input is used as-is.
    if shortcut.shape[-1] != filters:
        shortcut = tf.keras.layers.Conv1D(filters, 1, padding='same')(shortcut)

    # Merge the residual branch with the shortcut, then apply the final ReLU.
    x = Add()([x, shortcut])
    x = tf.keras.layers.Activation('relu')(x)

    return x

**Basic Channel Attention**

In [None]:
# Define the channel attention layer for 1D data
class Basic_ChannelAttention(tf.keras.layers.Layer):
    """Channel attention built from average- and max-pooled descriptors.

    The input is reduced over axis 1 twice (mean and max). Both descriptors
    go through the same two 1x1 ``Conv1D`` layers, the results are summed and
    squashed with a sigmoid, and the resulting per-channel weights rescale
    the input.

    Args:
        ratio: Divisor applied to the channel count to size the bottleneck
            convolution.
        **kwargs: Standard ``tf.keras.layers.Layer`` arguments (``name``,
            ``dtype``, ``trainable``, ...). Accepting them is what lets
            ``from_config`` rebuild the layer when a saved model is reloaded.
    """

    def __init__(self, ratio=8, **kwargs):
        super().__init__(**kwargs)
        self.ratio = ratio

    def build(self, input_shape):
        channels = input_shape[-1]
        # Floor division yields 0 when channels < ratio, and a convolution
        # with zero filters is invalid, so keep at least one filter.
        bottleneck_filters = max(1, channels // self.ratio)
        self.shared_layer1 = Conv1D(bottleneck_filters, kernel_size=1, activation='relu', padding='same')
        self.shared_layer2 = Conv1D(channels, kernel_size=1, padding='same')

    def call(self, inputs):
        # keepdims=True leaves a length-1 step axis so the 1x1 convolutions
        # can be applied and the result broadcasts over the input steps.
        avg_descriptor = tf.reduce_mean(inputs, axis=1, keepdims=True)
        avg_descriptor = self.shared_layer2(self.shared_layer1(avg_descriptor))

        max_descriptor = tf.reduce_max(inputs, axis=1, keepdims=True)
        max_descriptor = self.shared_layer2(self.shared_layer1(max_descriptor))

        # Plain tensor ops replace the Activation/Multiply layer objects that
        # were previously instantiated on every call.
        attention = tf.sigmoid(avg_descriptor + max_descriptor)
        return inputs * attention

    def get_config(self):
        """Return the layer config, including ``ratio``, for serialisation."""
        config = super().get_config()
        config.update({"ratio": self.ratio})
        return config

In [None]:
def _make_sampler(OverSamplingTecnique):
    """Return the imblearn resampler named by ``OverSamplingTecnique``, or None if unrecognised."""
    sampler_classes = {
        "SMOTE": SMOTE,
        "SMOTE-Tomek": SMOTETomek,
        "SMOTE-Enn": SMOTEENN,
    }
    sampler_cls = sampler_classes.get(OverSamplingTecnique)
    if sampler_cls is None:
        return None
    return sampler_cls(random_state=42)


def _build_attention_cnn(n_features, filter_size, number_of_filter, Model_Name):
    """Build and compile the two-stage Conv1D + residual + attention binary classifier."""

    def channel_gate():
        # "SE Block" selects ChannelAttention; anything else the basic variant.
        if Model_Name == "SE Block":
            return ChannelAttention()
        return Basic_ChannelAttention()

    inputs = tf.keras.Input(shape=(n_features, 1))

    # Stage 1: default (valid) padding, so this convolution shortens the step axis.
    x = tf.keras.layers.Conv1D(filters=number_of_filter, kernel_size=filter_size, activation='relu')(inputs)
    x = residual_block(x, number_of_filter, filter_size)
    x = channel_gate()(x)
    x = SpatialAttention()(x)

    # Stage 2: 'same' padding keeps the step axis length.
    x = tf.keras.layers.Conv1D(filters=number_of_filter, kernel_size=filter_size, activation='relu', padding='same')(x)
    x = residual_block(x, number_of_filter, filter_size)
    x = channel_gate()(x)
    x = SpatialAttention()(x)

    # Classification head: global average pooling is always used.
    x = tf.keras.layers.GlobalAveragePooling1D()(x)
    x = tf.keras.layers.Dropout(0.5)(x)
    x = tf.keras.layers.Dense(number_of_filter, activation='relu')(x)
    outputs = tf.keras.layers.Dense(1, activation='sigmoid')(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer='ADAM', loss='binary_crossentropy', metrics=['accuracy'])
    return model


def kfold_fixed(X, Y, filter_size, number_of_filter, flatten_layer_exist, Model_Name, OverSamplingTecnique):
    """Run 10-fold stratified cross-validation of the attention CNN and print the scores.

    For every fold the scaler is fitted on the training part only, a
    stratified 20% validation set is held out of the training part, the
    remaining training rows are optionally resampled, and a fresh model is
    trained with early stopping and evaluated on the untouched test part.

    Args:
        X: 2-D feature matrix (array-like, one row per sample).
        Y: 1-D binary target (array-like).
        filter_size: Kernel width for every Conv1D in the model.
        number_of_filter: Filter count for every Conv1D and the width of the
            hidden Dense layer.
        flatten_layer_exist: Accepted for backward compatibility and ignored;
            the model always uses GlobalAveragePooling1D.
        Model_Name: "SE Block" selects ChannelAttention, any other value
            selects Basic_ChannelAttention.
        OverSamplingTecnique: "SMOTE", "SMOTE-Tomek" or "SMOTE-Enn"; any other
            value (e.g. "") disables resampling.

    Returns:
        None. Per-fold accuracy/F1 and the fold averages of accuracy,
        precision, recall, F1, MCC and AUC-ROC are printed.
    """
    print("Applying K-fold (Leak-Free)")
    print(f"Applying {number_of_filter} filters of size {filter_size}")

    # Positional indexing below needs plain arrays; a DataFrame would treat
    # the index arrays as column labels.
    X = np.asarray(X)
    Y = np.asarray(Y)

    num_folds = 10

    accuracy_scores = []
    precision_scores = []
    recall_scores = []
    f1_scores = []
    mcc_scores = []
    auc_roc_scores = []

    skf = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=42)

    for fold_number, (train_index, test_index) in enumerate(skf.split(X, Y), start=1):
        print(f"\nFold {fold_number}/{num_folds}")

        # Drop Keras global state left by the previous fold's model so memory
        # does not grow across the 10 model builds.
        tf.keras.backend.clear_session()

        # -------------------------
        # 1. Split data
        # -------------------------
        X_train, X_test = X[train_index], X[test_index]
        Y_train, Y_test = Y[train_index], Y[test_index]

        # -------------------------
        # 2. Fit the scaler on training data only (no test leakage)
        # -------------------------
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)

        # -------------------------
        # 3. Hold out validation data BEFORE resampling
        # -------------------------
        # Keras' validation_split takes the *last* rows of the arrays, and
        # SMOTE returns the original rows followed by the synthetic minority
        # rows. Using validation_split after resampling therefore validated
        # (and early-stopped) almost only on synthetic positives. An explicit
        # stratified hold-out of real rows avoids that.
        X_fit, X_val, Y_fit, Y_val = train_test_split(
            X_train,
            Y_train,
            test_size=0.2,
            stratify=Y_train,
            random_state=42,
        )

        # -------------------------
        # 4. Resample the fitting data only
        # -------------------------
        sampler = _make_sampler(OverSamplingTecnique)
        if sampler is not None:
            print(f"   Resampling training data with {OverSamplingTecnique}...")
            X_fit, Y_fit = sampler.fit_resample(X_fit, Y_fit)

        # -------------------------
        # 5. Reshape for Conv1D: (samples, features) -> (samples, features, 1)
        # -------------------------
        X_fit = X_fit.reshape((X_fit.shape[0], X_fit.shape[1], 1))
        X_val = X_val.reshape((X_val.shape[0], X_val.shape[1], 1))
        X_test = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))

        # -------------------------
        # 6. Build the model
        # -------------------------
        model = _build_attention_cnn(X_fit.shape[1], filter_size, number_of_filter, Model_Name)

        # -------------------------
        # 7. Train the model
        # -------------------------
        # Stop once validation loss has not improved for 5 epochs and roll
        # back to the best weights seen.
        early_stopping = tf.keras.callbacks.EarlyStopping(
            patience=5,
            restore_best_weights=True,
            monitor="val_loss"
        )

        model.fit(
            X_fit,
            Y_fit,
            epochs=60,
            batch_size=32,
            verbose=0,
            validation_data=(X_val, Y_val),
            callbacks=[early_stopping]
        )

        # -------------------------
        # 8. Evaluate on PURE test data
        # -------------------------
        Y_prob = model.predict(X_test, verbose=0).ravel()
        # np.round sends exactly 0.5 to 0, so a strict '>' gives the same labels.
        Y_pred_binary = (Y_prob > 0.5).astype(int)

        accuracy = accuracy_score(Y_test, Y_pred_binary)
        precision = precision_score(Y_test, Y_pred_binary, zero_division=0)
        recall = recall_score(Y_test, Y_pred_binary, zero_division=0)
        f1 = f1_score(Y_test, Y_pred_binary, zero_division=0)
        mcc = matthews_corrcoef(Y_test, Y_pred_binary)

        try:
            auc_roc = roc_auc_score(Y_test, Y_prob)
        except ValueError:
            # AUC cannot be computed for this fold (e.g. a single class in
            # Y_test); fall back to the chance level. Other errors propagate.
            auc_roc = 0.5

        accuracy_scores.append(accuracy)
        precision_scores.append(precision)
        recall_scores.append(recall)
        f1_scores.append(f1)
        mcc_scores.append(mcc)
        auc_roc_scores.append(auc_roc)

        print(f"   Accuracy: {accuracy:.4f} | F1: {f1:.4f}")

    # -------------------------
    # 9. Print averages
    # -------------------------
    print("\n" + "-" * 30)
    print("Average Test Accuracy:", np.mean(accuracy_scores))
    print("Average Precision:", np.mean(precision_scores))
    print("Average Recall:", np.mean(recall_scores))
    print("Average F1 Score:", np.mean(f1_scores))
    print("Average MCC:", np.mean(mcc_scores))
    print("Average AUC-ROC:", np.mean(auc_roc_scores))

In [None]:
# Ensure X, Y are numpy arrays (no global scaling!)
X_arr = X.to_numpy() if isinstance(X, pd.DataFrame) else np.asarray(X)
# Only a pandas Series has `.values`/`.to_numpy()`. The one-hot encoder cell
# leaves Y as a plain ndarray, which must go through np.asarray instead
# (calling `.values` on an ndarray raises AttributeError).
Y_arr = Y.to_numpy() if isinstance(Y, pd.Series) else np.asarray(Y)

# Run the cross-validation with the settings from the configuration cell.
kfold_fixed(
    X_arr,
    Y_arr,
    filter_size=filter_size,
    number_of_filter=number_of_filter,
    flatten_layer_exist=flatten_layer_exist,
    Model_Name=Model_Name,
    OverSamplingTecnique=OverSamplingTecnique
)

Applying K-fold (Leak-Free)
Applying 128 filters of size 5

Fold 1/10
   Resampling training data with SMOTE...
   Accuracy: 0.8740 | F1: 0.5828

Fold 2/10
   Resampling training data with SMOTE...
   Accuracy: 0.8240 | F1: 0.5368

Fold 3/10
   Resampling training data with SMOTE...
   Accuracy: 0.8620 | F1: 0.6425

Fold 4/10
   Resampling training data with SMOTE...
   Accuracy: 0.8980 | F1: 0.6667

Fold 5/10
   Resampling training data with SMOTE...
   Accuracy: 0.8940 | F1: 0.6826

Fold 6/10
   Resampling training data with SMOTE...
   Accuracy: 0.8660 | F1: 0.6171

Fold 7/10
   Resampling training data with SMOTE...
   Accuracy: 0.8920 | F1: 0.6786

Fold 8/10
   Resampling training data with SMOTE...
   Accuracy: 0.8920 | F1: 0.6197

Fold 9/10
   Resampling training data with SMOTE...
   Accuracy: 0.8700 | F1: 0.6199

Fold 10/10
   Resampling training data with SMOTE...
   Accuracy: 0.9000 | F1: 0.6667

------------------------------
Average Test Accuracy: 0.8772
Average Precision: