<a href="https://colab.research.google.com/github/ariaberlian/rbm_sr/blob/main/rbm_sr.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Python v3.6.12

##### Run This to Install #####

# ! pip install requirements.txt

## !git clone https://github.com/albertbup/deep-belief-network.git
## !pip install -r "deep-belief-network/requirements.txt"
## !mv "deep-belief-network" "deep_belief_network"

## """
## - add this to dbn/tensorflow/models.py:
##     import tensorflow._api.v2.compat.v1 as tf
##     tf.disable_v2_behavior()
## """

In [None]:
# from deep_belief_network.dbn.tensorflow.models import UnsupervisedDBN # use "from dbn.tensorflow import SupervisedDBNClassification" for computations on TensorFlow
from deep_belief_network.dbn.tensorflow.models import SupervisedDBNRegression
from utils.data_processing import DataProcessing
from utils.image_file_util import *
from utils.scoring import *
from utils.visualizer import *
import pandas as pd
from tqdm import tqdm
import gc
import multiprocessing
import time
import os

In [None]:

!nvidia-smi

## DBN

In [None]:
def test(i, row, model_name, model, interpolation_factor, patch_size, stride,data_train_size, dp: DataProcessing):

    test_var = row[f"test{i}"]
    test_ref_var = row[f"test{i}_ref"]
    test_image = load_image(f"test/{test_var}")
    test_reference_image = load_image(f"test/{test_ref_var}")
    model = model.load(f"model/{model_name}.h5")
    
    print(f"Testing model: {model_name}...")
    interpolated_test = dp.interpolate(test_image, interpolation_factor)

    test_patches = dp.get_patches(interpolated_test, patch_size, stride)
    norm, int_test_dct_ex = dp.normalize_for_rbm(test_patches[:test_patches.shape[0]])
    if data_train_size < test_patches.shape[0]:
        norm = dp.normalize_for_rbm(test_patches[:data_train_size])

    test_patches_flat = dp.preprocess_for_rbm(norm)
    norm=None

    result_flat = model.predict(test_patches_flat)
    result_flat = dp.proccess_output(test_patches_flat, result_flat, interpolation_factor)

    result_patches, recons_dct_ex = dp.inverse_preprocess(
        result_flat, (patch_size[0], patch_size[1], 3)
    )

    reconstruct_image = dp.reconstruct_from_patches(
        result_patches, original_shape=test_reference_image.shape, patch_size=patch_size, stride=stride)
    
    visualize4image(test_image, "Test Image", interpolated_test, "Interpolated Test Image",
                     reconstruct_image, "Reconstructed Image", test_reference_image, "Reference Image")
    
    visualize4Histogram(test_image, "Test Image", interpolated_test, "Interpolated Test Image",
                     reconstruct_image, "Reconstructed Image", test_reference_image, "Reference Image")
    
    visualize_dct(int_test_dct_ex, "Interpolated Test")
    visualize_dct(recons_dct_ex, "Reconstructed Image")

    norm, refs_dct_ex = dp.normalize_for_rbm(dp.get_patches(test_reference_image,patch_size,stride))
    visualize_dct(refs_dct_ex, "Reference Image")

    refs_flat = dp.preprocess_for_rbm(norm)

    psnr_baseline = calculate_psnr(test_reference_image, interpolated_test)
    ssim_baseline = calculate_ssim(test_reference_image, interpolated_test)*100
    rmse_baseline = calculate_rmse(refs_flat, test_patches_flat)
    psnr = calculate_psnr(test_reference_image, reconstruct_image)
    ssim = calculate_ssim(test_reference_image, reconstruct_image)*100
    rmse = calculate_rmse(refs_flat,result_flat)

    print(f"PSNR {i} Interpolated: {psnr_baseline:,.3f} dB")
    print(f"SSIM {i} Interpolated: {ssim_baseline:,.3f} %")
    print(f"RMSE {i} Interpolated {rmse_baseline}")
    print(f"PSNR {i}: {psnr:,.3f} dB")
    print(f"SSIM {i}: {ssim:,.3f} %")
    print(f"RMSE {i}: {rmse}")
    
    # Cleanup: explicitly delete all variables
    del test_image, test_reference_image, interpolated_test, test_patches, test_patches_flat
    del result_flat, result_patches, reconstruct_image, refs_flat, norm
    gc.collect()

    return psnr_baseline,ssim_baseline,psnr,ssim,rmse



In [None]:
# Load file Excel
excel_name = "Experimentation.xlsx"
exp_df = pd.read_excel(excel_name, engine="openpyxl")

def main(idx, row):
    try:
        # Baca parameter dari Excel untuk baris ini
        train = row["train"]
        fine_image = row["fine_tuning"]
        fine_label = row["label"]

        train_resolution = row['train_res']

        interpolation_factor = row['factor']
        patch_size = (row['patch_size'], row['patch_size'])
        stride = (row['stride'], row['stride'])
        data_train_size = row["data_train_size"]

        lr = 0.001
        epoch = 500
        epoch_fine = 100
        layers = [int(layer) for layer in str(row['layers']).split(",")]
        batch_size = row['batch_size']
        activation_function = 'sigmoid'

        model_name = f"model_{train}_{train_resolution}_x{interpolation_factor}_p{patch_size[0]}_s{stride[0]}_l{layers}"
        print(f"\nTraining model: {model_name}")

        #### Load Data and Preprocess ####
        dp = DataProcessing()

        training_image = load_image(f"train/{train}")
        # visualize_image(training_image, "Pre-Training Image")
        train_patches = dp.get_patches(training_image, patch_size=patch_size, stride=stride)
        norm, _ = dp.normalize_for_rbm(train_patches[:train_patches.shape[0]])
        if data_train_size < train_patches.shape[0]:
            norm = dp.normalize_for_rbm(train_patches[:data_train_size])

        X_pretrain = dp.preprocess_for_rbm(norm)

        del train_patches, training_image, norm
        gc.collect()

        fine_image = dp.interpolate(load_image(f'train/{fine_image}'), 2)
        label = load_image(f'train/{fine_label}')

        visualize_histogram_compare(fine_image, label, "Interpolated Fine Tuning Image", "Label Image")

        fine_patches = dp.get_patches(fine_image, patch_size=patch_size, stride=stride)
        norm, _ = dp.normalize_for_rbm(fine_patches[:fine_patches.shape[0]])
        if data_train_size < fine_patches.shape[0]:
            norm = dp.normalize_for_rbm(fine_patches[:data_train_size])
        X = dp.preprocess_for_rbm(norm)

        del fine_patches, fine_image, norm
        gc.collect()

        label_patches = dp.get_patches(label, patch_size=patch_size, stride=stride)
        norm, _ = dp.normalize_for_rbm(label_patches[:label_patches.shape[0]])
        if data_train_size < label_patches.shape[0]:
            norm = dp.normalize_for_rbm(label_patches[:data_train_size])
        y = dp.preprocess_for_rbm(norm)

        del label, label_patches, norm
        gc.collect()

        # #### Model Configuration ####
        dbn = SupervisedDBNRegression(
                    hidden_layers_structure=layers,
                    batch_size=batch_size,
                    learning_rate_rbm=lr,
                    n_epochs_rbm=epoch,
                    activation_function=activation_function,
                    optimization_algorithm='sgd',
                    learning_rate=lr,
                    n_iter_backprop=epoch_fine,
        )

        #### Train the Model ####
        if not(os.path.exists(f"model/{model_name}.h5")):
            print(f"Starting training for model: {model_name}")
            start_time = time.time()
            dbn.fit(X_pretrain,X,y)
            end_time = time.time()

            print("Training time: ", (end_time-start_time))
            dbn.save(f"model/{model_name}.h5")

            print(f"Model has been saved: {model_name}")
            exp_df.at[idx, "training_time"] = (end_time-start_time)

            del X_pretrain, X, y
            gc.collect()
        
        else:
            print("Model alrady exist!")

        #### Testing and Saving Results ####
        for i in range(1, 6):
            psnr_baseline, ssim_baseline, psnr, ssim,rmse = test(i, row, model_name, dbn, interpolation_factor, patch_size,stride,data_train_size, dp)
            # Save results to DataFrame
            exp_df.at[idx, f"b_psnr{i}"] = psnr_baseline
            exp_df.at[idx, f"b_ssim{i}"] = ssim_baseline
            exp_df.at[idx, f"psnr{i}"] = psnr
            exp_df.at[idx, f"ssim{i}"] = ssim
            exp_df.at[idx, f"rmse{i}"] = rmse
            exp_df.to_excel(excel_name, index=False)

        print(f"Finished processing row {idx+1}/{len(exp_df)} with model {model_name}")

    except Exception as e:
        print(f"Error processing row {idx+1}: {e}")
    
    finally:
        # Ensure all variables are deleted in the end
        del train, fine_label, train_resolution, interpolation_factor
        del patch_size, stride, data_train_size, lr, epoch, layers, batch_size, activation_function, model_name
        gc.collect()


specific_index = 1

# Process the specific row
if specific_index in exp_df.index:
    row = exp_df.loc[specific_index]
    main(specific_index, row) 
else:
    print(f"Index {specific_index} not found in the DataFrame.")


# for idx, row in tqdm(exp_df.iterrows(), total=len(exp_df), desc="Processing Rows"):
    # main(idx,row)


# print("All rows processed and results saved.")

In [None]:
# ## Training DBN
# def train(model, input : np.ndarray, model_name : str, repetitions: int, beta:float, interpolation_factor:float):
#     dp = DataProcessing()

#     visualize_histogram(input, title=f"Training Image")

#     curr_rep = 1
#     print("Repetisi saat ini: ", curr_rep)

#     model.fit(input)
#     model.save(f"model/{model_name}_{curr_rep}.h5")

#     while curr_rep < repetitions:
#         model = model.load(f"model/{model_name}_{curr_rep}.h5")
#         r = model.transform(input)
#         print("Shape Transformed: ", r.shape)
#         input = dp.proccess_output(u=input, r=r, beta=beta, s=interpolation_factor)
#         visualize_histogram(input, title=f"Training Image: After transform ({curr_rep})")
#         r = None

#         curr_rep += 1
#         print("Repetisi saat ini: ", curr_rep )

#         model.fit(input)
#         model.save(f"model/{model_name}_{curr_rep}.h5")

# ## Test DBN
# def test(test_image, test_reference_image, model, patch_size:tuple, stride:tuple, interpolation_factor:int):
#     dp = DataProcessing()

#     desired_shape = test_reference_image.shape
    
#     test_image = dp.interpolate(test_image, interpolation_factor=interpolation_factor)
    
#     visualize_image(test_image, title="Test Image after interpolation")
#     visualize_histogram(test_image, title="Test Image after interpolation", range=(0,256))
#     psnr_baseline_value = calculate_psnr(test_reference_image, test_image)
#     ssim_baseline_value = calculate_ssim(test_reference_image, test_image)*100

#     psnr_print = f"PSNR Baseline value: {psnr_baseline_value:,.3f} dB"
#     ssim_print = f"SSIM Baseline value: {ssim_baseline_value:,.3f}%"
#     print(psnr_print.replace(".", ","))
#     print(ssim_print.replace(".", ","))

#     test_patches = dp.get_patches(test_image,patch_size=patch_size, stride=stride)

#     # Process data for rbm
#     test_patches = dp.preprocess_for_rbm(test_patches)

#     # Infer test to model
#     result = model.transform(test_patches)

#     test_patches = dp.inverse_preprocess(
#         dp.proccess_output(test_patches, result, 1, interpolation_factor),
#         original_patch_shape=(patch_size[0], patch_size[1], 3)
#         )
    
#     result = None

#     # visualize_patches(test_patches, title="Test Patches Example", visualize_size=(6,6))

#     reconstruct_image = dp.reconstruct_from_patches(test_patches, original_shape=desired_shape, patch_size=patch_size, stride=stride)

#     test_patches = None

#     visualize_histogram_compare(original_image=test_reference_image, reconstruct_image=reconstruct_image)
#     psnr_value = calculate_psnr(test_reference_image, reconstruct_image)
#     ssim_value = calculate_ssim(test_reference_image, reconstruct_image)*100

#     psnr_print = f"PSNR value: {psnr_value:,.3f} dB"
#     ssim_print = f"SSIM value: {ssim_value:,.3f}%"

#     print(psnr_print.replace(".", ","))
#     print(ssim_print.replace(".", ","))


In [None]:
# tracemalloc.start()

# #### Patch size and Stride ####
# patch_size = (16,16)
# stride = (4,4)

# #### Load Data ####
# train_resolution = 512
# test_resolution = 256

# dp = DataProcessing()

# # Load Pre Training Data
# training_image = load_image("train/peppers.png")
# visualize_image(training_image, "Training Image")

# train_patches = dp.get_patches(training_image, patch_size=patch_size, stride=stride)

# # Feel free to reduce training set
# training_set_num = train_patches.shape[0]

# # visualize_patches(train_patches, "Train patches example")


# X_train = dp.preprocess_for_rbm(train_patches[:training_set_num])

# train_patches = None
# training_image = None

# # Load Fine Tuning Data
# fine_image = load_image("test/lenna_256.png")
# label_fine = load_image("train/lenna.png")
# fine_image = dp.interpolate(fine_image, 2)

# fine_patches = dp.get_patches(fine_image, patch_size, stride)
# label_patches = dp.get_patches(label_fine, patch_size, stride)

# fine_train = dp.preprocess_for_rbm(fine_patches)
# label_train = dp.preprocess_for_rbm(label_patches)



# #### Training parameter ###
# interpolation_factor = 2
# beta = 1
# Repetitions = 1
# lr = 0.01
# epoch = 100

# layers = [400,200,768]
# batch_size = 1024
# activation_function = 'relu'

# model_name = f"model_peppers_{train_resolution}_x{interpolation_factor}_p{patch_size[0]}_s{stride[0]}_({layers[0]}_{layers[1]}_{layers[2]})"

# # Models we will use
# # dbn = UnsupervisedDBN(hidden_layers_structure=layers,
# #                       batch_size=batch_size,
# #                       learning_rate_rbm=lr,
# #                       n_epochs_rbm=epoch,
# #                       activation_function=activation_function,
# #                       optimization_algorithm='sgd',)

# dbn = SupervisedDBNRegression(
#                     hidden_layers_structure=layers,
#                       batch_size=batch_size,
#                       learning_rate_rbm=lr,
#                       n_epochs_rbm=epoch,
#                       activation_function=activation_function,
#                       optimization_algorithm='sgd',
#                                     learning_rate=lr,
#                                     n_iter_backprop=200,

# )

# dbn.fit(X_train, fine_train, label_train)
# dbn.save("model/hmmm.h5")
# #### Train and Test ####

# snapshot1 = tracemalloc.take_snapshot()

# # comment this to skip training
# # train(
# #     model=dbn, 
# #     input=X_train,
# #     model_name=model_name,
# #     beta=beta,
# #     interpolation_factor=interpolation_factor,
# #     repetitions=Repetitions, 
# # )

# snapshot2 = tracemalloc.take_snapshot()
# top_stats = snapshot2.compare_to(snapshot1, 'lineno')

# print("[ Top 10 differences in memory allocation ]")
# for stat in top_stats[:10]:
#     print(stat)

# # Load Testing Data
# print("== Test 1 ==")
# test_image = load_image(f"test/peppers_256.png")
# visualize_image(test_image, title="Test Image")

# test_reference_image = load_image(f"train/peppers.png")
# dbn = dbn.load(f"model/hmmm.h5")

# test(
#     test_image=test_image,
#     test_reference_image=test_reference_image,
#     model=dbn,
#     patch_size=patch_size,
#     stride=stride,
#     interpolation_factor=interpolation_factor
# )



# print("== Test 2 ==")
# test_image = load_image(f"test/lenna_256.png")
# visualize_image(test_image, title="Test Image")

# test_reference_image = load_image(f"train/lenna.png")
# dbn = dbn.load(f"model/{model_name}_{Repetitions}.h5")

# test(
#     test_image=test_image,
#     test_reference_image=test_reference_image,
#     model=dbn,
#     patch_size=patch_size,
#     stride=stride,
#     interpolation_factor=interpolation_factor
# )

# print("== Test 3 ==")
# test_image = load_image(f"test/{test_resolution}/ct_lung4_{test_resolution}.png")
# visualize_image(test_image, title="Test Image")

# test_reference_image = load_image(f"test/{train_resolution}/ct_lung4_{train_resolution}.png")
# dbn = dbn.load(f"model/{model_name}_{Repetitions}.h5")

# test(
#     test_image=test_image,
#     test_reference_image=test_reference_image,
#     model=dbn,
#     patch_size=patch_size,
#     stride=stride,
#     interpolation_factor=interpolation_factor
# )