In [None]:
# Numerical computation libraries
import numpy as np               # For numerical operations and arrays
import pandas as pd              # For data handling and analysis
import random                    # For random action selection - Epsilon-Greedy Algorithm in Reinforcement Learning


# Visualization libraries
import matplotlib.pyplot as plt  # Plotting graphs
import seaborn as sns            # Statistical visualizations

# Ignore non-critical warnings
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Machine Learning utilities
from sklearn.model_selection import train_test_split  # split the dataset into subsets
from sklearn.preprocessing import StandardScaler      # Feature scaling - scales parameter
from sklearn.metrics import (classification_report, confusion_matrix, accuracy_score)
# classification_report / confusion_matrix compare the predicted labels with the true labels;
# accuracy_score gives the overall accuracy of the baseline classifier
from sklearn.ensemble import RandomForestClassifier   # Baseline ML model - combines the predictions of
#multiple individual models—in this case, decision trees—to achieve higher accuracy and stability
#Model	                Why Not Used
#Logistic Regression	Assumes linear decision boundary
#Linear SVM	            Cannot model interactions
#Kernel SVM	            High computation, poor scalability
#KNN	                Sensitive to noise and scaling
#Naive Bayes	        Assumes feature independence
#Single Decision Tree	Overfits easily
#XGBoost	            Powerful but more complex to tune

# Reinforcement Learning (Gym) - Reinforcement learning is used because handover optimization is a
#sequential control problem where each decision affects future network performance and QoE.
#Supervised models only predict outcomes, whereas RL learns optimal actions through interaction with
#the environment
import gymnasium as gym
from gymnasium import spaces   # observation space (network state) eg. position, location
                               # action space (handover decisions) eg. direction

# Deep Learning (PyTorch)
import torch                  # implements the Deep Q-Network
import torch.nn as nn         # defines neural layers.
import torch.optim as optim   # update the weights and biases of your neural network


In [None]:
# Location of the 5G handover CSV inside the Kaggle input directory
DATA_PATH = "/kaggle/input/5g-handover-optimization-dataset-csv/5G_Handover_Optimization_Dataset.csv"

# Read the whole file into a DataFrame
df = pd.read_csv(DATA_PATH)

# Peek at the first rows to confirm the file loaded as expected
df.head()


In [None]:
# Column names, dtypes and non-null counts (df.info() prints its report itself)
df.info()

# A notebook cell only renders its LAST expression, so the statistical summary
# has to be printed explicitly; as a bare expression it was silently discarded
print(df.describe())

# Missing values per column (rendered as the cell output)
df.isnull().sum()


In [None]:
# Correlation heatmap of the numeric columns
plt.figure(figsize=(30,20))
# numeric_only=True restricts the matrix to numeric columns. Without it,
# pandas >= 2.0 raises if the frame holds any non-numeric column (e.g. ID or
# timestamp strings) -- NOTE(review): confirm whether this dataset has such columns.
sns.heatmap(df.corr(numeric_only=True), annot=True, cmap="coolwarm")
plt.title("Correlation Heatmap")
plt.show()
#correlation heatmap used to understand the relationship between different network parameters and
#call drop behavior.

#Correlation measures how strongly two parameters are linearly related. With the "coolwarm" colormap,
#red marks a strong positive relation, blue marks a strong negative relation, and pale cells are weak.

#The values range from -1 to +1. Values close to 0 indicate a very weak or no linear relationship.

#From the heatmap, we can see that most parameters have correlation values close to zero. This means
#no single network parameter alone strongly affects call drop.

#Because the relationships are weak, simple rule-based methods are not effective. So we use machine
#learning to analyze multiple parameters together to predict call drops.

In [None]:
# Function to convert raw columns into a single outcome label
def derive_outcome(row):
    """
    Collapse the raw handover flags of one row into a single class label.

    Parameters
    ----------
    row : mapping-like (e.g. a DataFrame row)
        Must provide 'HO_Success', 'PingPong_Event' and 'Session_Drop'.

    Returns
    -------
    int
        2 for a successful handover with no ping-pong and no session drop,
        1 when a ping-pong event is flagged,
        0 for everything else (failure / session drop).
    """
    clean_success = (
        row['HO_Success'] == 1
        and row['PingPong_Event'] == 0
        and row['Session_Drop'] == 0
    )
    if clean_success:
        return 2

    # Ping-pong is checked before the failure fallback, so a row flagged as
    # ping-pong is labelled 1 even if the session was also dropped
    if row['PingPong_Event'] == 1:
        return 1

    return 0

# Label every row with derive_outcome (axis=1 passes one row at a time)
encoded_outcomes = df.apply(derive_outcome, axis=1)
df['outcome_encoded'] = encoded_outcomes

# How many rows fall into each outcome class
df['outcome_encoded'].value_counts()


In [None]:
# Input columns shared by the baseline classifier and the RL environment.
# The order is kept exactly as listed: the scaler is fitted on X in this order
# and the environment selects the same columns.
features = [
    # Signal strength (RSRP) and SINR of the serving and neighbor cells
    'Serving_RSRP_dBm', 'Neighbor_RSRP_dBm',
    'Serving_SINR_dB', 'Neighbor_SINR_dB',
    # Load on the serving and neighbor cells
    'Serving_Cell_Load_pct', 'Neighbor_Cell_Load_pct',
    # User speed and recent handover / ping-pong history
    'UE_Speed_kmph', 'Handover_Count_Last_30s', 'PingPong_Last_60s',
    # Session latency
    'Latency_ms',
]

# Feature matrix (selected columns only) and target vector (derived outcome class)
X = df.loc[:, features]
y = df.loc[:, 'outcome_encoded']


In [None]:
# Hold out 20% of the rows for testing; stratify=y keeps the class
# proportions the same in both parts, random_state fixes the shuffle
split_parts = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
X_train, X_test, y_train, y_test = split_parts

# Keep the column order around so later code can stay aligned with it
feature_names = list(X_train.columns)

# Standardize to zero mean / unit variance. The statistics come from the
# training rows ONLY, so nothing from the test rows leaks into the scaler.
scaler = StandardScaler().fit(X_train)

# Apply the same fitted transform to both parts
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)


In [None]:
# Baseline model: 200 trees, depth capped at 10 to limit overfitting, fixed seed
rf = RandomForestClassifier(n_estimators=200, max_depth=10, random_state=42)
rf.fit(X_train_scaled, y_train)

# Class predictions for the held-out rows
y_pred = rf.predict(X_test_scaled)

# Overall accuracy, followed by per-class precision / recall / F1
baseline_accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", baseline_accuracy)
print(classification_report(y_test, y_pred))


In [None]:
# Rows are true classes, columns are predicted classes
cm = confusion_matrix(y_test, y_pred)

# Draw the matrix on an explicit Axes instead of the implicit pyplot state
fig, ax = plt.subplots()
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=ax)
ax.set_xlabel("Predicted Label")
ax.set_ylabel("True Label")
ax.set_title("Baseline ML – Handover Outcome")
plt.show()

# Repeat the headline accuracy next to the figure
print("Accuracy:", accuracy_score(y_test, y_pred))

In [None]:
class HandoverEnv(gym.Env):
    """
    Custom Gymnasium environment for 5G handover optimization.

    The environment replays ``data`` one row per timestep. The observation is
    the scaled feature vector of the current row; the reward compares the
    chosen action with that row's ``outcome_encoded`` label.

    Actions
    -------
    0 : do nothing
    1 : tune handover parameters
    2 : trigger handover

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain the feature columns and an ``outcome_encoded`` column.
    scaler : object with a ``transform`` method
        Already-fitted scaler applied to the feature columns.
    feature_columns : list of str, optional
        Columns used as the observation. Defaults to the notebook-level
        ``features`` list, so ``HandoverEnv(df, scaler)`` keeps working.

    Raises
    ------
    ValueError
        If ``data`` has no rows.
    """

    def __init__(self, data, scaler, feature_columns=None):
        super().__init__()

        # Resolve the observation columns (module-level `features` by default)
        self.feature_columns = list(
            features if feature_columns is None else feature_columns
        )

        # Store dataset and scaler
        self.data = data.reset_index(drop=True)
        self.scaler = scaler

        if len(self.data) == 0:
            raise ValueError("HandoverEnv needs at least one row of data")

        # Scale the full feature matrix once. Doing a pandas row lookup plus a
        # scaler call on every step produced the same numbers far more slowly.
        self._states = np.asarray(
            self.scaler.transform(self.data[self.feature_columns]),
            dtype=np.float32,
        )
        self._outcomes = self.data['outcome_encoded'].to_numpy()

        # Pointer to current timestep
        self.current_step = 0

        # Standardized features are not bounded, so the space is declared
        # unbounded; a fixed [-5, 5] box would be violated by outlier rows.
        self.observation_space = spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(len(self.feature_columns),),
            dtype=np.float32
        )

        self.action_space = spaces.Discrete(3)

    def reset(self, seed=None, options=None):
        """Rewind to the first row and return ``(observation, info)``."""
        super().reset(seed=seed)
        self.current_step = 0
        return self._get_state(), {}

    def _get_state(self):
        """Return the scaled float32 feature vector of the current row."""
        # Copy so callers cannot modify the cached matrix through the result
        return self._states[self.current_step].copy()

    @staticmethod
    def _reward(action, outcome):
        """Reward for taking ``action`` on a row whose true label is ``outcome``."""
        if action == 2:      # Trigger handover
            if outcome == 2:
                return 10
            return -5 if outcome == 1 else -10
        if action == 1:      # Tune parameters
            return 2 if outcome != 0 else -2
        # Do nothing (any other action value is treated the same way)
        return 1 if outcome == 2 else -3

    def step(self, action):
        """
        Score ``action`` against the current row and advance one row.

        Returns
        -------
        tuple
            ``(next_state, reward, terminated, truncated, info)``. ``truncated``
            is always False and ``info`` is always empty. Once the last row has
            been consumed, ``terminated`` is True and ``next_state`` is zeros.

        Raises
        ------
        RuntimeError
            If called after the episode terminated without calling ``reset``.
        """
        if self.current_step >= len(self.data):
            raise RuntimeError(
                "step() called after the episode terminated; call reset() first"
            )

        # Ground-truth outcome of the row the agent just acted on
        outcome = self._outcomes[self.current_step]
        reward = float(self._reward(action, outcome))

        # Move to next timestep
        self.current_step += 1

        # Terminate only when every row has been used. Comparing against
        # len - 1 ended the episode one step early and skipped the last row.
        terminated = self.current_step >= len(self.data)

        # No row is left after termination, so return a zero placeholder
        next_state = (
            self._get_state()
            if not terminated
            else np.zeros(len(self.feature_columns), dtype=np.float32)
        )

        return next_state, reward, terminated, False, {}


In [None]:
class DQN(nn.Module):
    """
    Deep Q-Network for handover decision-making.

    A fully connected network with two hidden layers of 64 units (ReLU
    activations) that maps a state vector of length ``state_size`` to one
    Q-value per action (``action_size`` outputs).
    """
    def __init__(self, state_size, action_size):
        super().__init__()

        hidden_units = 64

        # Layers are created in input -> output order and wrapped in `net`
        layers = [
            nn.Linear(state_size, hidden_units),    # input -> hidden
            nn.ReLU(),
            nn.Linear(hidden_units, hidden_units),  # hidden -> hidden
            nn.ReLU(),
            nn.Linear(hidden_units, action_size),   # hidden -> Q-values
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the Q-values for the state(s) in ``x``."""
        q_values = self.net(x)
        return q_values


In [None]:
# Seed every random source used during training so that a fresh
# "Restart & Run All" reproduces the same weights and reward curve
SEED = 42
random.seed(SEED)        # epsilon-greedy coin flips (random.random)
np.random.seed(SEED)     # global NumPy RNG
torch.manual_seed(SEED)  # network weight initialisation

# Create environment
# NOTE(review): the environment replays the full df, which includes the rows
# held out as the test split for the baseline model -- confirm this is intended.
env = HandoverEnv(df, scaler)
env.action_space.seed(SEED)  # exploratory actions from action_space.sample()

# State and action dimensions
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# Initialize DQN
model = DQN(state_size, action_size)

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Loss function
criterion = nn.MSELoss()

# RL hyperparameters
gamma = 0.95           # discount factor for future rewards
epsilon = 1.0          # initial exploration probability
epsilon_decay = 0.995  # multiplicative decay applied to epsilon
epsilon_min = 0.01     # lower bound for epsilon

episodes = 20          # number of training episodes
MAX_STEPS = 300        # maximum steps per episode


In [None]:
# Total reward collected in each episode, in training order
reward_history = []

for episode in range(episodes):
    # Every episode restarts from the environment's initial state
    state, _ = env.reset()
    total_reward = 0

    for step in range(MAX_STEPS):
        # Add a batch dimension so the network receives shape (1, state_size)
        state_tensor = torch.tensor(state).unsqueeze(0)

        # ε-greedy: explore with probability epsilon, otherwise act greedily
        explore = random.random() < epsilon
        if explore:
            action = env.action_space.sample()
        else:
            with torch.no_grad():
                action = model(state_tensor).argmax().item()

        # Apply the action; the truncation flag and info dict are not used
        next_state, reward, done, _, _ = env.step(action)
        total_reward += reward

        # One-step TD target: reward plus the discounted best next Q-value.
        # Multiplying by (not done) removes the bootstrap term on the final step.
        next_state_tensor = torch.tensor(next_state).unsqueeze(0)
        with torch.no_grad():
            best_next_q = model(next_state_tensor).max().item()
        target_q = reward + gamma * best_next_q * (not done)

        # Q-value the network currently assigns to the action just taken
        current_q = model(state_tensor)[0, action]

        # Single-sample gradient step on the squared TD error
        loss = criterion(current_q, torch.tensor(target_q))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        state = next_state
        if done:
            break

    # Exploration rate decays once per episode, never below epsilon_min
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
    reward_history.append(total_reward)

    episode_summary = (
        f"Episode {episode+1}/{episodes} | "
        f"Reward: {total_reward:.1f} | "
        f"Epsilon: {epsilon:.3f}"
    )
    print(episode_summary)


In [None]:
# Total reward per episode, drawn on an explicit Axes
fig, ax = plt.subplots(figsize=(7, 4))
ax.plot(reward_history, marker='o')
ax.set_xlabel("Episode")
ax.set_ylabel("Total Reward")
ax.set_title("DQN Training Reward Progress")
ax.grid(True)
plt.show()


In [None]:
# Display name for each encoded outcome class
outcome_labels = {
    0: 'Failure / Session Drop',
    1: 'Ping-Pong',
    2: 'Successful Handover'
}

# Add a readable label column next to the encoded one
df['outcome_name'] = df['outcome_encoded'].map(outcome_labels)

# Bar chart of how many rows fall into each outcome class
ax = df['outcome_name'].value_counts().plot(kind='bar')
ax.set_title("Handover Outcome Distribution")
ax.set_xlabel("Outcome Type")
ax.set_ylabel("Count")
ax.grid(axis='y', linestyle='--', alpha=0.6)
plt.show()
