In [None]:
import tensorflow as tf
from tensorflow.keras.layers import (
    Input, Dense, BatchNormalization, ReLU, Add,
    Flatten, Reshape, Activation
)
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import pandas as pd
import numpy as np

def load_dataset(M, K, T, beta_path=None, eitta_path=None, max_samples=46700):
    """Load the Beta (input) and Eitta (target) CSV files as 3-D arrays.

    Both files are read without a header row and reshaped to ``(-1, M, K)``.
    Each reshaped array must contain exactly ``T`` samples; afterwards only
    the first ``max_samples`` samples of each are returned.

    Args:
        M: Size of the second axis of every sample.
        K: Size of the last axis of every sample.
        T: Expected number of samples in each file.
        beta_path: CSV file with the Beta values. ``None`` (default) uses the
            original hard-coded location.
        eitta_path: CSV file with the Eitta values. ``None`` (default) uses
            the original hard-coded location.
        max_samples: Number of leading samples to keep (default 46700).
            ``None`` keeps every sample.

    Returns:
        Tuple ``(Beta_mk, Eitta_mk)`` of numpy arrays with shape
        ``(n, M, K)`` where ``n`` is at most ``max_samples``.

    Raises:
        AssertionError: If either array does not have shape ``(T, M, K)``.
    """
    # Raw strings: the Windows paths contain backslashes that must not be
    # interpreted as escape sequences.
    if beta_path is None:
        beta_path = r"D:\OneDrive - Assuit University\My_files\Master Material\Papers\P5_Joint_Powercontrol_PilotAssignement\Beta_mk_46712.csv"
    if eitta_path is None:
        eitta_path = r"D:\OneDrive - Assuit University\My_files\Master Material\Papers\P5_Joint_Powercontrol_PilotAssignement\Eitta_mk_46712.csv"

    # The files carry no header row.
    Beta_mk_T = pd.read_csv(beta_path, header=None)
    Eitta_mk_T = pd.read_csv(eitta_path, header=None)

    # Convert to numpy arrays of shape (samples, M, K)
    Beta_mk = Beta_mk_T.values.reshape(-1, M, K)
    Eitta_mk = Eitta_mk_T.values.reshape(-1, M, K)

    # Verify that inputs AND targets match the expected dimensions, so a
    # truncated target file cannot silently misalign the sample pairs.
    # Raised explicitly (not via `assert`) so the check survives `python -O`.
    if Beta_mk.shape != (T, M, K):
        raise AssertionError(f"Beta shape mismatch. Got {Beta_mk.shape}, expected ({T}, {M}, {K})")
    if Eitta_mk.shape != (T, M, K):
        raise AssertionError(f"Eitta shape mismatch. Got {Eitta_mk.shape}, expected ({T}, {M}, {K})")

    print('beta',Beta_mk[0])
    print('eitta',Eitta_mk[0] )

    # Keep only the leading samples.
    Beta_mk = Beta_mk[:max_samples, :, :]
    Eitta_mk = Eitta_mk[:max_samples, :, :]
    return Beta_mk, Eitta_mk

# Load the dataset once for the rest of the notebook and report its shapes.
try:
    Beta_mk, Eitta_mk = load_dataset(25, 6, 46712)
except Exception as e:
    print(f"Error loading data: {str(e)}")
    # Re-raise: every later cell needs Beta_mk / Eitta_mk, so continuing
    # would only surface as a confusing NameError further down.
    raise
else:
    print("Data loaded successfully!")
    print(f"Beta shape: {Beta_mk.shape}")
    print(f"Eitta shape: {Eitta_mk.shape}")

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import (
    Input, Conv2D,Conv1D, BatchNormalization, ReLU, Add,
    GlobalAveragePooling2D, Dense, Reshape, Activation,Dropout
)
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Multiply, GlobalAveragePooling2D
from tensorflow.keras.regularizers import l2
from tensorflow.keras.layers import Multiply, Concatenate, Conv2D


def se_block(x, ratio=16):
    """Squeeze-and-Excitation style channel gating.

    The input is globally average-pooled, passed through a two-layer Dense
    bottleneck (ReLU, then sigmoid) and the resulting per-channel gates are
    multiplied back into ``x``.

    Args:
        x: Feature map accepted by ``GlobalAveragePooling2D``; the channel
            count is read from its last axis (assumes channels-last).
        ratio: Reduction factor of the bottleneck Dense layer.

    Returns:
        Tensor produced by multiplying ``x`` with the sigmoid gates.
    """
    channels = x.shape[-1]
    # channels // ratio is 0 when channels < ratio, which would request a
    # Dense layer with no units; keep at least one bottleneck unit.
    bottleneck_units = max(channels // ratio, 1)
    se = GlobalAveragePooling2D()(x)
    se = Dense(bottleneck_units, activation='relu')(se)
    se = Dense(channels, activation='sigmoid')(se)
    return Multiply()([x, se])

def cbam_block(x, ratio=16):
    """Simplified CBAM-style attention: channel gating, then spatial gating.

    Channel attention uses global average pooling followed by a two-layer
    Dense bottleneck (ReLU, then sigmoid). Spatial attention is a single
    7x7 one-filter sigmoid convolution applied to the channel-gated features.

    Args:
        x: Feature map accepted by ``GlobalAveragePooling2D``; the channel
            count is read from its last axis (assumes channels-last).
        ratio: Reduction factor of the channel-attention bottleneck.

    Returns:
        Tensor gated first per channel and then per spatial position.
    """
    channels = x.shape[-1]
    # channels // ratio is 0 when channels < ratio, which would request a
    # Dense layer with no units; keep at least one bottleneck unit.
    bottleneck_units = max(channels // ratio, 1)

    # Channel Attention
    channel = GlobalAveragePooling2D()(x)
    channel = Dense(bottleneck_units, activation='relu')(channel)
    channel = Dense(channels, activation='sigmoid')(channel)
    x = Multiply()([x, channel])

    # Spatial Attention
    spatial = Conv2D(1, (7,7), padding='same', activation='sigmoid')(x)
    x = Multiply()([x, spatial])
    return x

def resnet_block(x, filters, kernel_size=(7, 7), name=None):
    """Residual block: two Conv-BN stages plus a summed shortcut.

    Main path: Conv2D -> BatchNormalization -> ReLU -> Conv2D ->
    BatchNormalization. The shortcut is the block input, projected with a
    1x1 Conv2D + BatchNormalization only when its channel count differs from
    ``filters``. Both paths are added and passed through a final ReLU.

    Args:
        x: Input feature map (channel count read from the last axis).
        filters: Number of filters of both convolutions (and of the
            projection shortcut, if needed).
        kernel_size: Kernel size of the two main-path convolutions.
        name: Prefix for the layer names (``<name>_conv1``, ``<name>_bn1``,
            ...). If ``None``, Keras chooses the layer names itself.

    Returns:
        Output tensor of the residual block.
    """
    def layer_name(suffix):
        # Without a prefix, f'{name}_...' would yield the literal
        # 'None_conv1' etc., and a second unnamed block would then clash
        # with the first. Returning None lets Keras auto-generate names.
        return None if name is None else f'{name}_{suffix}'

    shortcut = x

    # Main path
    x = Conv2D(filters, kernel_size, padding='same', name=layer_name('conv1'))(x)
    x = BatchNormalization(name=layer_name('bn1'))(x)
    x = ReLU(name=layer_name('relu1'))(x)

    x = Conv2D(filters, kernel_size, padding='same', name=layer_name('conv2'))(x)
    x = BatchNormalization(name=layer_name('bn2'))(x)

    # Shortcut connection: project only when the channel counts differ
    if shortcut.shape[-1] != filters:
        shortcut = Conv2D(filters, (1, 1), name=layer_name('shortcut'))(shortcut)
        shortcut = BatchNormalization(name=layer_name('shortcut_bn'))(shortcut)

    x = Add(name=layer_name('sum'))([x, shortcut])
    x = ReLU(name=layer_name('out'))(x)
    return x

def build_joint_PC_model(input_shape, M, K, P,RESNET_BLOCKS , NO_Filters):
    """Assemble the power-control network.

    Pipeline: input -> Conv/BN/ReLU stem -> ``RESNET_BLOCKS`` residual blocks
    -> CBAM attention -> L2-regularised Conv/BN/ReLU -> Dropout -> K-filter
    convolution reshaped to ``(M, K)`` with a linear activation.

    Args:
        input_shape: Shape passed to the Keras ``Input`` layer.
        M: First dimension of the reshaped output.
        K: Filter count of the output convolution and last output dimension.
        P: Accepted for interface compatibility; not used in the body.
        RESNET_BLOCKS: Number of residual blocks to stack.
        NO_Filters: Filter count of the stem, residual and second conv blocks.

    Returns:
        A Keras ``Model`` with one output whose final layer is named
        ``power_output``.
    """
    lsf_input = Input(shape=input_shape, name='LSF_Input')

    # Stem: Conv -> BN -> ReLU
    features = Conv2D(NO_Filters, (7, 7), padding='same', name='conv_block1')(lsf_input)
    features = BatchNormalization(momentum=0.99, name='bn1')(features)
    features = ReLU(name='relu1')(features)

    # Residual tower, blocks named resnet1 .. resnet<RESNET_BLOCKS>
    for block_number in range(1, RESNET_BLOCKS + 1):
        features = resnet_block(features, NO_Filters, name=f'resnet{block_number}')

    # Attention over the residual features
    features = cbam_block(features, ratio=16)

    # Second conv block (L2-regularised) followed by dropout
    features = Conv2D(
        NO_Filters, (7, 7),
        padding='same',
        name='conv_block2',
        kernel_regularizer=l2(1e-4),
    )(features)
    features = BatchNormalization(momentum=0.99, name='bn2')(features)
    features = ReLU(name='relu2')(features)
    features = Dropout(0.4)(features)

    # Power-control head: K filters, reshaped to (M, K), linear output
    power = Conv2D(
        K, (7, 7),
        padding='same',
        name='power_conv',
        kernel_regularizer=l2(1e-4),
    )(features)
    power = Reshape((M, K), name='power_output_1')(power)
    power = Activation('linear', name='power_output')(power)

    return Model(inputs=lsf_input, outputs=[power])
# ==============================================
# Hyper-parameters
# ==============================================
M = 25  # APs
K = 6   # Users
P = 2   # Pilots
RESNET_BLOCKS = 4
NO_Filters = 512
input_shape = (M, 1, K)  # Height, Width, Channels
EPOCHS = 200
BATCH_SIZE = 256

# ==============================================
# Model Construction
# ==============================================
PC_model_5 = build_joint_PC_model(
    input_shape=input_shape,
    M=M,
    K=K,
    P=P,
    RESNET_BLOCKS=RESNET_BLOCKS,
    NO_Filters=NO_Filters,
)

def hybrid_loss(y_true, y_pred, delta=0.1, alpha=0.5):
    """Weighted blend of Huber loss and mean squared logarithmic error.

    Args:
        y_true: Ground-truth targets.
        y_pred: Model predictions.
        delta: ``delta`` argument of the Huber loss.
        alpha: Weight of the Huber term; the MSLE term gets ``1 - alpha``.

    Returns:
        ``alpha * huber + (1 - alpha) * msle``.

    NOTE(review): MSLE is defined on non-negative values, while the targets
    fed to this loss come from a power transform and may be negative --
    confirm the MSLE term behaves as intended for such values.
    """
    huber_fn = tf.keras.losses.Huber(delta=delta)
    msle_fn = tf.keras.losses.MeanSquaredLogarithmicError()
    huber_term = huber_fn(y_true, y_pred)
    msle_term = msle_fn(y_true, y_pred)
    return alpha * huber_term + (1 - alpha) * msle_term

# Compile with the hybrid loss; the loss/metric dictionaries are keyed by the
# name of the model's output layer ('power_output').
PC_model_5.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=3e-4, clipnorm=1.0),
    loss={'power_output': hybrid_loss},
    metrics={'power_output': ['mae']}
)

# Report the output names/shapes once (they were previously printed twice).
print("\nModel Output Names:", PC_model_5.output_names)
print("Model Output Shapes:", [output.shape for output in PC_model_5.outputs])
PC_model_5.summary()


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import (
    Input, Conv2D, BatchNormalization, ReLU, Add,
    GlobalAveragePooling2D, Dense, Reshape, Activation)
from tensorflow.keras.models import Model
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.preprocessing import QuantileTransformer
from sklearn.preprocessing import PowerTransformer  
import pandas as pd
import numpy as np
import joblib

# ==============================================
# Train/validation split
# ==============================================
# X and y are split in one call so the sample pairs stay aligned.
X_train, X_val, ypc_train, ypc_val = train_test_split(
    Beta_mk,
    Eitta_mk,
    test_size=0.2,
    random_state=42,
    shuffle=True
)

print('xshape' , X_train.shape)
print('yshape',ypc_train.shape)

# ----- Inputs (Beta_mk) -----
# The transformer is fitted on rows of K values (samples and M are flattened
# together), using the training split only. random_state is fixed because
# QuantileTransformer draws a random subsample when estimating quantiles on
# large inputs; without it the saved scaler differs from run to run.
scaler_X_5 = QuantileTransformer(
    n_quantiles=1000,
    output_distribution='normal',
    random_state=42,
)
X_train_normalized = scaler_X_5.fit_transform(X_train.reshape(-1, K))
X_train_normalized = X_train_normalized.reshape(-1, M, 1, K)  # (samples, M, 1, K)

X_val_normalized = scaler_X_5.transform(X_val.reshape(-1, K))
X_val_normalized = X_val_normalized.reshape(-1, M, 1, K)

# ----- Targets (Eitta_mk) -----
# Same flattening as the inputs; fitted on the training targets only.
scaler_y_5 = PowerTransformer(method='yeo-johnson')
ypc_train_scaled = scaler_y_5.fit_transform(ypc_train.reshape(-1, K))
ypc_train_scaled = ypc_train_scaled.reshape(-1, M, K)  # (samples, M, K)

ypc_val_scaled = scaler_y_5.transform(ypc_val.reshape(-1, K))
ypc_val_scaled = ypc_val_scaled.reshape(-1, M, K)

# Show one sample before and after scaling as a sanity check.
print('beta',X_train[0])
print('betanorm',X_train_normalized[0])
print('eitta',ypc_train[0])
print('eitta_scaled',ypc_train_scaled[0])

# ==============================================
# Training Configuration
# ==============================================
from tensorflow.keras.callbacks import ModelCheckpoint

# Weights-only checkpoint that is not restricted to the best epoch.
# NOTE(review): despite the "swa" name, this callback only writes the model
# weights to disk; no weight averaging is performed by it.
swa_callback = ModelCheckpoint(
    filepath='swa_model.weights.h5',
    save_weights_only=True,
    save_best_only=False,
)

# Define OriginalScaleMAE callback
class OriginalScaleMAE(tf.keras.callbacks.Callback):
    """Report the validation MAE in the original (un-scaled) target units.

    After every epoch the model predicts on the validation inputs, both the
    predictions and the scaled targets are mapped back through the target
    scaler's ``inverse_transform``, and their mean absolute error is printed
    and stored in ``logs['val_mae_original']``.

    Args:
        x_val: Validation inputs. Defaults to the notebook-level
            ``X_val_normalized``.
        y_val_scaled: Scaled validation targets. Defaults to the
            notebook-level ``ypc_val_scaled``.
        scaler: Fitted target scaler exposing ``inverse_transform``.
            Defaults to the notebook-level ``scaler_y_5``.
    """

    def __init__(self, x_val=None, y_val_scaled=None, scaler=None):
        super().__init__()
        self.x_val = x_val
        self.y_val_scaled = y_val_scaled
        self.scaler = scaler
        # Ground truth in original units; it does not change between epochs,
        # so it is computed lazily once and then reused.
        self._y_true_original = None

    def on_epoch_end(self, epoch, logs=None):
        """Compute, log and print the original-scale validation MAE."""
        # Fall back to the notebook globals so `OriginalScaleMAE()` keeps working.
        x_val = X_val_normalized if self.x_val is None else self.x_val
        scaler = scaler_y_5 if self.scaler is None else self.scaler

        if self._y_true_original is None:
            y_val_scaled = ypc_val_scaled if self.y_val_scaled is None else self.y_val_scaled
            self._y_true_original = scaler.inverse_transform(
                y_val_scaled.reshape(-1, y_val_scaled.shape[-1])
            )

        # verbose=0: avoid an extra progress bar after every epoch.
        y_pred = self.model.predict(x_val, verbose=0)
        # The scaler works on rows whose width is the last output dimension.
        y_pred_original = scaler.inverse_transform(y_pred.reshape(-1, y_pred.shape[-1]))

        mae = float(np.mean(np.abs(y_pred_original - self._y_true_original)))
        if logs is not None:
            logs['val_mae_original'] = mae
        print(f"Original Scale MAE: {mae:.4f}")

# Callbacks list
callbacks = [
    # Early-stopping patience is deliberately larger than the LR-scheduler
    # patience: with equal values both fire on the same epoch, so training
    # would stop before the reduced learning rate could ever be used.
    tf.keras.callbacks.EarlyStopping(monitor='val_mae', patience=15, restore_best_weights=True),
    tf.keras.callbacks.ModelCheckpoint('PC_model_5.keras', save_best_only=True, monitor='val_mae'),  # Best full model
    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_mae', factor=0.2, patience=5, verbose=1),
    tf.keras.callbacks.TerminateOnNaN(),
    swa_callback,  # Weights-only save (defined above; same settings as before)
    OriginalScaleMAE()
]

print('we will train the model now , be ready')
print(PC_model_5.output_shape)

# Persist the fitted scalers so inference can reproduce the preprocessing.
# (The file names say "minmax", but they hold the QuantileTransformer and the
# PowerTransformer; the names are kept because the inference cell loads them.)
joblib.dump(scaler_X_5, 'minmax_x_5.save')
joblib.dump(scaler_y_5, 'minmax_y_5.save')

# Inputs to fit:
# - X_train_normalized: preprocessed training inputs, reshaped to (samples, M, 1, K)
# - ypc_train_scaled:   scaled training targets, reshaped to (samples, M, K)
# - X_val_normalized / ypc_val_scaled: the validation counterparts
history = PC_model_5.fit(
    X_train_normalized,  # Training inputs
    ypc_train_scaled,    # Training targets
    validation_data=(X_val_normalized, ypc_val_scaled),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    callbacks=callbacks
)

# Save the end-of-training model under its own name. Writing it to
# 'PC_model_5.keras' would overwrite the best checkpoint kept by
# ModelCheckpoint, e.g. with last-epoch weights or after a NaN termination.
PC_model_5.save('PC_model_5_final.keras')
print('trained ^ ^')


In [None]:
import matplotlib.pyplot as plt

# ======================
# Power Control Metrics
# ======================
# Two panels side by side (the previous 2x2 grid left its bottom row empty).
fig, (ax_loss, ax_mae) = plt.subplots(1, 2, figsize=(18, 6))

# Loss curves. The model is compiled with the hybrid Huber + MSLE loss, so
# the panel is labelled accordingly rather than as MSE.
ax_loss.plot(history.history['loss'], label='Training Loss')
ax_loss.plot(history.history['val_loss'], label='Validation Loss')
ax_loss.set_title('Power Control Loss (Huber + MSLE)')
ax_loss.set_ylabel('Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.legend()
ax_loss.grid(True)

# MAE curves (in the scaled target space used for training).
ax_mae.plot(history.history['mae'], label='Training MAE')
ax_mae.plot(history.history['val_mae'], label='Validation MAE')
ax_mae.set_title('Power Control MAE')
ax_mae.set_ylabel('MAE')
ax_mae.set_xlabel('Epoch')
ax_mae.legend()
ax_mae.grid(True)

fig.tight_layout()
fig.savefig('training_metrics.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
import tensorflow as tf
import pandas as pd
import numpy as np
import joblib

# 1. Custom loss
# Defined BEFORE load_model because it is passed in `custom_objects`; with the
# definition placed after the call, this cell only worked when an earlier cell
# had already put `hybrid_loss` into the kernel namespace.
@tf.keras.utils.register_keras_serializable(package="Custom")
def hybrid_loss(y_true, y_pred, delta=0.1, alpha=0.5):
    """Weighted blend of Huber loss and mean squared logarithmic error.

    Args:
        y_true: Ground-truth targets.
        y_pred: Model predictions.
        delta: ``delta`` argument of the Huber loss.
        alpha: Weight of the Huber term; the MSLE term gets ``1 - alpha``.

    Returns:
        ``alpha * huber + (1 - alpha) * msle``.
    """
    huber = tf.keras.losses.Huber(delta=delta)(y_true, y_pred)
    msle = tf.keras.losses.MeanSquaredLogarithmicError()(y_true, y_pred)
    return alpha * huber + (1 - alpha) * msle


# 2. Load Model and Scalers
PC_model_5 = tf.keras.models.load_model(
    'PC_model_5.keras',
    custom_objects={'hybrid_loss': hybrid_loss}
)

# NOTE: joblib files are pickles -- only load files produced by this notebook
# or another trusted source.
scaler_X_5 = joblib.load('minmax_x_5.save')  # QuantileTransformer
scaler_y_5 = joblib.load('minmax_y_5.save')  # PowerTransformer


def batch_predict(raw_betas):
    """Predict power-control coefficients for raw Beta samples.

    The input is flattened to rows of ``K`` values, scaled with
    ``scaler_X_5``, reshaped to ``(-1, M, 1, K)`` and passed to
    ``PC_model_5``. The predictions are mapped back through
    ``scaler_y_5.inverse_transform``.

    Args:
        raw_betas: Array of un-scaled Beta values that can be reshaped to
            ``(-1, K)`` (the notebook passes shape ``(N, M, K)``).

    Returns:
        Predictions in the original target scale, reshaped to ``(-1, M, K)``.
    """
    # Scale the inputs exactly as during training
    flat_inputs = raw_betas.reshape(-1, K)
    model_inputs = scaler_X_5.transform(flat_inputs).reshape(-1, M, 1, K)

    # Forward pass (outputs are in the scaled target space)
    scaled_predictions = PC_model_5.predict(model_inputs)

    # Undo the target scaling
    flat_predictions = scaled_predictions.reshape(-1, K)
    return scaler_y_5.inverse_transform(flat_predictions).reshape(-1, M, K)

# Problem dimensions read by batch_predict
M = 25
K = 6

# New Beta samples to run inference on (CSV without a header row)
Path_Beta_predict = r"D:\OneDrive - Assuit University\My_files\Master Material\Papers\P5_Joint_Powercontrol_PilotAssignement\Beta_test_148.csv"
Beta_mk_T_Predict = pd.read_csv(Path_Beta_predict, header=None)
new_beta = Beta_mk_T_Predict.to_numpy().reshape(-1, M, K)

# Predicted power-control coefficients in the original scale
pc_output = batch_predict(new_beta)
#print("Predicted Power Control Coefficients (Original Scale):\n", pc_output)

In [None]:
import scipy.io
import numpy as np

# savemat expects a mapping from MATLAB variable name to array; the
# predictions are stored under the variable name "pc_output_5".
data_dict = dict(pc_output_5=pc_output)

# Write the mapping to a .mat file
scipy.io.savemat('pc_output_v5.mat', mdict=data_dict)