In [None]:
%matplotlib inline

In [None]:
import warnings
from datetime import datetime

from sklearn.metrics import classification_report
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from tensorflow.python.keras import regularizers
from tensorflow.python.keras.callbacks import EarlyStopping
from tensorflow.python.keras.layers import Dense, LeakyReLU
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.optimizer_v2.rmsprop import RMSprop

warnings.filterwarnings("ignore", category=UserWarning)

from genome_ac_gan_training import polyloss_ce
from utils.util import *

# Project helper pulled in by the star-import from utils.util; presumably configures the
# available GPU devices before any model is built — TODO confirm against utils.util.
init_gpus()

# Folder name for result files (referenced by the commented-out reload in the next cell).
output_folder = "classifier_analysis"
# CSV passed to init_dataset as the training set.
train_set = '../resource/train_AFR_pop.csv'
# CSV passed to prepare_test_and_fake_dataset as the held-out test set.
test_set = '../resource/test_AFR_pop.csv'
# File of generated (synthetic) genotypes passed to prepare_test_and_fake_dataset.
experiment_results = '../resource/sub_AFR_aug_genotypes.hapt'

# File name of the CSV that the experiment loop writes its result rows to.
output_file = "classifiers_results_kmean.csv"
# Name of the label column handed to the dataset helpers.
target_column = 'Population code'

In [None]:
# `rows` accumulates one result dict per trained model; the experiment loop appends to it
# and periodically dumps it to CSV.
# The two commented lines would instead reload rows saved earlier — presumably to resume an
# interrupted run (TODO confirm).
# df = pd.read_csv(os.path.join(output_folder, output_file))
# rows = df.to_dict(orient='records')
rows = []
# Last expression of the cell: displays how many rows are already collected.
len(rows)

In [None]:
# Load the training split. init_dataset returns a 4-tuple; only the (features, labels) pair,
# the per-class counts and the class-name -> id mapping are kept here.
(x_train, y_train), class_id_to_counts, _, class_to_id = init_dataset(hapt_genotypes_path=train_set,
                                                                      target_column=target_column,
                                                                      with_extra_data=False)
# Inverse mapping (id -> class name), used later to label per-class metrics.
id_to_class = {v: k for k, v in class_to_id.items()}

# Held-out test data; later code indexes it as [0] = features, [1] = labels.
test_dataset = prepare_test_and_fake_dataset(experiment_results, test_path=test_set,
                                             target_column=target_column,
                                             class_to_id=class_to_id)

# Collapse the label matrix to integer class ids (assumes y_train is one-hot / per-class scores
# along the last axis — TODO confirm against init_dataset).
y_train = np.argmax(y_train, axis=-1)
# Class balance of the training set, as a percentage per class id.
uniques, counts = np.unique(y_train, return_counts=True)
total_samples = len(y_train)
percentages = counts / total_samples * 100
class_percentage_dict = dict(zip(uniques, percentages))
print(class_percentage_dict)

# Synthetic samples: same helper, but reading the generated file itself (from_generated=True).
generated_samples = prepare_test_and_fake_dataset(experiment_results,
                                                  test_path=experiment_results,
                                                  from_generated=True,
                                                  class_to_id=class_to_id)
print("shape: ", generated_samples[0].shape)

In [None]:



def calculate_accuracy(precision, recall):
    """Combine precision and recall into a single per-class score.

    Despite the name, the value returned is ``p * r / ((p + r + eps) / 2)``, i.e. an
    F1-style harmonic mean of the two inputs. The small ``eps`` keeps the division
    defined when both inputs are zero. Inputs are expected to lie in [0, 1].
    """
    smoothing = 0.00001
    product = precision * recall
    half_sum = (precision + recall + smoothing) / 2
    return product / half_sum


def shuffle_test_dataset():
    """Return the module-level ``test_dataset`` rows in a random order.

    A single index permutation is drawn from NumPy's global RNG and applied to both
    ``test_dataset[0]`` (features) and ``test_dataset[1]`` (labels) so they stay aligned.

    Returns:
        tuple: ``(shuffled_features, shuffled_labels)``.
    """
    order = np.arange(test_dataset[0].shape[0])
    np.random.shuffle(order)
    shuffled_features = test_dataset[0][order]
    shuffled_labels = np.array(test_dataset[1])[order]
    return shuffled_features, shuffled_labels


def build_classifier(number_of_genotypes: int, alph: float, lr: float, num_classes: int = 7):
    """Build and compile the dense neural-network classifier.

    The network is three L2-regularised Dense layers of width ``n // 2``, ``n // 3`` and
    ``n // 4`` (``n = number_of_genotypes``), each followed by a LeakyReLU, and a softmax
    output layer.

    Args:
        number_of_genotypes: Number of input features per sample.
        alph: Negative-slope coefficient passed to every LeakyReLU.
        lr: Learning rate for the RMSprop optimizer.
        num_classes: Width of the softmax output layer. Defaults to 7, the value that was
            previously hard-coded.

    Returns:
        The compiled ``Sequential`` model (loss ``polyloss_ce``, metric ``accuracy``).
    """
    l2_weight = 0.0001
    nn_classifier = Sequential([
        # Only the first layer needs input_shape; Sequential infers the rest.
        Dense(number_of_genotypes // 2, input_shape=(number_of_genotypes,),
              kernel_regularizer=regularizers.l2(l2_weight)),
        LeakyReLU(alpha=alph),
        Dense(number_of_genotypes // 3, kernel_regularizer=regularizers.l2(l2_weight)),
        LeakyReLU(alpha=alph),
        Dense(number_of_genotypes // 4, kernel_regularizer=regularizers.l2(l2_weight)),
        LeakyReLU(alpha=alph),
        Dense(num_classes, activation='softmax')
    ])

    # `metrics` is documented as a list of metrics, so pass it in that form.
    nn_classifier.compile(optimizer=RMSprop(learning_rate=lr), loss=polyloss_ce, metrics=['accuracy'])

    return nn_classifier


def concatenate_fake_data(percentage, generated_samples, x_train, y_train):
    """Assemble a shuffled training set that optionally includes synthetic samples.

    Args:
        percentage: Controls the mix, relative to ``len(x_train)``:
            ``0``  -> real data only;
            ``> 0`` -> real data plus roughly ``percentage * len(x_train)`` synthetic rows;
            ``< 0`` -> roughly ``abs(percentage) * len(x_train)`` synthetic rows only.
        generated_samples: Indexable whose ``[0]`` holds synthetic features and ``[1]`` their
            labels. Not touched when ``percentage == 0``.
        x_train: Real training features (NumPy array).
        y_train: Real training labels (NumPy array, aligned with ``x_train``).

    Returns:
        tuple: ``((features, labels), num_samples)`` where the pair is shuffled with NumPy's
        global RNG and ``num_samples`` is the number of synthetic rows actually drawn.
        Because the drawn fraction is capped at 0.99 of the synthetic pool, this can be
        smaller than the number requested.
    """
    if percentage == 0:
        features, labels = x_train, y_train
        num_samples = 0
    else:
        requested = int(abs(percentage) * len(x_train))
        # Fraction of the synthetic pool to draw; capped below 1.0 so the split stays valid.
        generated_percentage = min(requested / float(generated_samples[0].shape[0]), 0.99)
        print("generated_percentage", generated_percentage)
        synthetic_pool_labels = np.array(generated_samples[1])
        # The "test" side of a stratified split is used as a class-balanced random draw.
        _, X_synthetic, _, Y_synthetic = train_test_split(generated_samples[0],
                                                          synthetic_pool_labels,
                                                          test_size=generated_percentage,
                                                          stratify=synthetic_pool_labels,
                                                          random_state=None)
        # Report what was really drawn, not what was requested (they differ when capped).
        num_samples = len(X_synthetic)
        if percentage > 0:
            features = np.concatenate((x_train, X_synthetic), axis=0)
            labels = np.concatenate((y_train, Y_synthetic), axis=0)
        else:
            features, labels = X_synthetic, Y_synthetic

    # One shared permutation keeps features and labels aligned.
    order = np.arange(features.shape[0])
    np.random.shuffle(order)
    return (features[order], labels[order]), num_samples

In [None]:
from sklearn.cluster import KMeans
from sklearn.cluster import MiniBatchKMeans

def init_models(model_names=('KMeans',)):
    """Create fresh, unfitted models for one experiment round.

    Only the requested models are constructed, so the Keras network is not built
    unless ``'NN'`` is asked for.

    Args:
        model_names: Iterable of model keys to build. Supported keys are ``'SVC'``,
            ``'KNN'``, ``'NN'`` and ``'KMeans'``. Defaults to ``('KMeans',)``.

    Returns:
        dict: ``{model_name: model_instance}`` in the order given by ``model_names``.

    Raises:
        ValueError: If ``model_names`` contains an unsupported key.
    """
    factories = {
        'SVC': lambda: SVC(),
        'KNN': lambda: KNeighborsClassifier(n_neighbors=5),
        'NN': lambda: build_classifier(number_of_genotypes=10000, alph=0.01, lr=0.0001),
        'KMeans': lambda: MiniBatchKMeans(n_clusters=7, n_init=3, random_state=42, batch_size=256,
                                          max_iter=50, max_no_improvement=30),
    }

    unknown = [name for name in model_names if name not in factories]
    if unknown:
        raise ValueError(f"Unknown model name(s): {unknown}; expected a subset of {list(factories)}")

    return {name: factories[name]() for name in model_names}

In [None]:
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score
from sklearn.model_selection import train_test_split
from random import seed

# Experiment driver: for `unique_models` rounds, train every model from init_models() once on
# real data only (synthetic_percentage == 0) and once with synthetic data (== 1), evaluate on
# the shuffled test set, and collect one result row per trained model.
scores = []
test_predictions = []
unique_models = 50
number_of_models = 0
# Seed Python's RNG and NumPy's global RNG: the shuffles and the un-seeded train_test_split
# inside concatenate_fake_data draw from np.random, which random.seed alone does not affect.
seed(42)
np.random.seed(42)
rows_k = []

# Make sure the destination exists before the first periodic dump.
os.makedirs(output_folder, exist_ok=True)
results_path = os.path.join(output_folder, output_file)

for i in range(unique_models):
    # Time the whole round (both synthetic_percentage settings), not just the last one.
    start_time = datetime.now()
    for synthetic_percentage in [0, 1]:
        classifiers = init_models()
        for model_name, clf in classifiers.items():
            test_dataset_shuffled = shuffle_test_dataset()

            train_dataset_with_generated_data, number_of_synthetic_samples = concatenate_fake_data(
                percentage=synthetic_percentage,
                generated_samples=generated_samples,
                x_train=x_train, y_train=y_train)
            print(
                f"{i}: starting model {model_name} with percentage: {synthetic_percentage} and {train_dataset_with_generated_data[0].shape[0]} samples")

            if model_name == 'NN':
                Y_train_encoded = tensorflow.one_hot(train_dataset_with_generated_data[1],
                                                     depth=train_dataset_with_generated_data[1].max() + 1)
                # Split the training dataset into training and validation sets
                x_train_tmp, x_val_tmp, y_train_tmp, y_val_tmp = train_test_split(train_dataset_with_generated_data[0],
                                                                                  np.array(Y_train_encoded),
                                                                                  stratify=np.array(Y_train_encoded),
                                                                                  random_state=42,
                                                                                  test_size=0.2)

                # Stop once validation accuracy has not improved for 10 epochs and keep the best weights
                early_stopping = EarlyStopping(patience=10, restore_best_weights=True, monitor='val_accuracy',
                                               verbose=2)

                clf.fit(x_train_tmp, y_train_tmp, batch_size=512, epochs=100, verbose=2,
                        validation_data=(x_val_tmp, y_val_tmp), callbacks=[early_stopping])

                # Make predictions on the test dataset
                test_predictions = tensorflow.argmax(clf.predict(test_dataset_shuffled[0]), axis=1)
            elif model_name == 'KMeans':
                # Real-only condition fits on the real training data; the synthetic condition
                # fits on the synthetic pool alone.
                if synthetic_percentage == 0:
                    clf.fit(train_dataset_with_generated_data[0])
                else:
                    clf.fit(generated_samples[0])
                # Assign test samples to the clusters learned above. fit_predict would re-fit
                # on the test set and throw that fit away, making both conditions identical.
                test_predictions = clf.predict(test_dataset_shuffled[0])
                ari_model1 = adjusted_rand_score(test_dataset_shuffled[1], test_predictions)
                nmi_model1 = normalized_mutual_info_score(test_dataset_shuffled[1], test_predictions)
                rows_k.append({'synthetic_percentage': synthetic_percentage, 'ari_model': round(ari_model1, 5), 'nmi_model': round(nmi_model1, 5)})
                print(f"{synthetic_percentage}: {round(ari_model1, 5)}, {round(nmi_model1, 5)}")
            else:
                clf.fit(train_dataset_with_generated_data[0], train_dataset_with_generated_data[1])
                test_predictions = clf.predict(test_dataset_shuffled[0])
                print(test_predictions)

            # NOTE(review): for KMeans the predictions are cluster indices, not class ids, so the
            # per-class / accuracy numbers below are only meaningful for the supervised models
            # unless clusters are first matched to classes — confirm whether KMeans rows belong here.
            class_report = classification_report(test_dataset_shuffled[1], test_predictions, output_dict=True)

            # Per-class score from precision and recall; skip the aggregate entries of the report.
            class_accuracy = {}
            for class_label, metrics in class_report.items():
                if class_label in ['accuracy', 'macro avg', 'weighted avg']:
                    continue
                class_accuracy[class_label] = round(calculate_accuracy(metrics['precision'], metrics['recall']), 4)

            # One output row: a column per class name plus run metadata and overall scores.
            output_row = {}
            for class_label, accuracy in class_accuracy.items():
                output_row[id_to_class[int(class_label)]] = accuracy

            output_row.update({"synthetic_percentage": synthetic_percentage,
                               "samples_and_percentage": f"{number_of_synthetic_samples}\n{int(synthetic_percentage * 100)}%",
                               "model_name": model_name, "accuracy": class_report['accuracy'],
                               "f1_score": class_report['macro avg']['f1-score']})
            rows.append(output_row)
            number_of_models += 1
            # Checkpoint the results every 10 trained models.
            if number_of_models % 10 == 0:
                print(f"finished train {number_of_models} models")
                pd.DataFrame(rows).to_csv(results_path)
    end_time = datetime.now()
    duration_minutes = (end_time - start_time).total_seconds() / 60

    print(f"finished model iteration in {duration_minutes} minutes")

pd.DataFrame(rows).to_csv(results_path)

In [None]:
# !pip install xgboost==2.0.0

In [None]:
# One row per KMeans run: synthetic_percentage plus the rounded ARI and NMI collected in rows_k.
df_k = pd.DataFrame(rows_k)

In [None]:
# Median clustering quality per condition (synthetic_percentage 0 = fitted on real training
# data, 1 = fitted on synthetic data). Both statistics are medians; the banners name the metric.
print("******* ARI median *******")
print(df_k.groupby('synthetic_percentage')["ari_model"].median())

print("\n******* NMI median *******")
print(df_k.groupby('synthetic_percentage')["nmi_model"].median())
