In [1]:
import numpy as np
import h5py
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, BatchNormalization, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.layers import Conv1D, BatchNormalization, MaxPooling1D, Flatten, Dense, Dropout
import tensorflow as tf
tf.config.run_functions_eagerly(True)

In [2]:
def data_generator(Xs, lbl, batch_size=32):
    num_samples = Xs.shape[0]

    while True:
        indices = np.random.permutation(num_samples)
        for i in range(0, num_samples, batch_size):
            batch_indices = indices[i:i+batch_size]
            X_batch = Xs[batch_indices]
            y_batch = to_categorical(lbl[batch_indices], num_classes=num_classes)
            yield X_batch, y_batch

In [4]:
# Load the dataset using h5py
file_name = "amc_800.mat"
with h5py.File(file_name, 'r') as Xd:
    X = np.array(Xd['rcd_y'])
    lbl = np.array(Xd['lbl'])[0]  # Taking only far user
    snrs = np.array(Xd['snrs'])

# Convert complex numbers to separate real and imaginary parts
cplx = X['real'] + X['imag']*1j
c_t = cplx.transpose()
Xs = np.zeros((c_t.shape[0], 2, c_t.shape[1]))
Xs[:, 0, :] = c_t.real
Xs[:, 1, :] = c_t.imag
snrs = np.transpose(snrs)
lbl = lbl - 1

# Perform one-hot encoding on labels
label_binarizer = LabelBinarizer()
y = label_binarizer.fit_transform(lbl)



In [None]:
# SNR = -10:2:20 dB
# signal length = 800
# format = complex numbers, you have to create saperate vectors for real and imaginary parts.
# label information: ["BPSK":1, "QPSK":2, "8PSK":3, "16QAM":4]

# * you can make labels 0,1,2, and 3 before processing.

# * Use h5py library to load the dataset in python. It would return a dict with keys : 'rcd_y', 'lbl', and 'snr'.
# use this code to load the data: 
# with h5py.File("file_name.mat", 'r') as Xd:
	
# 	X = np.array(Xd['rcd_y'])
# 	lbl = np.array(Xd['lbl'])[0] #taking only far user
# 	snrs = np.array(Xd['snrs'])

# cplx = X['real'] + X['imag']*1j
# c_t = cplx.transpose()
# Xs = np.zeros((c_t.shape[0],2,c_t.shape[1]))
# Xs[:,0,:] = c_t.real
# Xs[:,1,:] = c_t.imag
# snrs = np.transpose(snrs)
# lbl = lbl-1



#"BPSK":0  "QPSK":1  "8PSK":2 "16QAM":3
#"SNR":-20:2:20 db

In [5]:
# Use the data generator
batch_size = 32
train_data_generator = data_generator(Xs, lbl, batch_size=batch_size)

In [6]:
# Print the number of samples
num_samples = Xs.shape[0]
print(num_samples)
print(Xs.shape)

320000
(320000, 2, 800)


In [7]:
# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test, snr_train, snr_test = train_test_split(
    Xs, y, snrs, test_size=0.2, random_state=42
)
batch_size = 32
train_data_generator = data_generator(X_train, y_train, batch_size=batch_size)
test_data_generator = data_generator(X_test, y_test, batch_size=batch_size)
# Reshape the input to match Conv1D expectations
X_train = X_train.transpose(0, 2, 1)  # Swap the dimensions
X_test = X_test.transpose(0, 2, 1)  # Swap the dimensions

In [None]:
# Define the model
model = Sequential()
model.add(Conv1D(32, kernel_size=3, activation='relu', input_shape=(800,2)))
model.add(BatchNormalization())
model.add(MaxPooling1D(pool_size=2))
model.add(Conv1D(14, kernel_size=3, activation='relu'))
model.add(BatchNormalization())
model.add(MaxPooling1D(pool_size=2))
model.add(Flatten())
model.add(Dropout(0.1))
model.add(Dense(4, activation='softmax'))
# Compile the model
model.compile(
    loss='categorical_crossentropy',
    optimizer=Adam(learning_rate=0.01),
    metrics=['accuracy'],
    run_eagerly=True
)



In [None]:
# Define callbacks for early stopping and learning rate reduction
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=4, verbose=1)

In [None]:
# Train the model
history = model.fit(
    X_train, y_train,
    epochs=30,
    batch_size=125,
    validation_split=0.1,
    callbacks=[early_stopping, reduce_lr]
)
model.save("amc_model_final.h5")

In [11]:
# Evaluate the model on the test set
from tensorflow.keras.models import load_model
model=load_model("LWAMCNET.h5")
test_loss, test_acc = model.evaluate(X_test, y_test)
print(f'Test Accuracy: {test_acc * 100:.2f}%')

import tensorflow as tf

# Convert TFLite model to SavedModel
converter = tf.lite.TFLiteConverter.from_keras_model(model)
saved_model = converter.convert()

# Save the SavedModel (optional)
with open("path/to/saved_model", "wb") as f:
    f.write(saved_model)

# Load the SavedModel as a Keras model
model = tf.keras.experimental.load_from_saved_model("path/to/saved_model")

# Load your test data (same as approach 1)

# Evaluate the model using Keras metrics
loss, accuracy = model.evaluate(test_data, ground_truth)

print(f"Model accuracy: {accuracy:.4f}")



OSError: No file or directory found at LWAMCNET.h5

In [None]:
# from sklearn.metrics import confusion_matrix
# # Make predictions on the test set
# y_pred = model.predict(X_test)
# y_pred_binary = (y_pred > 0.5).astype(int)  # Assuming it's a binary classification with a threshold of 0.5

# # Convert y_test to binary values if it's not already
# y_test_binary = (y_test > 0.5).astype(int)

# # Confusion matrix
# conf_matrix = confusion_matrix(y_test_binary, y_pred_binary)
# print("Confusion Matrix:")
# print(conf_matrix)

# # Extracting values for TN, FP, FN, TP from confusion matrix
# TN, FP, FN, TP = conf_matrix.ravel()

# # Specificity calculation
# specificity = TN / (TN + FP)
# print(f"Specificity: {specificity:.2f}")

# # Precision calculation
# precision = TP / (TP + FP)
# print(f"Precision: {precision:.2f}")
from sklearn.metrics import precision_score, recall_score, f1_score
# Make predictions on the test set
y_pred_prob = model.predict(X_test)

# Apply a threshold (adjust as needed)
threshold = 0.5
y_pred = (y_pred_prob > threshold).astype(int)

# Calculate precision, recall, and F1-score for each label
precision = precision_score(y_test, y_pred, average=None)
recall = recall_score(y_test, y_pred, average=None)
f1 = f1_score(y_test, y_pred, average=None)

# Print metrics for each label
for label, (prec, rec, f1_score) in enumerate(zip(precision, recall, f1)):
    print(f'Label {label}: Precision={prec:.2f}, Recall={rec:.2f}, F1-Score={f1_score:.2f}')

In [None]:
from sklearn.metrics import confusion_matrix
# Initialize a list to store specificity for each label
specificity_list = []

# Calculate specificity for each label
for label in range(y_test.shape[1]):
    # Extract the confusion matrix for the current label
    conf_matrix = confusion_matrix(y_test[:, label], y_pred[:, label])

    # Extract TN and FP from the confusion matrix
    TN, FP = conf_matrix[0, 0], conf_matrix[0, 1]

    # Calculate specificity for the current label
    specificity = TN / (TN + FP) if (TN + FP) > 0 else 0.0

    # Append specificity to the list
    specificity_list.append(specificity)

# Print specificity for each label
for label, spec in enumerate(specificity_list):
    print(f'Label {label}: Specificity={spec:.2f}')

In [None]:
model.summary()

In [None]:
# Save the model to a specific path
model.save("/kaggle/working/amc_model.h5")

In [None]:
import tensorflow as tf
from tensorflow.keras.models import load_model

# Assuming 'model' is your Keras model
model.save("/kaggle/working/", save_format="tf")

In [None]:
import tensorflow as tf

converter = tf.lite.TFLiteConverter.from_saved_model("/kaggle/working/")
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

# Save the TFLite model to a file
with open("model.tflite", "wb") as f:
    f.write(tflite_model)

In [None]:
model.summary()

In [None]:
import pandas as pd

# Load the CSV file for real parts
real_csv_path = "/kaggle/input/imgreal/Real.csv"  # Replace with the actual path to your real parts CSV file
df_real = pd.read_csv(real_csv_path, usecols=[0], header=None, names=["RealPart"])

# Load the CSV file for imaginary parts
imaginary_csv_path = "/kaggle/input/imgreal/Imaginary..csv"  # Replace with the actual path to your imaginary parts CSV file
df_imaginary = pd.read_csv(imaginary_csv_path, usecols=[0], header=None, names=["ImaginaryPart"])

# Merge the DataFrames
df_merged = pd.concat([df_real, df_imaginary], axis=1)
# Save the merged DataFrame to a CSV file in the current directory
output_csv_path = "merged_data.csv"  # Save in the current directory
df_merged.to_csv(output_csv_path, index=False)
# Display the resulting merged DataFrame
print("Merged DataFrame:")
print(df_merged.head())


In [None]:
import numpy as np
import pandas as pd
from keras.models import load_model
from sklearn.preprocessing import StandardScaler

# Load the pre-trained model
model = load_model("/kaggle/input/amc-model-final/amc_model_final.h5")

# Load the CSV file containing real and imaginary parts
csv_file_path = "/kaggle/working/merged_data.csv"  # Replace with the actual path to your CSV file
df = pd.read_csv(csv_file_path, header=None, names=["RealPart", "ImaginaryPart"])

# Convert "RealPart" and "ImaginaryPart" columns to numeric
df["RealPart"] = pd.to_numeric(df["RealPart"], errors='coerce')
df["ImaginaryPart"] = pd.to_numeric(df["ImaginaryPart"], errors='coerce')

# Combine real and imaginary parts into a complex number column
complex_numbers = df["RealPart"] + 1j * df["ImaginaryPart"]

# Convert complex numbers to numpy array
data = np.array(complex_numbers.to_list(), dtype=np.complex64)
# print(data.shape)

# Ensure the length of data is a multiple of 800
remainder = len(data) % 800
if remainder != 0:
    data = data[:-remainder]

# Reshape data to match the input shape expected by the model
reshaped_data = np.stack((data.real, data.imag), axis=-1).reshape(-1, 800, 2)

# Convert data to float32
reshaped_data = reshaped_data.astype(np.float32)

# Standardize the data (you may need to use the same scaler used during training)
scaler = StandardScaler()
reshaped_data = scaler.fit_transform(reshaped_data.reshape(-1, 2)).reshape(reshaped_data.shape)

# Make predictions
predictions = model.predict(reshaped_data)

# Convert predictions to class labels
predicted_labels = np.argmax(predictions, axis=1)
# Map numeric predictions to labels
label_mapping = {0: "BPSK", 1: "QPSK", 2: "8PSK", 3: "16QAM"}

# Convert numeric predictions to label names
predicted_labels_names = [label_mapping[label] for label in predicted_labels]

# Display the predicted labels
print("Predicted Labels:", predicted_labels_names)

# # Display the predicted labels
# print("Predicted Labels:", predicted_labels)
