In [1]:
# Load the dataset through the project-local helper `start_amc`.
# NOTE(review): `start_amc` is defined in algorithm_module, which is not shown
# here; the cells below index `samples` as a 2-D array and `labels` / `snr` as
# 1-D arrays aligned with its rows - confirm against that module.
from algorithm_module import start_amc

samples, labels, snr, feature_list = start_amc()

loading data
finished


In [2]:
import pickle
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
import os
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator
from sklearn.metrics import confusion_matrix
import seaborn as sns
import h5py

In [3]:
def step_by_step(samples,labels,snr,tree_depth):
    """Train a three-stage cascade of decision trees and score it per SNR.

    Each stage is trained by ``split_classification`` to separate one group of
    labels from everything else (relabelled ``'others'`` there). The next stage
    is trained only on the samples whose true label was not in an earlier
    group. The three classifiers are then chained by ``steps_tree``.

    Args:
        samples: 2-D NumPy array, one row per sample, one column per feature.
        labels: 1-D NumPy array of class labels aligned with ``samples``.
        snr: 1-D NumPy array of SNR values aligned with ``samples``.
        tree_depth: ``max_depth`` forwarded to every stage's decision tree.

    Returns:
        dict with ``"snr"`` (sorted unique SNR values), ``"accuracy"`` (cascade
        accuracy per SNR value, as a fraction in [0, 1]) and ``"name"``.

    Side effects:
        Saves ``confusion_matrix_SNR=<snr>.png`` in the current directory for
        every SNR value in 0..10, in addition to the files written by the
        helpers it calls.
    """
    # Every stage uses all feature columns.
    all_feats = list(range(samples.shape[1]))

    # Stage 1: first group vs. the rest.
    split1 = ['AM-SSB-SC','4ASK','OOK']
    classifier1,mask1 = split_classification(samples,labels,snr,all_feats,'Split Step 1',split1,tree_depth)

    # Stage 2: trained only on samples whose true label is outside group 1.
    rest1 = ~mask1
    samples2 = samples[rest1]
    labels2 = labels[rest1]
    snr2 = snr[rest1]
    split2 = ['GMSK','AM-DSB-SC','BPSK','FM']
    classifier2,mask2 = split_classification(samples2,labels2,snr2,all_feats,'Split Step 2',split2,tree_depth)

    # Stage 3: trained only on samples outside groups 1 and 2.
    rest2 = ~mask2
    samples3 = samples2[rest2]
    labels3 = labels2[rest2]
    snr3 = snr2[rest2]
    split3 = ['16QAM','8PSK','QPSK','OQPSK']
    classifier3,_ = split_classification(samples3,labels3,snr3,all_feats,'Split Step 3',split3,tree_depth)

    # NOTE(review): from here on the cascade is scored on *all* of `samples`,
    # which includes the rows the stages were fitted on.
    detection_per_label_steps(samples, labels, snr, 'Full Split',classifier1,classifier2,classifier3)

    # classification per SNR
    unique_snr = np.unique(snr)
    unique_labels = np.unique(labels)
    accuracy_list = []
    for snr_val in unique_snr:
        mask = snr == snr_val
        x_snr = samples[mask]
        y_snr = labels[mask]
        final_labels = steps_tree(x_snr,classifier1,classifier2,classifier3)
        accuracy_snr = accuracy_score(y_snr, final_labels)
        accuracy_list.append(accuracy_snr)
        # Confusion matrices are only produced for integer SNR values 0..10.
        if snr_val in range(11):
            cm = confusion_matrix(y_snr, final_labels, labels=unique_labels)
            fig, ax = plt.subplots(figsize=(8, 6))
            sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=unique_labels, yticklabels=unique_labels, ax=ax)
            ax.set_xlabel("Predicted Labels")
            ax.set_ylabel("True Labels")
            # accuracy_score returns a fraction; format it as a real percentage.
            ax.set_title(f"Confusion Matrix for SNR={snr_val}dB, Acc={accuracy_snr:.1%}")
            file_path = f"confusion_matrix_SNR={snr_val}.png"
            fig.savefig(file_path)
            plt.close(fig)
    accuracy_steps  = {"snr": unique_snr, "accuracy": accuracy_list, "name": 'Steps Classification'}
    return accuracy_steps

def split_classification(samples,labels,snr,feats,name,group,tree_depth,train_tresh=2):
    """Train one "group vs. others" decision tree.

    Labels that are not in ``group`` are replaced by ``'others'`` (on a copy),
    the feature columns ``feats`` are selected, and the result is handed to
    ``init_classification``.

    Args:
        samples: 2-D NumPy array, one row per sample.
        labels: 1-D NumPy array of class labels aligned with ``samples``.
        snr: 1-D NumPy array of SNR values aligned with ``samples``.
        feats: column indices of ``samples`` to train on.
        name: run name forwarded to ``init_classification``.
        group: labels that keep their identity; if empty, no relabelling is done.
        tree_depth: ``max_depth`` of the decision tree.
        train_tresh: SNR threshold forwarded to ``init_classification``
            (defaults to 2, the value that used to be hard-coded here).

    Returns:
        ``(classifier, mask)`` where ``mask`` is True for samples whose
        original label is in ``group``.
    """
    mask = np.isin(labels,group)
    labels_temp = labels.copy()
    if group:
        # A fixed-width unicode array narrower than 'others' would silently
        # truncate the placeholder (e.g. '<U4' stores 'othe'), so widen first.
        others_dtype = np.dtype(f"<U{len('others')}")
        if labels_temp.dtype.kind == 'U' and labels_temp.dtype.itemsize < others_dtype.itemsize:
            labels_temp = labels_temp.astype(others_dtype)
        labels_temp[~mask]='others'
    samples_fix = samples[:,feats]
    # Only the fitted classifier is needed here; the accuracy data is discarded.
    _, classifier = init_classification(samples_fix, labels_temp, snr, name, train_tresh, tree_depth)
    return classifier,mask

def init_classification(samples, labels, snr, name, train_tresh, tree_depth):
    """Fit a decision tree and produce its per-SNR and per-label reports.

    Args:
        samples: 2-D NumPy array of features, one row per sample.
        labels: 1-D NumPy array of class labels aligned with ``samples``.
        snr: 1-D NumPy array of SNR values aligned with ``samples``.
        name: run name; combined with the threshold into the output directory
            name and the plot legend entry.
        train_tresh: SNR threshold passed to ``data_spliting``. Values > 0
            restrict the training pool to samples with ``snr > train_tresh``;
            any other value trains on a split of the full set.
        tree_depth: ``max_depth`` of the ``DecisionTreeClassifier``.

    Returns:
        ``(accuracy_data, classifier)``: the dict returned by
        ``detection_per_snr`` and the fitted classifier.

    Side effects:
        Creates a directory named after the run (if missing) in the current
        working directory; the helper functions save plots into it.
    """
    # get the accuracy graph per SNR and per label
    # `data_spliting` only applies the threshold when it is > 0, so the label
    # must describe every other value (including 0) as the full training set.
    if train_tresh > 0:
        labeling = f"Train set SNR>{train_tresh}dB, {name}"
    else:
        labeling = f"Full Train set, {name}"
    x_train, x_test, y_train, y_test,snr_test = data_spliting(samples, labels, snr, train_tresh)
    classifier = DecisionTreeClassifier(max_depth=tree_depth)
    classifier.fit(x_train, y_train)
    # The run label doubles as the output directory for this run's plots.
    try:
        os.mkdir(labeling)
    except FileExistsError:
        print('skip creation')
    print("Classify per SNR")
    accuracy_data = detection_per_snr(x_test, y_test, snr_test, classifier, labeling)
    print("Classify per Label")
    detection_per_label(x_test, y_test, snr_test, classifier, labeling)

    return accuracy_data,classifier

def data_spliting(samples, labels ,snr , train_tresh):
    """Split the data into train and test sets, optionally training on high SNR only.

    When ``train_tresh > 0`` only samples with ``snr > train_tresh`` take part
    in the stratified 80/20 split (seed 40); every sample at or below the
    threshold is appended to the test set. Otherwise the whole data set is
    split 80/20 (seed 41).

    Returns:
        ``(x_train, x_test, y_train, y_test, snr_test)`` where ``snr_test``
        holds the SNR of each row of ``x_test``.
    """
    high_snr_only = train_tresh>0
    if high_snr_only:
        keep = snr>train_tresh
        pool_x, pool_y, pool_snr = samples[keep], labels[keep], snr[keep]
        seed = 40
    else:
        pool_x, pool_y, pool_snr = samples, labels, snr
        seed = 41

    # Splitting a range alongside the data yields the row indices of each part,
    # which is how the SNR of the held-out rows is recovered.
    x_train, x_test, y_train, y_test, _, held_out = train_test_split(
        pool_x,
        pool_y,
        range(len(pool_x)),
        test_size=0.2,
        random_state=seed,
        stratify=pool_y,
    )
    snr_test = pool_snr[held_out]

    if high_snr_only:
        # Everything that was excluded from the training pool is test data.
        excluded = ~keep
        x_test = np.concatenate((x_test, samples[excluded]))
        y_test = np.concatenate((y_test, labels[excluded]))
        snr_test = np.concatenate((snr_test, snr[excluded]))
    return x_train, x_test, y_train, y_test,snr_test

def detection_per_snr(x_test, y_test, snr_test, classifier, name):
    """Compute the classifier's accuracy separately for every SNR value.

    Args:
        x_test: 2-D NumPy array of test features.
        y_test: 1-D NumPy array of true labels aligned with ``x_test``.
        snr_test: 1-D NumPy array of SNR values aligned with ``x_test``.
        classifier: fitted estimator exposing ``predict``.
        name: run name; used as the output directory for confusion matrices
            and as the ``"name"`` entry of the result.

    Returns:
        dict with ``"snr"`` (sorted unique SNR values), ``"accuracy"`` (one
        fraction per SNR value) and ``"name"``.
    """
    # classification per SNR
    unique_snr = np.unique(snr_test)
    accuracy_list = []
    for snr_val in unique_snr:
        mask = snr_test == snr_val
        x_snr = x_test[mask]
        y_snr = y_test[mask]
        y_pred_snr = classifier.predict(x_snr)
        accuracy_list.append(accuracy_score(y_snr, y_pred_snr))
        # Confusion matrices are only produced for integer SNR values 0..10.
        if snr_val in range(11):
            # Include predicted classes as well: confusion_matrix drops every
            # sample whose predicted label is missing from `labels`, which
            # would hide misclassifications into classes absent at this SNR.
            cm_labels = np.union1d(y_snr, y_pred_snr)
            plot_confusion_matrix(y_snr, y_pred_snr, cm_labels, snr_val, name)
    return {"snr": unique_snr, "accuracy": accuracy_list, "name": name}

def detection_per_label(x_test, y_test, snr_test, classifier, name):
    """Plot accuracy versus SNR with one curve per true label.

    The ``'others'`` placeholder label is skipped. The figure is saved as
    ``<name>/label_group_accuracy_<name>.png``; the directory ``name`` must
    already exist.

    Args:
        x_test: 2-D NumPy array of test features.
        y_test: 1-D NumPy array of true labels aligned with ``x_test``.
        snr_test: 1-D NumPy array of SNR values aligned with ``x_test``.
        classifier: fitted estimator exposing ``predict``.
        name: run name, used for the output directory and file name.
    """
    # classify per label
    unique_snr = np.unique(snr_test)
    unique_label = np.unique(y_test)
    fig, axs = plt.subplots(nrows=1, ncols=1, figsize=(10, 6))
    axs.set_title("Accuracy vs SNR")
    for label in unique_label:
        if label =='others':
            continue
        mask = y_test == label
        x_label = x_test[mask]
        y_label = y_test[mask]
        snr_label = snr_test[mask]
        accuracy_list = []
        for snr_val in unique_snr:
            mask_snr = snr_label == snr_val
            y_snr = y_label[mask_snr]
            if len(y_snr) == 0:
                # No sample of this label at this SNR: predicting on an empty
                # array would raise, so leave a gap in the curve instead.
                accuracy_list.append(np.nan)
                continue
            y_pred_snr = classifier.predict(x_label[mask_snr])
            accuracy_list.append(accuracy_score(y_snr, y_pred_snr))
        axs.plot(unique_snr, accuracy_list, label=label)
    axs.set_xlabel("SNR")
    axs.set_ylabel("Accuracy")
    axs.legend()
    axs.yaxis.set_major_locator(MultipleLocator(0.1))
    axs.xaxis.set_major_locator(MultipleLocator(2))
    axs.grid(which='major')
    fig.tight_layout()
    file_path = os.path.join(name, f"label_group_accuracy_{name}.png")
    fig.savefig(file_path)
    print("saved plots")
    plt.close(fig)

def detection_per_label_steps(x_test, y_test, snr_test, name,classifier1,classifier2,classifier3):
    """Plot per-label accuracy versus SNR for the three-stage cascade.

    Labels are drawn in two fixed groups, one subplot per group, and the
    figure is saved as ``label_group_accuracy_<name>.png`` in the current
    directory. Predictions come from ``steps_tree``.

    Args:
        x_test: 2-D NumPy array of features.
        y_test: 1-D NumPy array of true labels aligned with ``x_test``.
        snr_test: 1-D NumPy array of SNR values aligned with ``x_test``.
        name: suffix of the output file name.
        classifier1, classifier2, classifier3: fitted cascade stages.
    """
    group1 = ['AM-DSB-SC','AM-SSB-SC','FM','GMSK','OQPSK']
    group2 = ['OOK','4ASK','BPSK','QPSK','8PSK','16QAM']
    groups = [group1, group2]
    # classify per label
    unique_snr = np.unique(snr_test)
    num_plots = len(groups)  # Number of plots needed
    fig, axs = plt.subplots(nrows=num_plots, ncols=1, figsize=(10, 6 * num_plots))
    for i, label_group in enumerate(groups):
        axs[i].set_title(f"Accuracy vs SNR for Label Group {i+1}")
        for label in label_group:
            mask = y_test == label
            x_label = x_test[mask]
            y_label = y_test[mask]
            snr_label = snr_test[mask]
            accuracy_list = []
            for snr_val in unique_snr:
                mask_snr = snr_label == snr_val
                y_snr = y_label[mask_snr]
                if len(y_snr) == 0:
                    # No sample of this label at this SNR. Previously the
                    # failure was swallowed and a stale (or undefined)
                    # prediction was scored; record a gap instead.
                    accuracy_list.append(np.nan)
                    continue
                # Any genuine error now propagates instead of being hidden.
                y_pred_snr = steps_tree(x_label[mask_snr],classifier1,classifier2,classifier3)
                accuracy_list.append(accuracy_score(y_snr, y_pred_snr))
            axs[i].plot(unique_snr, accuracy_list, label=label)
        axs[i].set_xlabel("SNR")
        axs[i].set_ylabel("Accuracy")
        axs[i].legend()
        axs[i].yaxis.set_major_locator(MultipleLocator(0.1))
        axs[i].xaxis.set_major_locator(MultipleLocator(2))
        axs[i].grid(which='major')
    fig.tight_layout()
    file_path =  f"label_group_accuracy_{name}.png"
    fig.savefig(file_path)
    print("saved plots")
    plt.close(fig)

def plot_confusion_matrix(y_true, y_pred, labels, snr, name):
    """Save a confusion-matrix heatmap as ``<name>/SNR=<snr>.png``.

    Args:
        y_true: true labels.
        y_pred: predicted labels, aligned with ``y_true``.
        labels: label order used for the matrix rows/columns and tick labels.
        snr: SNR value shown in the title and used in the file name.
        name: output directory; created if it does not exist yet.
    """
    # plot confusion matrix for the data
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    accuracy = accuracy_score(y_true, y_pred)
    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=labels, yticklabels=labels, ax=ax)
    ax.set_xlabel("Predicted Labels")
    ax.set_ylabel("True Labels")
    # accuracy_score returns a fraction; format it as a real percentage.
    ax.set_title(f"Confusion Matrix for SNR={snr}dB, Acc={accuracy:.1%}")
    if name:
        os.makedirs(name, exist_ok=True)
    file_path = os.path.join(name, f"SNR={snr}.png")
    fig.savefig(file_path)
    plt.close(fig)

def combine_accuracy_graphs(*args):
    """Overlay several accuracy-vs-SNR curves in one figure and save it.

    Args:
        *args: dicts with keys ``"snr"``, ``"accuracy"`` and ``"name"`` (the
            structure returned by ``detection_per_snr`` / ``step_by_step``).

    Side effects:
        Saves ``combine_accuracy_graph_<k>.png`` in the current directory,
        where ``k`` is chosen so that no existing file is overwritten.
    """
    linestyles = ['-']
    markes = ['o','s','^','x','*','+']
    fig, ax = plt.subplots()
    for i, accuracy_data in enumerate(args):
        # Cycle through styles/markers so any number of curves can be drawn.
        linestyle = linestyles[i % len(linestyles)]
        marker = markes[i % len(markes)]
        ax.plot(
            accuracy_data["snr"], accuracy_data["accuracy"],
            linestyle=linestyle, linewidth=1, marker=marker, markersize=4, label=accuracy_data["name"]
        )
    ax.set_xlabel("SNR")
    ax.set_ylabel("Accuracy")
    ax.set_title("Accuracy vs SNR")
    ax.legend()
    ax.yaxis.set_major_locator(MultipleLocator(0.1))
    ax.xaxis.set_major_locator(MultipleLocator(4))
    ax.grid(which='major')

    # Start from "number of existing graphs + 1" as before, but skip forward
    # if that name is taken (possible when earlier files were deleted and the
    # numbering has gaps), so an existing graph is never overwritten.
    next_index = sum(1 for file in os.listdir('.') if file.startswith('combine_accuracy_graph_')) + 1
    file_path = f'combine_accuracy_graph_{next_index}.png'
    while os.path.exists(file_path):
        next_index += 1
        file_path = f'combine_accuracy_graph_{next_index}.png'
    fig.savefig(file_path)
    plt.close(fig)

def steps_tree(X,classifier1,classifier2,classifier3):
    """Classify ``X`` with a three-stage cascade of classifiers.

    Stage 1 labels every row. Rows it labels ``"others"`` are passed to
    stage 2, and rows stage 2 labels ``"others"`` are passed to stage 3.
    Each row's final label is the output of the last stage that saw it.

    Args:
        X: 2-D NumPy array, one row per sample.
        classifier1, classifier2, classifier3: fitted estimators exposing
            ``predict``.

    Returns:
        list of labels, one per row of ``X`` in the original order; an empty
        list when ``X`` has no rows.
    """
    # Estimators generally reject empty input, so short-circuit.
    if len(X) == 0:
        return []

    # Object dtype lets later stages write labels of any length back into the
    # array without fixed-width string truncation.
    final_labels = np.asarray(classifier1.predict(X)).astype(object)

    others_step1 = final_labels == "others"
    if others_step1.any():
        X_step2 = X[others_step1]
        labels_step2 = np.asarray(classifier2.predict(X_step2)).astype(object)

        others_step2 = labels_step2 == "others"
        if others_step2.any():
            labels_step2[others_step2] = classifier3.predict(X_step2[others_step2])

        # Boolean-mask assignment keeps the original row order.
        final_labels[others_step1] = labels_step2

    return final_labels.tolist()



In [20]:
# First test - find the minimal useful tree depth.
# Every run trains on the full data set (train_tresh = -1); only max_depth varies.
depth_sweep = [
    init_classification(samples, labels, snr, f"Tree depth={depth}", -1, depth)
    for depth in (4, 6, 8, 10, 20, 50)
]
acc1, acc2, acc3, acc4, acc5, acc6 = (accuracy for accuracy, _ in depth_sweep)
classifier = depth_sweep[-1][1]  # classifier of the last (deepest) run

combine_accuracy_graphs(acc1, acc2, acc3, acc4, acc5, acc6)


In [5]:
# Second Test - Get the best SNR threshold for the training set
accuracy_train1,classifier = init_classification(samples, labels, snr, "Tree depth=20", -1, 20)
accuracy_train2,classifier = init_classification(samples, labels, snr, "Tree depth=20", 1, 20)
accuracy_train3,classifier = init_classification(samples, labels, snr, "Tree depth=20", 5, 20)
accuracy_train4,classifier = init_classification(samples, labels, snr, "Tree depth=20", 10, 20)
accuracy_train5,classifier = init_classification(samples, labels, snr, "Tree depth=20", 20, 20)

combine_accuracy_graphs(accuracy_train1, accuracy_train2, accuracy_train3, accuracy_train4, accuracy_train5)


skip creation
Classify per SNR
Classify per Label
saved plots
Classify per SNR
Classify per Label
saved plots
Classify per SNR
Classify per Label
saved plots
Classify per SNR
Classify per Label
saved plots
Classify per SNR
Classify per Label
saved plots


In [None]:
# third test - PCA possibility
# Compare trees trained on 5/10/15/20 principal components of the log-scaled
# features against a tree trained on the log-scaled features directly.
from sklearn.decomposition import PCA

# Computed once and reused by every run below.
# NOTE(review): log10 yields nan/-inf for non-positive feature values - confirm
# the features are strictly positive.
log_samples = np.log10(samples)

accuracy_list = []
for pp in [5,10,15,20]:
    pca = PCA(n_components=pp)
    # fit_transform fits the model itself; a separate fit() beforehand is redundant.
    # NOTE(review): the PCA is fitted on all samples, i.e. before the
    # train/test split performed inside init_classification.
    principalComponents = pca.fit_transform(log_samples)
    accuracy_train,classifier = init_classification(principalComponents, labels, snr, f" PCA for {pp} components", 20, 7)
    accuracy_list.append(accuracy_train)

accuracy_train,classifier = init_classification(log_samples, labels, snr, "No PCA", 20, 7)
accuracy_list.append(accuracy_train)
# Plot every run, including the "No PCA" baseline that was previously
# computed but left out of the comparison graph.
combine_accuracy_graphs(*accuracy_list)


In [None]:

print("we are here")
train_tresh, tree_depth = 1, 3

# Regular tree classifiers on growing feature subsets (first 8 columns,
# first 14 columns, all columns); the run names are the plot/directory labels.
print("classification for cumulants...")
feature_runs = (
    (samples[:,range(8)], "cumulants"),
    (samples[:,range(14)], "cumulants with dx"),
    (samples, "cumulants, dx, IQ and phase"),
)
flat_results = [
    init_classification(run_samples, labels, snr, run_name, train_tresh, tree_depth)
    for run_samples, run_name in feature_runs
]
accuracy_data0, accuracy_data1, accuracy_data2 = (accuracy for accuracy, _ in flat_results)
classifier = flat_results[-1][1]  # classifier of the last (all-features) run

# Three-stage cascade on all features, for comparison with the flat trees.
accuracy_steps = step_by_step(samples, labels, snr, tree_depth)

combine_accuracy_graphs(accuracy_data0,accuracy_data1,accuracy_data2,accuracy_steps)


In [None]:
# Train only the first cascade stage on a reduced feature set:
# columns 0-7 plus columns 14-24.
tree_depth = 3
feats = [*range(8), *range(14, 25)]
split1 = ['AM-SSB-SC','4ASK','OOK']
classifier1,mask1 = split_classification(samples,labels,snr,feats,'split_1',split1,tree_depth)

In [None]:
# #DNN classifier
# import tensorflow as tf
# from tensorflow.keras import layers, models

# model = models.Sequential()
# model.add(layers.Dense(16, activation='relu', input_shape=(X_train.shape[1],)))
# model.add(layers.Dense(8, activation='relu'))
# model.add(layers.Dense(np.unique(y_train).shape[0], activation='softmax'))
# # Compile the model
# model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
# model.fit(X_train, y_train, epochs=1, batch_size=32)


In [None]:

# # SVM classifier
# from sklearn.svm import SVC
# from sklearn.metrics import accuracy_score

# svm_classifier = SVC(kernel='linear', C=1.0)  # You can use other kernels as well
# print('fitting')
# svm_classifier.fit(X_train, y_train)
# print('predict')
# y_pred = svm_classifier.predict(X_test)
# # Calculate the accuracy of the model
# accuracy = accuracy_score(y_test, y_pred)
# print("Accuracy:", accuracy)