In [None]:
import numpy as np
import pandas as pd
import random
import tensorflow as tf
import tensorflow_probability as tfp
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import tensorflow as tf
from keras import layers
import matplotlib.pyplot as plt
import seaborn as sns
import math


# Short aliases for the TensorFlow / TensorFlow Probability namespaces used below.
tfd = tfp.distributions  # probability distributions (Normal, Independent)
tfpl = tfp.layers  # probabilistic layers (DenseVariational, IndependentNormal)
tfk = tf.keras  # NOTE(review): not referenced by any code visible in this notebook
tfkl = tf.keras.layers  # standard Keras layers (Concatenate, BatchNormalization, Dense)

# Functions to define prior and posterior for Bayesian layers
def posterior_mean_field(kernel_size, bias_size=0, dtype=None):
    """Build a trainable mean-field Normal posterior over a layer's weights.

    Parameters:
    - kernel_size: Number of kernel weights in the layer.
    - bias_size: Number of bias weights in the layer (0 when there is no bias).
    - dtype: dtype forwarded to the trainable variable.

    Returns:
    A two-stage ``tf.keras.Sequential``: a ``VariableLayer`` holding
    ``2 * (kernel_size + bias_size)`` trainable values, followed by a
    ``DistributionLambda`` that turns them into an ``Independent(Normal)``.
    The first half of the values are the locations, the second half are
    unconstrained scale parameters.
    """
    num_weights = kernel_size + bias_size
    # softplus(softplus_offset) == 1, so an unconstrained scale value of 0
    # corresponds to a standard deviation of (almost exactly) 1.
    softplus_offset = np.log(np.expm1(1.))

    def _to_distribution(params):
        locations = params[..., :num_weights]
        # The 1e-5 floor keeps the standard deviation strictly positive.
        scales = 1e-5 + tf.nn.softplus(softplus_offset + params[..., num_weights:])
        return tfd.Independent(
            tfd.Normal(loc=locations, scale=scales),
            reinterpreted_batch_ndims=1,
        )

    variational_params = tfp.layers.VariableLayer(2 * num_weights, dtype=dtype)
    return tf.keras.Sequential([
        variational_params,
        tfp.layers.DistributionLambda(_to_distribution),
    ])

def prior_trainable(kernel_size, bias_size=0, dtype=None):
    """Build a Normal prior over a layer's weights with trainable means.

    Parameters:
    - kernel_size: Number of kernel weights in the layer.
    - bias_size: Number of bias weights in the layer (0 when there is no bias).
    - dtype: dtype forwarded to the trainable variable.

    Returns:
    A ``tf.keras.Sequential`` whose ``VariableLayer`` holds one trainable
    location per weight and whose ``DistributionLambda`` wraps them in an
    ``Independent(Normal)`` with the scale fixed at 1.
    """
    num_weights = kernel_size + bias_size

    def _to_distribution(locations):
        return tfd.Independent(
            tfd.Normal(loc=locations, scale=1),
            reinterpreted_batch_ndims=1,
        )

    trainable_locations = tfp.layers.VariableLayer(num_weights, dtype=dtype)
    return tf.keras.Sequential([
        trainable_locations,
        tfp.layers.DistributionLambda(_to_distribution),
    ])

# Example for a single input model. Adjust as needed.
#def create_model_inputs():
    #return {'input': tf.keras.Input(shape=(X_scaled.shape[1],), name='input')}

# Width of each Bayesian hidden layer, in order; read by create_probabilistic_bnn_model.
hidden_units = [16,8]
# NOTE(review): no code visible in this notebook reads `learning_rate`; run_experiment
# configures its Adam optimizer with its own value of 0.01 - confirm which is intended.
learning_rate = 0.001

def create_probabilistic_bnn_model(train_size, hidden_activation="sigmoid"):
    """Build a Bayesian neural network that outputs a Normal distribution.

    The network concatenates every input returned by ``create_model_inputs``,
    batch-normalizes the result, applies one ``DenseVariational`` layer per
    entry of the module-level ``hidden_units`` list, and finally maps the
    hidden features to the two parameters of a one-dimensional
    ``IndependentNormal`` output.

    Parameters:
    - train_size: Number of training examples. The KL term of every
      variational layer is weighted by ``1 / train_size``.
    - hidden_activation: Activation of the variational hidden layers.
      Defaults to ``"sigmoid"``. Earlier revisions hard-coded ``"softmax"``,
      which forces the outputs of each hidden layer to sum to one and
      discards most of the layer's capacity; pass ``"softmax"`` explicitly
      to reproduce that behaviour.

    Returns:
    An uncompiled ``tf.keras.Model`` mapping the input dict to a distribution.

    Raises:
    - ValueError: If ``train_size`` is not positive.
    """
    if train_size <= 0:
        raise ValueError(f"train_size must be positive, got {train_size}")

    inputs = create_model_inputs()
    features = tfkl.Concatenate(axis=-1)(list(inputs.values()))
    features = tfkl.BatchNormalization()(features)

    # Weight uncertainty: each hidden layer learns a posterior over its
    # weights (posterior_mean_field) regularized towards prior_trainable.
    for units in hidden_units:
        features = tfpl.DenseVariational(
            units=units,
            make_prior_fn=prior_trainable,
            make_posterior_fn=posterior_mean_field,
            kl_weight=1 / train_size,
            activation=hidden_activation)(features)

    # Create a probabilistic output (Normal distribution): the Dense layer
    # produces the two parameters that IndependentNormal(1) consumes.
    distribution_params = tfkl.Dense(units=2)(features)
    outputs = tfpl.IndependentNormal(1)(distribution_params)

    return tf.keras.Model(inputs=inputs, outputs=outputs)


In [None]:
def load_the_data(path, scaler=None):
    """Read a CSV file and return scaled features, labels and feature names.

    The ``class`` column is used as the label. The ``E``, ``N`` and ``class``
    columns are removed from the feature matrix before scaling.

    Parameters:
    - path: Location of the CSV file to read.
    - scaler: Optional, already fitted object exposing ``transform``. When
      given, the features are transformed with it instead of fitting a new
      ``StandardScaler``. Use this to scale several files with the same
      statistics; fitting one scaler per file (the default) standardizes
      every file independently of the others.

    Returns:
    - data: NumPy array with the scaled feature values.
    - label: NumPy array with the contents of the ``class`` column.
    - data_columns: Index with the names of the feature columns, in order.
    """
    raw_frame = pd.read_csv(path)

    # Extract the label before the column is dropped from the features
    label = raw_frame['class'].to_numpy()

    # Drop the non-feature columns
    feature_frame = raw_frame.drop(['E', 'N', 'class'], axis=1)
    data_columns = feature_frame.columns

    if scaler is None:
        data = StandardScaler().fit_transform(feature_frame)
    else:
        data = scaler.transform(feature_frame)

    return data, label, data_columns

In [None]:
# Loading the data.
# NOTE(review): DATA_DIR is a machine-specific absolute path; point it at the
# directory holding the two CSV files before running on another machine.
DATA_DIR = "/home/dipak/Desktop/codes/MLP_all_codes/MLP_Jukka_07_03_23"

# NOTE(review): load_the_data fits a separate StandardScaler on every call, so
# the two classes are standardized with different statistics - confirm that
# this is intended.
data_class_1, label_class_1, data_columns_1 = load_the_data(path=f"{DATA_DIR}/17_annoted_points.csv")
data_class_0, label_class_0, data_columns_0 = load_the_data(path=f"{DATA_DIR}/2M_raster_points.csv")

print(f'[CLASS 1] {data_class_1.shape}')
print(f'[CLASS 0] {data_class_0.shape}')

print(len(data_class_1))

In [None]:
# Rows in one training round: every class-1 row plus an equally sized sample
# of class-0 rows (this is how the sampling loop at the end of the notebook
# assembles each round). Derived from the data instead of being hard-coded.
num_rows = 2 * len(data_class_1)
print(num_rows)
# Feature names, in column order, taken from the class-1 file.
FEATURE_NAMES = data_columns_1
print(FEATURE_NAMES)

In [None]:
def create_model_inputs():
    """Create one scalar Keras input per entry of ``FEATURE_NAMES``.

    Returns:
    A dict mapping each feature name to a ``layers.Input`` of shape ``(1,)``
    and dtype ``tf.float64`` that carries the same name.
    """
    return {
        column_name: layers.Input(name=column_name, shape=(1,), dtype=tf.float64)
        for column_name in FEATURE_NAMES
    }

# NOTE(review): create_probabilistic_bnn_model builds its own inputs, and no
# code visible in this notebook reads this module-level `inputs`.
inputs = create_model_inputs()

# Number of rows expected in one training round.
dataset_size = num_rows
# Batch size used when building the tf.data datasets; also read by
# generate_and_save_predictions.
batch_size = 2
# Training-set size, intended as the `train_size` argument of
# create_probabilistic_bnn_model (which scales the KL term by 1 / train_size).
train_size = int(dataset_size)


def dataframe_to_dataset(data_class, label_class, batch_size):
    """Turn a feature table and its labels into a batched ``tf.data.Dataset``.

    Parameters:
    - data_class: Feature table; it is copied and converted with ``dict(...)``,
      so each key becomes one named feature of the dataset elements.
    - label_class: Labels, one per row of ``data_class``.
    - batch_size: Number of consecutive elements per batch.

    Returns:
    A dataset of ``(feature_dict, label)`` batches.
    """
    feature_columns = dict(data_class.copy())
    unbatched = tf.data.Dataset.from_tensor_slices((feature_columns, label_class))
    return unbatched.batch(batch_size)

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras import layers
import tensorflow_datasets as tfds
import tensorflow_probability as tfp
from sklearn.preprocessing import StandardScaler

# Default number of training epochs used by run_experiment.
num_epochs = 1000

def run_experiment(model, loss, train_dataset, test_dataset, learning_rate=0.01, epochs=None):
    """Compile ``model`` and train it, validating after every epoch.

    Parameters:
    - model: Keras model to compile and fit.
    - loss: Loss passed to ``model.compile``.
    - train_dataset: Dataset used for fitting.
    - test_dataset: Dataset passed to ``fit`` as ``validation_data``.
    - learning_rate: Learning rate of the Adam optimizer. Defaults to 0.01,
      the value that used to be hard-coded here.
    - epochs: Number of epochs; ``None`` falls back to the module-level
      ``num_epochs``.

    Returns:
    The ``History`` object returned by ``model.fit``.
    """
    if epochs is None:
        epochs = num_epochs

    model.compile(
        optimizer=tf.optimizers.Adam(learning_rate=learning_rate),
        loss=loss,
        metrics=[keras.metrics.RootMeanSquaredError()],
    )

    print("Start training the model...")
    return model.fit(train_dataset, epochs=epochs, validation_data=test_dataset, verbose=1)

def negative_loglikelihood(targets, estimated_distribution):
    """Return the negated log-probability of ``targets`` under the distribution.

    Parameters:
    - targets: Observed values to score.
    - estimated_distribution: Object exposing ``log_prob``.

    Returns:
    ``-estimated_distribution.log_prob(targets)``.
    """
    log_likelihood = estimated_distribution.log_prob(targets)
    return -log_likelihood



In [None]:
import pandas as pd

def generate_and_save_predictions(prob_bnn_model, test_dataset, round_number):
    """
    Generate predictions using the Bayesian Neural Network model and save them as a CSV file.

    The whole test dataset is pulled into a single batch, passed through the
    model once, and one row per example is written to
    ``predictions_round_<round_number>.csv`` and printed.

    Parameters:
    - prob_bnn_model: The probabilistic BNN model; calling it must return an
      object exposing ``mean()`` and ``stddev()``.
    - test_dataset: The batched test dataset of ``(examples, targets)``.
    - round_number: The current round number, used for naming the output file.

    Returns:
    - targets: The actual target values.
    - prediction_mean: The predicted mean values, as a nested list.
    - prediction_stdv: The predicted standard deviations, as a nested list.
    """
    # Upper bound on the number of examples (the last batch may be shorter),
    # used only to collect every example into one batch. Relies on the
    # module-level `batch_size` matching the one used to build `test_dataset`.
    sample = test_dataset.cardinality().numpy() * batch_size
    examples, targets = list(test_dataset.unbatch().shuffle(batch_size * 10).batch(sample))[0]
    prediction_distribution = prob_bnn_model(examples)

    mean_array = prediction_distribution.mean().numpy()
    stdv_array = prediction_distribution.stddev().numpy()

    # The 95% CI is computed as mean +/- (1.96 * stdv); the arithmetic is done
    # on the arrays before anything is converted to a list.
    upper = (mean_array + 1.96 * stdv_array).tolist()
    lower = (mean_array - 1.96 * stdv_array).tolist()
    prediction_mean = mean_array.tolist()
    prediction_stdv = stdv_array.tolist()

    results = []
    # Iterate over the predictions actually produced: `sample` overshoots
    # whenever the final batch of the dataset is not full.
    for idx in range(len(prediction_mean)):
        mean_value = round(prediction_mean[idx][0], 2)
        stdv_value = round(prediction_stdv[idx][0], 2)
        lower_value = round(lower[idx][0], 2)
        upper_value = round(upper[idx][0], 2)

        results.append({
            "Prediction mean": mean_value,
            "stddev": stdv_value,
            "95% CI lower": lower_value,
            "95% CI upper": upper_value,
            "Actual": targets[idx].numpy()  # Adjust if the tensor is not an eager tensor
        })

        # The interval is printed as [lower - upper].
        print(
            f"Prediction mean: {mean_value}, "
            f"stddev: {stdv_value}, "
            f"95% CI: [{lower_value} - {upper_value}]"
            f" - Actual: {targets[idx]}"
        )

    results_df = pd.DataFrame(results)
    results_df.to_csv(f'predictions_round_{round_number}.csv', index=False)

    return targets, prediction_mean, prediction_stdv


In [None]:
import matplotlib.pyplot as plt

def plot_and_save_CI(targets, prediction_mean, prediction_stdv, round_number):
    """
    Plot the confidence interval and save it as an image file.

    The figure is written to ``confidence_interval_plot_round_<round_number>.png``,
    shown, and then closed so that calling this once per round does not
    accumulate open figures.

    Parameters:
    - targets: Actual target values.
    - prediction_mean: Predicted mean values, nested like [[a], [b], ...].
    - prediction_stdv: Standard deviation of the predicted values, nested the same way.
    - round_number: The current round number, used for naming the output file.

    Returns:
    None
    """
    # prediction_mean and prediction_stdv are nested lists like [[a], [b], [c], ...]
    flat_mean = [x[0] for x in prediction_mean]
    flat_std = [x[0] for x in prediction_stdv]
    ci_lower = [pred - 1.96*std for pred, std in zip(flat_mean, flat_std)]
    ci_upper = [pred + 1.96*std for pred, std in zip(flat_mean, flat_std)]

    # Use an explicit figure so the plot never lands on a figure left over
    # from another plotting call.
    fig, ax = plt.subplots(figsize=(10, 6))

    # Plot actual vs. predicted values
    ax.scatter(range(len(targets)), targets, label='Actual', marker='o', color='red')
    ax.plot(flat_mean, label='Predicted', marker='x')

    # Shaded band for the 95% CI
    ax.fill_between(range(len(flat_mean)), ci_lower, ci_upper, alpha=0.2, label='95% CI')

    # Labeling the axes and providing a title
    ax.set_xlabel('Example')
    ax.set_ylabel('Value')
    ax.set_title('Predicted vs Actual Values with 95% Confidence Interval')
    ax.legend(loc='upper left', bbox_to_anchor=(1, 1))

    # Save the plot as a .png file
    fig.savefig(f'confidence_interval_plot_round_{round_number}.png', bbox_inches='tight')

    # Display the plot, then release it
    plt.show()
    plt.close(fig)


In [None]:
def dataframe_to_dataset1(data, labels, batch_size, data_columns_1):
    """
    Build a batched tf.data.Dataset of (feature dict, label) pairs.

    Parameters:
        - data: Either a pandas DataFrame, whose own column names become the
          feature keys, or a 2-D array-like indexed as data[:, i].
        - labels: Array-like object with one label per row of `data`.
        - batch_size: Integer, size of batches to divide data into.
        - data_columns_1: Feature names, in column order. Only consulted when
          `data` is not a DataFrame; column i is stored under the i-th name.

    Returns:
        - ds: tf.data.Dataset object
    """
    columns_by_name = {}
    if isinstance(data, pd.DataFrame):
        for column_name in data.columns:
            columns_by_name[column_name] = data[column_name].values
    else:
        # Positional data carries no names, so pair each column with the
        # caller-supplied name at the same position.
        for position, column_name in enumerate(data_columns_1):
            columns_by_name[column_name] = data[:, position]

    unbatched = tf.data.Dataset.from_tensor_slices((columns_by_name, labels))
    return unbatched.batch(batch_size)


In [None]:
import matplotlib.pyplot as plt

def scatter_plot_predictions_and_save(targets, prediction_mean, prediction_stdv, round_number):
    """
    Generates a scatter plot of actual vs. predicted values with confidence as color,
    and saves the plot as an image file.

    The plot is drawn on its own figure (so it cannot end up on top of a
    figure left open by another plotting call), gets a colorbar explaining
    the colour encoding, is written to
    ``predicted_vs_actual_round_<round_number>.png``, shown, and closed.

    Parameters:
    - targets: Actual target values.
    - prediction_mean: Predicted mean values from the model, nested like [[a], [b], ...].
    - prediction_stdv: Predicted standard deviation values from the model, nested the same way.
    - round_number: The current round number, used for naming the output image file.

    Returns:
    None
    """
    # prediction_mean and prediction_stdv are nested lists like [[a], [b], [c], ...]
    flat_mean = [x[0] for x in prediction_mean]
    flat_std = [x[0] for x in prediction_stdv]

    fig, ax = plt.subplots()

    # Each point is coloured by its predicted standard deviation
    points = ax.scatter(x=targets, y=flat_mean, c=flat_std, cmap='viridis', s=50, alpha=0.6)
    fig.colorbar(points, ax=ax, label='Predicted stddev')

    # Dashed diagonal: where a perfect prediction would fall
    lowest, highest = min(targets), max(targets)
    ax.plot([lowest, highest], [lowest, highest], 'k--')

    ax.set_title('Predicted vs. Actual with Confidence as Color')
    ax.set_xlabel('Actual Value')
    ax.set_ylabel('Predicted Value')

    # Saving the plot as an image file
    fig.savefig(f'predicted_vs_actual_round_{round_number}.png')

    # Displaying the plot, then releasing it
    plt.show()
    plt.close(fig)

# Example usage:
# scatter_plot_predictions_and_save(targets, prediction_mean, prediction_stdv, round_number)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

def plot_and_save_confusion_matrix(prediction_mean, targets, round_number):
    """
    Plot the confusion matrix using predictions and actual targets and save the plot.

    Predicted means are rounded to the nearest integer to obtain a class per
    example. Both label vectors are passed to ``confusion_matrix`` as flat
    integer arrays, and the heatmap axes are ticked with the actual class
    values. The figure is written to
    ``confusion_matrix_round_<round_number>.png``, shown, and closed.

    Parameters:
    - prediction_mean: Mean of the predictions (flat or nested like [[a], [b], ...]).
    - targets: Actual target values (eager tensors or plain numbers).
    - round_number: Current round number, used for naming the output file.

    Returns:
    None
    """
    # Discretizing the predictions into a flat vector of integer classes
    predicted_labels = np.rint(np.asarray(prediction_mean, dtype=float)).astype(int).ravel()

    # Bring the targets into the same flat integer form
    true_values = [label.numpy() if hasattr(label, "numpy") else label for label in targets]
    true_labels = np.rint(np.asarray(true_values, dtype=float)).astype(int).ravel()

    # Every class that occurs on either side, sorted. A rounded prediction may
    # fall outside the set of true classes; it then gets its own row/column.
    class_labels = np.unique(np.concatenate([true_labels, predicted_labels]))

    # Compute confusion matrix
    cm = confusion_matrix(true_labels, predicted_labels, labels=class_labels)

    fig, ax = plt.subplots(figsize=(10, 7))

    # Use seaborn to make the heatmap, ticking the axes with the class values
    sns.heatmap(cm, annot=True, fmt='g', cmap='Blues',
                xticklabels=class_labels, yticklabels=class_labels, ax=ax)

    ax.set_xlabel('Predicted')
    ax.set_ylabel('Actual')
    ax.set_title('Confusion Matrix')

    # Save the plot as a .png file
    fig.savefig(f'confusion_matrix_round_{round_number}.png', bbox_inches='tight')

    # Display the plot, then release it
    plt.show()
    plt.close(fig)


In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

def _posterior_feature_importance(posterior_params, input_dim, units):
    """Score each input feature from a flat mean-field posterior parameter vector.

    ``posterior_params`` is laid out as produced by ``posterior_mean_field``:
    the first half holds the weight means, the second half the unconstrained
    scales. Within each half the kernel weights come first (optionally
    followed by ``units`` bias weights) and are reshaped to
    ``(input_dim, units)``.

    Returns a length-``input_dim`` array: ``|mean| * stddev`` averaged over
    the units each feature feeds.
    """
    posterior_params = np.asarray(posterior_params, dtype=float).ravel()
    n_weights = posterior_params.shape[0] // 2
    kernel_size = input_dim * units
    if n_weights not in (kernel_size, kernel_size + units):
        raise ValueError(
            f"Posterior holds {n_weights} weights, which does not match "
            f"{input_dim} input features feeding {units} units"
        )

    means = posterior_params[:n_weights]
    raw_scales = posterior_params[n_weights:]

    # Same transform as posterior_mean_field: 1e-5 + softplus(c + raw),
    # with c = log(expm1(1)). logaddexp(0, x) is a stable softplus(x).
    softplus_offset = np.log(np.expm1(1.))
    stddevs = 1e-5 + np.logaddexp(0., softplus_offset + raw_scales)

    kernel_means = means[:kernel_size].reshape(input_dim, units)
    kernel_stddevs = stddevs[:kernel_size].reshape(input_dim, units)
    return (np.abs(kernel_means) * kernel_stddevs).mean(axis=1)


def plot_ard(prob_bnn_model, df, round_number):
    """
    Plot Automatic Relevance Determination (ARD) and save it according to the round number.

    The scores are read from the posterior of the FIRST ``DenseVariational``
    layer, because that is the only layer whose weights connect directly to
    the input features. Each feature's score is ``|posterior mean| *
    posterior stddev`` averaged over the units of that layer.

    NOTE(review): the ``|mean| * stddev`` score is kept from the earlier
    revision of this function; confirm it is the relevance measure you want.
    NOTE(review): the (input_dim, units) kernel layout follows how
    ``tfp.layers.DenseVariational`` splits its flat weight sample into kernel
    and bias - confirm against the installed TFP version.

    Parameters:
    - prob_bnn_model: The probabilistic BNN model.
    - df: DataFrame used for extracting feature names; its last column is
      assumed to be the target variable.
    - round_number: Current round number, used for naming the output file.

    Returns:
    None

    Raises:
    - ValueError: If the model has no ``DenseVariational`` layer, or if the
      size of its posterior does not match the number of features in ``df``.
    """
    first_bayesian_layer = next(
        (layer for layer in prob_bnn_model.layers
         if isinstance(layer, tfpl.DenseVariational)),
        None,
    )
    if first_bayesian_layer is None:
        raise ValueError("The model contains no DenseVariational layer")

    # The layer owns two trainable vectors: the posterior (means + scales,
    # 2 * n values) and the prior (n values). The larger one is the posterior.
    posterior_variable = max(
        first_bayesian_layer.trainable_variables,
        key=lambda variable: variable.shape.num_elements(),
    )

    # Calculate input_dim
    input_dim = df.shape[1] - 1  # Assuming the last column is the target variable

    # Compute importance
    importance = _posterior_feature_importance(
        posterior_variable.numpy(), input_dim, first_bayesian_layer.units
    )

    # Sort indices of importance, most important first
    sorted_idx = np.argsort(importance)[::-1]

    # Display the sorted feature importances
    print("Features sorted by their importance:")
    for idx in sorted_idx:
        print(f"{df.columns[idx]}: {importance[idx]}")

    # Prepare data for plotting
    features = [df.columns[idx] for idx in sorted_idx]
    importances = [importance[idx] for idx in sorted_idx]

    # Create horizontal bar plot
    fig, ax = plt.subplots(figsize=(10, 8))
    bars = ax.barh(features, importances, color='skyblue')
    ax.set_xlabel('Importance')
    ax.set_title('Feature Importance')

    # Add the data value at the end of each bar
    for bar, value in zip(bars, importances):
        ax.text(bar.get_width(), bar.get_y() + bar.get_height()/2,
                f'{value:.4f}',
                va='center', ha='left', color='blue', fontsize=10)

    # Invert y-axis to have the most important feature at the top
    ax.invert_yaxis()

    # Save and show the plot, then release it
    fig.savefig(f'ARD_plot_round_{round_number}.png', bbox_inches='tight')
    plt.show()
    plt.close(fig)


In [None]:
# Every round pairs ALL class-1 rows with an equally sized, not yet used
# sample of class-0 rows, trains a fresh model on them, and saves that
# round's predictions and plots.
NUMBER_OF_SAMPLE_TO_TAKE = len(data_class_1)
if NUMBER_OF_SAMPLE_TO_TAKE == 0:
    raise ValueError("data_class_1 is empty: cannot size the class-0 samples")

# number of complete class-0 samples that can be drawn without reuse
number_of_total_combination = math.floor(data_class_0.shape[0] / NUMBER_OF_SAMPLE_TO_TAKE)
print(number_of_total_combination)

# One additional, shorter round picks up the leftover class-0 rows, but only
# when there are leftovers: an exact multiple must not produce an empty round.
number_of_rounds = math.ceil(data_class_0.shape[0] / NUMBER_OF_SAMPLE_TO_TAKE)

# Visit every class-0 row exactly once, in random order. Shuffling once and
# slicing replaces repeated membership tests against a growing list.
shuffled_class_0_index = random.sample(range(len(data_class_0)), len(data_class_0))

# Class-0 row indices consumed so far, across all rounds
list_of_already_taken_id = list()

FEATURE_NAMES = data_columns_1  # Ensure this list has the names of your features

for round_number in range(number_of_rounds):
    first_position = round_number * NUMBER_OF_SAMPLE_TO_TAKE
    list_of_index = shuffled_class_0_index[first_position:first_position + NUMBER_OF_SAMPLE_TO_TAKE]

    if len(list_of_index) < NUMBER_OF_SAMPLE_TO_TAKE:
        # Final round: fewer untouched class-0 rows remain than a full sample
        print("[NO MORE SAMPLE]")
        print(f'{len(list_of_index)} --- {list_of_index}')

    list_of_already_taken_id.extend(list_of_index)

    class_0_we_need = data_class_0[list_of_index, :]
    label_0_we_need = label_class_0[list_of_index]

    data = np.concatenate((data_class_1, class_0_we_need), axis=0)
    labels = np.concatenate((label_class_1, label_0_we_need), axis=0)

    df = pd.DataFrame(data, columns=FEATURE_NAMES)  # Convert data to a DataFrame

    # Add labels to the DataFrame; plot_ard treats the last column as the target
    df['target'] = labels

    data = tf.cast(data, dtype=tf.float32)
    labels = tf.cast(labels, dtype=tf.float32)

    print(f"Data shape: {data.shape}, Labels shape: {labels.shape}")
    print(f"Data type: {type(data)}, Labels type: {type(labels)}")
    print(f"Concatenenated data is {data.shape} concateaned label is {labels.shape}")

    # NOTE(review): the "test" dataset is built from exactly the same rows as
    # the training dataset, so every metric and plot below is measured on
    # training data - confirm that this is intended.
    train_dataset = dataframe_to_dataset1(data, labels, batch_size=batch_size, data_columns_1=FEATURE_NAMES)
    test_dataset = dataframe_to_dataset1(data, labels, batch_size=batch_size, data_columns_1=FEATURE_NAMES)

    # Scale the KL term by the number of rows actually used in this round
    # (the final round can be smaller than the others).
    prob_bnn_model = create_probabilistic_bnn_model(len(df))
    run_experiment(prob_bnn_model, negative_loglikelihood, train_dataset, test_dataset)

    # Generate predictions and save them as a CSV file
    targets, prediction_mean, prediction_stdv = generate_and_save_predictions(prob_bnn_model, test_dataset, round_number)
    plot_and_save_CI(targets, prediction_mean, prediction_stdv, round_number)
    scatter_plot_predictions_and_save(targets, prediction_mean, prediction_stdv, round_number)
    plot_and_save_confusion_matrix(prediction_mean, targets, round_number)
    plot_ard(prob_bnn_model, df, round_number)

