In [9]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
from sklearn.model_selection import KFold
from tensorflow.keras import layers, models

In [10]:
spacing = {
    "single_ch":    [14.3, 15.3, 16.3, 17.8, 19.3, 21.3, 23.3, 26.3, 28.3, 31.3, 36.3],
    "18GHz":        [18, 19, 20, 23, 25, 27, 30, 32, 35, 40],
    "17.6GHz":      [18, 19, 20, 21.5, 23, 25, 27, 30, 32, 35, 40], # Uncomment if you have data for 17.6GHz
    "17GHz":        [18, 19, 20, 21.5, 23, 25, 27, 30, 32, 35, 40],
    "16.5GHz":      [18, 19, 20, 21.5, 23, 25, 27, 30, 32, 35, 40],
    "16GHz":        [18, 19, 20, 21.5, 23, 25, 27, 30, 32, 35, 40],
    "15.5GHz":      [20, 21.5, 23, 25, 27, 30, 32, 35, 40],
    "15GHz":        [23, 25, 27, 30, 32, 35, 40],
}

In [11]:
def get_osnr_value(spacing, value):
    """ Return the OSNR value for the given spacing and value

    Args:
        spacing (str): The spacing of the data (single_ch, 18GHz, 17GHz, 16.5GHz, 16GHz, 15.5GHz, 15GHz)
        value (float): The value of the OSNR

    Returns:
        float: The OSNR value
    """
    if spacing == "single_ch":
        if value == 14.3:
            return 18
        elif value == 15.3:
            return 19
        elif value == 16.3:
            return 20
        elif value == 17.8:
            return 21.5
        elif value == 19.3:
            return 23
        elif value == 21.3:
            return 25
        elif value == 23.3:
            return 27
        elif value == 26.3:
            return 30
        elif value == 28.3:
            return 32
        elif value == 31.3:
            return 35
        elif value == 36.3:
            return 40
    else:
        return value

In [12]:
images = []
labels = []
osnr = []

for i in spacing.keys():
    for j in spacing[i]:
        for k in range(9):
            dir = f"constellations/{i}/{j}_dB_sample_{k}.png"
            
            # Add OSNR value
            osnr.append(get_osnr_value(i, j))
            
            # Load the image in RGB format
            img = cv2.imread(dir)
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.GaussianBlur(img, (5, 5), 0)
            img = img / 255.0
            images.append(img)

            # Find the labels for the images
            if i == "single_ch":
                labels.append(0)
            elif i == "18GHz":
                labels.append(1)
            elif i == "17.6GHz":
                labels.append(2)
            elif i == "17GHz":
                labels.append(3)
            elif i == "16.5GHz":
                labels.append(4)
            elif i == "16GHz":
                labels.append(5)
            elif i == "15.5GHz":
                labels.append(6)
            elif i == "15GHz":
                labels.append(7)
            else:
                labels.append(-1)

images = np.array(images).astype(np.float32)
osnr = np.array(osnr).astype(np.float32)
labels = np.array(labels)

In [13]:
n_classes = len(np.unique(labels))
input_shape = images[0].shape

print("Num. classes: ", n_classes)
print("Input shape: ", input_shape)

Num. classes:  8
Input shape:  (64, 64, 3)


In [14]:
def create_cnn_model(mode="classification", n_classes=2, osnr_on=False):
    """ Create a CNN model for classification or regression

    Args:
        mode (str, optional): Mode of the model (classification or regression). Defaults to "classification".
        n_classes (int, optional): Number of classes. Defaults to 2.
        osnr_on (bool, optional): If True, the model will take the OSNR value as input. Defaults to False.

    Returns:
        models.Model: The CNN model
    """
    input = layers.Input(shape=input_shape)
    osnr = layers.Input(shape=(1,))
    
    x = layers.Conv2D(16, (3, 3), activation='relu')(input)
    x = layers.MaxPooling2D((2, 2))(x)
    x = layers.Conv2D(8, (3, 3), activation='relu')(x)
    x = layers.MaxPooling2D((2, 2))(x)
    
    x = layers.Flatten()(x)
    
    if osnr_on:
        x = layers.Concatenate()([x, osnr])
    
    if mode == "classification":
        x = layers.Dense(n_classes, activation='softmax')(x)
        if osnr_on:
            model = models.Model(inputs=[input, osnr], outputs=x)
        else:
            model = models.Model(inputs=input, outputs=x)
        model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    elif mode == "regression":
        x = layers.Dense(1, activation='linear')(x)
        if osnr_on:
            model = models.Model(inputs=[input, osnr], outputs=x)
        else:
            model = models.Model(inputs=input, outputs=x)
        model.compile(optimizer='adam', loss='mean_absolute_error', metrics=['mean_absolute_error', 'mean_squared_error'])
        
    return model

In [15]:
kf = KFold(n_splits=10, shuffle=True)
arr1 = []
arr2 = []
arr3 = []

for train_index, test_index in kf.split(images):
    X_train, X_test = images[train_index], images[test_index]
    y_train, y_test = labels[train_index], labels[test_index]
    osnr_train, osnr_test = osnr[train_index], osnr[test_index]

    model = create_cnn_model(mode="regression", osnr_on=True)
    model.fit([X_train, osnr_train], y_train, epochs=300, batch_size=32, validation_data=([X_test, osnr_test], y_test))
    a, b, c = model.evaluate([X_test, osnr_test], y_test)
    arr1.append(a)
    arr2.append(b)
    arr3.append(np.sqrt(c))

Epoch 1/300
Epoch 2/300
Epoch 3/300
Epoch 4/300
Epoch 5/300
Epoch 6/300
Epoch 7/300
Epoch 8/300
Epoch 9/300
Epoch 10/300
Epoch 11/300
Epoch 12/300
Epoch 13/300
Epoch 14/300
Epoch 15/300
Epoch 16/300
Epoch 17/300
Epoch 18/300
Epoch 19/300
Epoch 20/300
Epoch 21/300
Epoch 22/300
Epoch 23/300
Epoch 24/300
Epoch 25/300
Epoch 26/300
Epoch 27/300
Epoch 28/300
Epoch 29/300
Epoch 30/300
Epoch 31/300
Epoch 32/300
Epoch 33/300
Epoch 34/300
Epoch 35/300
Epoch 36/300
Epoch 37/300
Epoch 38/300
Epoch 39/300
Epoch 40/300
Epoch 41/300
Epoch 42/300
Epoch 43/300
Epoch 44/300
Epoch 45/300
Epoch 46/300
Epoch 47/300
Epoch 48/300
Epoch 49/300
Epoch 50/300
Epoch 51/300
Epoch 52/300
Epoch 53/300
Epoch 54/300
Epoch 55/300
Epoch 56/300
Epoch 57/300
Epoch 58/300
Epoch 59/300
Epoch 60/300
Epoch 61/300
Epoch 62/300
Epoch 63/300
Epoch 64/300
Epoch 65/300
Epoch 66/300
Epoch 67/300
Epoch 68/300
Epoch 69/300
Epoch 70/300
Epoch 71/300
Epoch 72/300
Epoch 73/300
Epoch 74/300
Epoch 75/300
Epoch 76/300
Epoch 77/300
Epoch 78

In [16]:
best_rmse_index = np.argmin(arr3)
best_loss = arr1[best_rmse_index]
best_mae = arr2[best_rmse_index]
best_rmse = arr3[best_rmse_index]

print(f"Best RMSE: {best_rmse}")
print(f"Best MAE: {best_mae}")

Best RMSE: 0.9754121948350432
Best MAE: 0.7439729571342468
