In [None]:
import pickle
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
import os
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator
from sklearn.metrics import confusion_matrix
import seaborn as sns

def init_setup():
    """Load every pickled array needed by the experiment into one dict.

    Returns:
        dict: keys 'samples', 'dx', 'real', 'imag', 'label' and 'snr'; each
        value is the object unpickled by save_and_load_list from the file
        named in ``sources`` below.
    """
    load_case = 2  # case 2 asks save_and_load_list to load
    # dataset key -> pickle base name; insertion order is also the load order
    sources = {
        'samples': "cumulants_vec",
        'dx': "cum_dx_vec",
        'real': "cum_real_vec",
        'imag': "cum_imag_vec",
        'label': "labels",
        'snr': "snr_vec",
    }
    print("loading data")
    # The first argument is only used when saving, so a placeholder is passed.
    dataset = {
        key: save_and_load_list(2, stem, load_case)
        for key, stem in sources.items()
    }
    print('finished')
    return dataset

def save_and_load_list(variable, name, case):
    """Pickle ``variable`` to, or unpickle it from, ``data/<name>.pkl``.

    Args:
        variable: object to store when ``case == 1``; ignored when loading.
        name: file name without the ``.pkl`` extension.
        case: 1 to save ``variable``, 2 to load and return the stored object.

    Returns:
        The unpickled object when ``case == 2``; ``None`` when ``case == 1``.

    Raises:
        ValueError: if ``case`` is neither 1 nor 2.
        OSError: propagated from ``open`` (e.g. the file to load is missing).
    """
    path = "data/"
    file_path = os.path.join(path, name + ".pkl")
    if case == 1:
        # Create the target directory on first use so saving cannot fail
        # just because "data/" does not exist yet.
        os.makedirs(path, exist_ok=True)
        with open(file_path, "wb") as f:
            pickle.dump(variable, f)
        return None
    if case == 2:
        # NOTE: pickle.load can execute arbitrary code; only load files that
        # were produced by this project.
        with open(file_path, "rb") as f:
            return pickle.load(f)
    # An unknown case used to fall through and silently return None.
    raise ValueError(f"case must be 1 (save) or 2 (load), got {case!r}")
    
def prep_data(dataset):
    """Turn the raw loaded dataset into arrays ready for classification.

    Args:
        dataset: mapping with the keys 'label', 'snr', 'samples', 'dx',
            'real' and 'imag' (as built by init_setup).

    Returns:
        dict: 'label' and 'snr' converted with np.array, and 'data' holding
        the feature matrix produced by cumulant_fix.
    """
    return {
        'label': np.array(dataset['label']),
        'snr': np.array(dataset['snr']),
        'data': cumulant_fix(
            dataset['samples'], dataset['dx'], dataset['real'], dataset['imag']
        ),
    }

def cumulant_fix(cum,dx,I,Q):
    """Build one log-scaled feature row per sample.

    For sample ``n`` the row is the concatenation of:
      * log10(|cum[n][k]| / |cum[n][1]|) for k = 2 .. len(cum[n]) - 1
      * log10(|dx[n][k]|)                for k = 2 .. 6
      * log10(0.5*|I[n][k]| + 0.5*|Q[n][k]|) for k = 1 .. len(I[n]) - 1

    Args:
        cum, dx, I, Q: sequences indexed by sample; each entry must support
            ``abs()`` and integer indexing (e.g. a 1-D numpy array).

    Returns:
        np.ndarray built from the list of feature rows.
    """
    rows = []
    for n in range(len(cum)):
        mag_cum = abs(cum[n])
        mag_dx = abs(dx[n])
        mag_i = abs(I[n])
        mag_q = abs(Q[n])

        # Cumulant magnitudes normalised by the entry at index 1.
        row = [np.log10(mag_cum[k] / mag_cum[1]) for k in range(2, len(mag_cum))]
        # Fixed subset (indices 2..6) of the dx magnitudes.
        row += [np.log10(mag_dx[k]) for k in (2, 3, 4, 5, 6)]
        # Average of the I and Q magnitudes, skipping index 0.
        row += [np.log10(mag_i[k] * 0.5 + mag_q[k] * 0.5) for k in range(1, len(mag_i))]
        rows.append(row)

    return np.array(rows)

def start_amc():
    """Load and preprocess the dataset, then keep only the listed modulations.

    Returns:
        tuple: ``(samples, labels, snr)`` restricted to the rows whose label
        is one of ``easy_mods``.
    """
    prepared = prep_data(init_setup())

    # Modulation classes kept for the experiments.
    easy_mods = [
        "OOK", "4ASK", "BPSK", "QPSK", "8PSK", "16QAM",
        "AM-SSB-SC", "AM-DSB-SC", "FM", "GMSK", "OQPSK",
    ]
    keep = np.isin(prepared['label'], easy_mods)

    # Apply the same row mask to every array so they stay aligned.
    labels, snr, samples = (prepared[key][keep] for key in ('label', 'snr', 'data'))
    return samples,labels,snr

def init_classification(samples, labels, snr, name, train_tresh, tree_depth, random_state=42):
    """Train a decision tree and produce per-SNR and per-label accuracy plots.

    Args:
        samples: feature matrix, one row per sample.
        labels: class label of each row.
        snr: SNR value of each row.
        name: experiment name; used in the output directory name.
        train_tresh: SNR threshold forwarded to data_spliting.
        tree_depth: ``max_depth`` of the DecisionTreeClassifier.
        random_state: seed for the tree. Fixing it makes the fitted tree
            reproducible, matching the fixed seed already used for the
            train/test split.

    Returns:
        tuple: ``(accuracy_data, classifier)`` where ``accuracy_data`` is the
        dict returned by detection_per_snr and ``classifier`` is the fitted
        tree.
    """
    # All plots of this run are written below this directory.
    labeling = f"train_from_{train_tresh}_SNR_{name}"
    x_train, x_test, y_train, y_test,snr_test = data_spliting(samples, labels, snr, train_tresh)
    classifier = DecisionTreeClassifier(max_depth=tree_depth, random_state=random_state)
    classifier.fit(x_train, y_train)
    try:
        os.mkdir(labeling)
    except FileExistsError:
        # Re-running an experiment reuses (and overwrites into) its directory.
        print('skip creation')
    print("Classify per SNR")
    accuracy_data = detection_per_snr(x_test, y_test, snr_test, classifier, labeling)
    print("Classify per Label")
    detection_per_label(x_test, y_test, snr_test, classifier, labeling)

    return accuracy_data,classifier

def data_spliting(samples, labels ,snr , train_tresh):
    """Split the data into train and test sets, optionally training on high SNR only.

    Args:
        samples: feature matrix (numpy array, one row per sample).
        labels: numpy array with the label of each row.
        snr: numpy array with the SNR of each row.
        train_tresh: when greater than 0, only rows with ``snr > train_tresh``
            are split (30% held out, stratified by label) and every other row
            is appended to the test set. Otherwise all rows are split with
            33% held out.

    Returns:
        tuple: ``(x_train, x_test, y_train, y_test, snr_test)``.
    """
    if not (train_tresh > 0):
        # No threshold: plain stratified split over the whole dataset.
        x_train, x_test, y_train, y_test, _, held_out_idx = train_test_split(
            samples,
            labels,
            range(len(samples)),
            test_size=0.33,
            random_state=42,
            stratify=labels,
        )
        return x_train, x_test, y_train, y_test, snr[held_out_idx]

    above = snr > train_tresh
    below = ~above
    pool_x = samples[above]
    pool_y = labels[above]
    pool_snr = snr[above]

    # The index range is split alongside the data so the SNR of each held-out
    # row can be recovered afterwards.
    x_train, held_x, y_train, held_y, _, held_out_idx = train_test_split(
        pool_x,
        pool_y,
        range(len(pool_x)),
        test_size=0.3,
        random_state=42,
        stratify=pool_y,
    )

    # Rows at or below the threshold are never trained on; test on all of them.
    x_test = np.concatenate((held_x, samples[below]))
    y_test = np.concatenate((held_y, labels[below]))
    snr_test = np.concatenate((pool_snr[held_out_idx], snr[below]))
    return x_train, x_test, y_train, y_test,snr_test

def detection_per_snr(x_test, y_test, snr_test, classifier, name):
    """Score the classifier separately at every SNR value in the test set.

    A confusion matrix is also plotted (via plot_confusion_matrix) for each
    SNR value equal to one of the integers 0..10.

    Args:
        x_test, y_test, snr_test: aligned test features, labels and SNR values.
        classifier: fitted estimator exposing ``predict``.
        name: directory name forwarded to plot_confusion_matrix; also stored
            in the returned dict.

    Returns:
        dict: ``{"snr": unique SNR values, "accuracy": list of accuracies,
        "name": name}``.
    """
    snr_levels = np.unique(snr_test)
    scores = []
    for level in snr_levels:
        at_level = snr_test == level
        features = x_test[at_level]
        truth = y_test[at_level]
        predicted = classifier.predict(features)
        scores.append(accuracy_score(truth, predicted))
        if level in range(11):
            plot_confusion_matrix(truth, predicted, np.unique(truth), level, name)
    return {"snr": snr_levels, "accuracy": scores, "name": name}

def detection_per_label(x_test, y_test, snr_test, classifier, name):
    """Plot accuracy versus SNR for every true label and save the figure.

    One curve is drawn per label found in ``y_test`` (the label 'others' is
    skipped). The figure is written to
    ``<name>/label_group_accuracy_<name>.png``.

    Args:
        x_test, y_test, snr_test: aligned test features, labels and SNR values
            (numpy arrays).
        classifier: fitted estimator exposing ``predict``.
        name: existing output directory; also part of the file name.
    """
    unique_snr = np.unique(snr_test)
    unique_label = np.unique(y_test)
    fig, axs = plt.subplots(nrows=1, ncols=1, figsize=(10, 6))
    axs.set_title("Accuracy vs SNR")
    for label in unique_label:
        if label =='others':
            continue
        mask = y_test == label
        x_label = x_test[mask]
        y_label = y_test[mask]
        snr_label = snr_test[mask]
        accuracy_list = []
        for snr_val in unique_snr:
            mask_snr = snr_label == snr_val
            if not mask_snr.any():
                # No test sample of this label at this SNR: predicting on an
                # empty array would raise, so leave a gap in the curve.
                accuracy_list.append(np.nan)
                continue
            x_snr = x_label[mask_snr]
            y_snr = y_label[mask_snr]
            y_pred_snr = classifier.predict(x_snr)
            accuracy_list.append(accuracy_score(y_snr, y_pred_snr))
        axs.plot(unique_snr, accuracy_list, label=label)
    axs.set_xlabel("SNR")
    axs.set_ylabel("Accuracy")
    axs.legend()
    axs.yaxis.set_major_locator(MultipleLocator(0.1))
    axs.xaxis.set_major_locator(MultipleLocator(2))
    axs.grid(which='major')
    fig.tight_layout()
    file_path = os.path.join(name, f"label_group_accuracy_{name}.png")
    fig.savefig(file_path)
    print("saved plots")
    # Close this figure explicitly so repeated calls do not accumulate figures.
    plt.close(fig)
    
def detection_per_label_old(x_test, y_test, snr_test, name,classifier1,classifier2,classifier3):
    """Plot per-label accuracy versus SNR for the three-stage classifier.

    Labels are drawn in two hard-coded groups, one subplot per group, and
    predictions come from steps_tree. The figure is written to
    ``label_group_accuracy_<name>.png`` in the current directory.

    Args:
        x_test, y_test, snr_test: aligned features, labels and SNR values
            (numpy arrays).
        name: suffix of the output file name.
        classifier1, classifier2, classifier3: fitted stage classifiers passed
            through to steps_tree.
    """
    group1 = ['AM-DSB-SC','AM-SSB-SC','FM','GMSK','OQPSK']
    group2 = ['OOK','4ASK','BPSK','QPSK','8PSK','16QAM']
    groups = [group1, group2]
    unique_snr = np.unique(snr_test)
    num_plots = len(groups)  # one subplot per label group
    fig, axs = plt.subplots(nrows=num_plots, ncols=1, figsize=(10, 6 * num_plots))
    for i, label_group in enumerate(groups):
        axs[i].set_title(f"Accuracy vs SNR for Label Group {i+1}")
        for label in label_group:
            mask = y_test == label
            x_label = x_test[mask]
            y_label = y_test[mask]
            snr_label = snr_test[mask]
            accuracy_list = []
            for snr_val in unique_snr:
                mask_snr = snr_label == snr_val
                if not mask_snr.any():
                    # Nothing to classify for this (label, SNR) pair; record a
                    # gap instead of calling predict on an empty array.
                    accuracy_list.append(np.nan)
                    continue
                x_snr = x_label[mask_snr]
                y_snr = y_label[mask_snr]
                # Errors are allowed to propagate: swallowing them would score
                # predictions left over from a previous iteration.
                y_pred_snr = steps_tree(x_snr,classifier1,classifier2,classifier3)
                accuracy_list.append(accuracy_score(y_snr, y_pred_snr))
            axs[i].plot(unique_snr, accuracy_list, label=label)
        axs[i].set_xlabel("SNR")
        axs[i].set_ylabel("Accuracy")
        axs[i].legend()
        axs[i].yaxis.set_major_locator(MultipleLocator(0.1))
        axs[i].xaxis.set_major_locator(MultipleLocator(2))
        axs[i].grid(which='major')
    fig.tight_layout()
    file_path =  f"label_group_accuracy_{name}.png"
    fig.savefig(file_path)
    print("saved plots")
    plt.close(fig)
    
def plot_confusion_matrix(y_true, y_pred, labels, snr, name):
    """Save a confusion-matrix heatmap as ``<name>/SNR=<snr>.png``.

    Args:
        y_true: true labels.
        y_pred: predicted labels.
        labels: label order used for both matrix axes.
        snr: SNR value shown in the title and used in the file name.
        name: existing directory the image is written to.
    """
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    accuracy = accuracy_score(y_true, y_pred)
    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                xticklabels=labels, yticklabels=labels, ax=ax)
    ax.set_xlabel("Predicted Labels")
    ax.set_ylabel("True Labels")
    # accuracy_score returns a fraction in [0, 1]; the ".1%" format converts it
    # to a real percentage instead of printing e.g. "0.8734%".
    ax.set_title(f"Confusion Matrix for SNR={snr}dB, Acc={accuracy:.1%}")
    file_path = os.path.join(name, f"SNR={snr}.png")
    fig.savefig(file_path)
    plt.close(fig)
    
def split_classification(samples,labels,snr,feats,name,group,train_tresh=1,tree_depth=4):
    """Train one stage classifier that separates ``group`` from everything else.

    Every label outside ``group`` is relabelled 'others' before training, so
    the resulting tree predicts either a member of ``group`` or 'others'.

    Args:
        samples: feature matrix (numpy array).
        labels: numpy array of labels aligned with ``samples``.
        snr: numpy array of SNR values aligned with ``samples``.
        feats: column indices of ``samples`` to train on.
        name: experiment name forwarded to init_classification.
        group: labels this stage should recognise; when empty, no label is
            relabelled.
        train_tresh: SNR training threshold forwarded to init_classification.
        tree_depth: maximum tree depth forwarded to init_classification.

    Returns:
        tuple: ``(classifier, mask)`` where ``mask`` is True for rows whose
        original label is in ``group``.
    """
    mask = np.isin(labels,group)
    if group:
        # np.where promotes the string dtype when needed. Assigning 'others'
        # in place into a narrower fixed-width string array would silently
        # truncate it (e.g. to 'other' for a '<U5' array).
        labels_temp = np.where(mask, labels, 'others')
    else:
        labels_temp = labels.copy()
    samples_fix = samples[:,feats]
    accuracy_data,classifier = init_classification(samples_fix, labels_temp, snr, name, train_tresh, tree_depth)
    return classifier,mask

def combine_accuracy_graphs(*args):
    """Overlay several accuracy-vs-SNR curves in one figure and save it.

    Args:
        *args: dicts with the keys "snr", "accuracy" and "name"; each one
            becomes a curve labelled with its "name".

    The image is saved in the current directory as
    ``combine_accuracy_graph_<k>.png`` where ``k`` is one more than the number
    of entries already there whose name starts with
    ``combine_accuracy_graph_``.
    """
    fig, ax = plt.subplots()
    for curve in args:
        ax.plot(curve["snr"], curve["accuracy"], label=curve["name"])

    ax.set_xlabel("SNR")
    ax.set_ylabel("Accuracy")
    ax.set_title("Accuracy vs SNR")
    ax.legend()
    ax.yaxis.set_major_locator(MultipleLocator(0.1))
    ax.xaxis.set_major_locator(MultipleLocator(2))
    ax.grid(which='major')

    # Number the output after the graphs already present in the directory.
    existing = [entry for entry in os.listdir('.')
                if entry.startswith('combine_accuracy_graph_')]
    file_path = f'combine_accuracy_graph_{len(existing) + 1}.png'
    plt.savefig(file_path)
    plt.clf()
    plt.close()

def steps_tree(X,classifier1,classifier2,classifier3):
    """Classify ``X`` with a three-stage cascade of classifiers.

    Stage 1 labels every row. Rows it labels "others" go to stage 2, and rows
    that stage 2 labels "others" go to stage 3. A later stage is only invoked
    when at least one row reaches it.

    Args:
        X: numpy feature matrix (must support boolean-mask row selection).
        classifier1, classifier2, classifier3: fitted estimators exposing
            ``predict``.

    Returns:
        list: final label of each row of ``X``, in the original row order.
    """
    stage1 = classifier1.predict(X)

    # Placeholders: never read unless the matching stage actually ran.
    stage2 = 'good'
    stage3 = 'good'

    to_stage2 = X[stage1 == "others"]
    if len(to_stage2) > 0:
        stage2 = classifier2.predict(to_stage2)
        to_stage3 = to_stage2[stage2 == "others"]
        if len(to_stage3) > 0:
            stage3 = classifier3.predict(to_stage3)

    # Later-stage predictions are in the order their rows appear in X, so
    # walking stage 1 and pulling from each stage in turn restores alignment.
    stage2_stream = iter(stage2)
    stage3_stream = iter(stage3)
    merged = []
    for first in stage1:
        if first != "others":
            merged.append(first)
            continue
        second = next(stage2_stream)
        if second == "others":
            merged.append(next(stage3_stream))
        else:
            merged.append(second)
    return merged

def step_by_step(samples,labels,snr):
    """Train the three-stage cascade and evaluate it per SNR.

    Stage 1 separates ``split1`` from the rest, stage 2 separates ``split2``
    from what stage 1 left over, and stage 3 classifies the remaining labels.
    Per-label plots and, for each SNR equal to one of the integers 0..10, a
    confusion matrix are written to the current directory.

    Args:
        samples: 2-D numpy feature matrix.
        labels: numpy array of labels aligned with ``samples``.
        snr: numpy array of SNR values aligned with ``samples``.

    Returns:
        dict: ``{"snr": unique SNR values, "accuracy": list of accuracies,
        "name": 'Steps Classification'}``.
    """
    # Every stage uses all feature columns. The count comes from the matrix
    # shape rather than from row 1, so a single-row matrix also works.
    all_feats = list(range(samples.shape[1]))

    split1 = ['AM-SSB-SC','4ASK','BPSK']
    classifier1,mask1 = split_classification(samples,labels,snr,all_feats,'split_1',split1)

    # Stage 2 only sees the rows whose true label is outside split1.
    labels2 = labels[~mask1]
    samples2 = samples[~mask1]
    snr2 = snr[~mask1]
    split2 = ['AM-DSB-SC','FM','OOK']
    classifier2,mask2 = split_classification(samples2,labels2,snr2,all_feats,'split_2',split2)

    # Stage 3 only sees the rows whose true label is outside split1 and split2.
    labels3 = labels2[~mask2]
    samples3 = samples2[~mask2]
    snr3 = snr2[~mask2]
    split3 = ['16QAM','8PSK','GMSK','QPSK','OQPSK']
    classifier3,_ = split_classification(samples3,labels3,snr3,all_feats,'split_3',split3)

    # NOTE(review): the evaluation below runs on the full arrays, which include
    # the rows the stage classifiers were trained on — confirm this is intended.
    detection_per_label_old(samples, labels, snr, 'full split',classifier1,classifier2,classifier3)

    # Cascade accuracy per SNR.
    unique_snr = np.unique(snr)
    unique_labels = np.unique(labels)
    accuracy_list = []
    for snr_val in unique_snr:
        mask = snr == snr_val
        x_snr = samples[mask]
        y_snr = labels[mask]
        final_labels = steps_tree(x_snr,classifier1,classifier2,classifier3)
        accuracy_snr = accuracy_score(y_snr, final_labels)
        accuracy_list.append(accuracy_snr)
        if snr_val in range(11):
            cm = confusion_matrix(y_snr, final_labels, labels=unique_labels)
            fig, ax = plt.subplots(figsize=(8, 6))
            sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                        xticklabels=unique_labels, yticklabels=unique_labels, ax=ax)
            ax.set_xlabel("Predicted Labels")
            ax.set_ylabel("True Labels")
            # accuracy_score is a fraction; ".1%" renders it as a percentage.
            ax.set_title(f"Confusion Matrix for SNR={snr_val}dB, Acc={accuracy_snr:.1%}")
            file_path = f"confusion_matrix_SNR={snr_val}.png"
            fig.savefig(file_path)
            plt.close(fig)
    accuracy_steps  = {"snr": unique_snr, "accuracy": accuracy_list, "name": 'Steps Classification'}
    return accuracy_steps

In [None]:
# Load the pickled dataset, build the feature matrix and keep only the
# modulation classes listed in start_amc().
samples,labels,snr = start_amc()

In [None]:
# Train one plain decision tree per feature subset and keep each accuracy curve
# for the combined plot. `classifier` ends up holding the last fitted tree.
train_tresh = 1  # SNR training threshold forwarded to init_classification
tree_depth = 6   # max_depth of every tree trained in this cell
print("classification for cumulants...")
# NOTE(review): the column counts 6 and 10 assume a specific feature layout
# coming out of cumulant_fix — confirm they match the subsets named here.
accuracy_data0,classifier = init_classification(samples[:,range(6)], labels, snr, "cumulants", train_tresh, tree_depth)
accuracy_data1,classifier = init_classification(samples[:,range(10)], labels, snr, "cumulants_and_dx", train_tresh, tree_depth)
accuracy_data2,classifier = init_classification(samples, labels, snr, "cumulants_with_dx_IQ", train_tresh, tree_depth)


In [None]:

# Train and evaluate the three-stage cascade, then overlay its accuracy curve
# with the three single-tree curves computed in the previous cell.
accuracy_steps = step_by_step(samples,labels,snr)
combine_accuracy_graphs(accuracy_data0,accuracy_data1,accuracy_data2,accuracy_steps)


In [None]:
# DNN classifier (disabled draft: relies on X_train / y_train, which the cells above do not define)
# import tensorflow as tf
# from tensorflow.keras import layers, models

# model = models.Sequential()
# model.add(layers.Dense(16, activation='relu', input_shape=(X_train.shape[1],)))
# model.add(layers.Dense(8, activation='relu'))
# model.add(layers.Dense(np.unique(y_train).shape[0], activation='softmax'))
# # Compile the model
# model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
# model.fit(X_train, y_train, epochs=1, batch_size=32)


In [None]:

# SVM classifier (disabled draft: relies on X_train / X_test / y_train / y_test, which the cells above do not define)
# from sklearn.svm import SVC
# from sklearn.metrics import accuracy_score

# svm_classifier = SVC(kernel='linear', C=1.0)  # You can use other kernels as well
# print('fitting')
# svm_classifier.fit(X_train, y_train)
# print('predict')
# y_pred = svm_classifier.predict(X_test)
# # Calculate the accuracy of the model
# accuracy = accuracy_score(y_test, y_pred)
# print("Accuracy:", accuracy)