In [1]:
import os
import gc
import cv2
import numpy as np
import tensorflow as tf
from keras.models import load_model
from keras import backend as K
from sklearn.model_selection import train_test_split
from IPython.display import clear_output
import random
import matplotlib.pyplot as plt
import import_ipynb
import srcnnModel
import vdsrcnnModel
import Utilities

importing Jupyter notebook from srcnnModel.ipynb
importing Jupyter notebook from vdsrcnnModel.ipynb
importing Jupyter notebook from Utilities.ipynb


In [2]:
def shuffleDataset(arr1, arr2):
    indices = np.random.permutation(len(arr1))
    shuffled_array1 = arr1[indices]
    shuffled_array2 = arr2[indices]
    return shuffled_array1, shuffled_array2

In [3]:
def processImage(image_input, size, normalizing, expand):
    """
    Process an image from a file path or a cv2 image (NumPy array).

    Returns:
    np.ndarray: The processed image.
    """
    if isinstance(image_input, str):
        image = cv2.imread(image_input)
        if image is None:
            raise ValueError(f"Could not load image from path: {image_input}")
        
    elif isinstance(image_input, np.ndarray):
        image = image_input
    else:
        raise TypeError("Input must be a file path (str) or a cv2 image (np.ndarray)")

    if size:
        width = size[0]
        height = size[1]
        image = cv2.resize(image, (width, height))

    if normalizing==1:
        image = image.astype(np.float32) / 255.0
        
    elif normalizing==-1:
        image = image.astype(np.float32) * 255.0
        image=image.astype(np.uint8)
    
    if expand==1:
        image = np.expand_dims(image, axis=0)

    return image

In [4]:
def psnr(original, compressed, maxx = 1.0):
    original = original.astype(np.float32)
    compressed = compressed.astype(np.float32)
    
    mse = np.mean((original - compressed) ** 2)
    if mse == 0:
        return float('inf')

    max_pixel = maxx
    psnr = 20 * np.log10(max_pixel / np.sqrt(mse))
    
    return psnr

In [5]:
def getData(size, index):
    dirName = "LoadedData\\LoadedData(" + str(size) + ")"
    hrName = dirName + "\\" + "HR"
    lr2Name = dirName + "\\" + "LR2"

    hr = os.listdir(hrName)
    lr2 = os.listdir(lr2Name)

    os.chdir(hrName)
    hrData = np.load(hr[index])
    os.chdir("A:\WORK\Projects\Machine Learning\Revive-AI")

    os.chdir(lr2Name)
    lr2Data = np.load(lr2[index])
    os.chdir("A:\WORK\Projects\Machine Learning\Revive-AI")


    return hrData, lr2Data

In [6]:
def validateModel(model, val_lr, val_hr, device):
    if device=="GPU":
        with tf.device("/GPU:0"):
            predictions = model.predict(val_lr)
    else:
        with tf.device("/CPU:0"):
            predictions = model.predict(val_lr)

    originalPSNR = []
    reconstructedPSNR = []

    for i in range(len(predictions)):
        predictedImage = predictions[i]
        inputImage = processImage(val_lr[i], (val_hr.shape[1], val_hr.shape[2]), 0, 0)
        originalImage = val_hr[i]
        psnrOriginal = psnr(originalImage, inputImage)
        psnrReconstructed = psnr(originalImage, predictedImage)
        originalPSNR.append(psnrOriginal)
        reconstructedPSNR.append(psnrReconstructed)

    print("ORIGINAL IMAGE\n")
    print("PSNR Range : ", min(originalPSNR), "     -     ", max(originalPSNR), "\n")
    print("Mean PSNR : ", sum(originalPSNR)/len(originalPSNR))

    print("\n")

    print("RECONSTRUCTED IMAGE\n")
    print("PSNR Range : ", min(reconstructedPSNR), "     -     ", max(reconstructedPSNR), "\n")
    print("Mean PSNR : ", sum(reconstructedPSNR)/len(reconstructedPSNR))

In [7]:
def setup_gpu(memory_limit=None):
    gpus = tf.config.experimental.list_physical_devices('GPU')
    if gpus:
        try:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
            if memory_limit is not None:
                tf.config.experimental.set_virtual_device_configuration(
                    gpus[0],
                    [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=memory_limit)]
                )
        except RuntimeError as e:
            print(e)

In [8]:
def check_gpu_availability(min_memory_mb=2048):
    gpus = tf.config.experimental.list_physical_devices('GPU')
    if gpus:
        try:
            # Check memory details for each GPU
            for gpu in gpus:
                details = tf.config.experimental.get_device_details(gpu)
                memory_limit = details.get('memory_limit', 0)
                if memory_limit > min_memory_mb * 1024 * 1024:
                    return True
        except RuntimeError as e:
            print(e)
    return False

In [9]:
def trainer(iterState, dataBatch, epochs, b, device, modelType, modelPath, size):
    for batch in range(0, dataBatch):
        print(iterState)
        print("\n")
        print("Loading Batch ", batch+1, " ....")
        hr, lr2 = Utilities.getData(size, batch)
        hr, lr2= Utilities.shuffleDataset(hr, lr2)
        train_hr, val_hr, train_lr, val_lr = train_test_split(hr, lr2, test_size=0.2, random_state=42)
        print("Batch ", batch+1 , " Loaded\n")

        if os.path.exists(modelPath):
            print("Loading Model For Batch ", batch+1)
            if modelType=="vdsrcnn":
                model = load_model(modelPath, custom_objects={'psnr': vdsrcnnModel.psnr})
            else:
                model = load_model(modelPath, custom_objects={'psnr': srcnnModel.psnr})
            print("Model For Batch ", batch+1, " Successfully Loaded\n")
        else:
            print("Building Model For Batch ", batch+1)
            if modelType=="vdsrcnn":
                model = vdsrcnnModel.build_vdsrcnn_model_LR2_HR(18, (size, size, 3))
            else:
                model = srcnnModel.build_srcnn_model_LR2_HR((size, size, 3))
            print("Model For Batch ", batch+1, " Successfully Built\n")
        
        if modelType=="vdsrcnn":
            print("Training The Model On Batch ", batch+1)
            vdsrcnnModel.trainModel(model, train_lr, train_hr, val_lr, val_hr, epochs=epochs, batch=b, device=device)
            print("Model On Batch ", batch+1, " Trained Successfully\n")
        else:
            print("Training The Model On Batch ", batch+1)
            srcnnModel.trainModel(model, train_lr, train_hr, val_lr, val_hr, epochs=epochs, batch=b, device=device)
            print("Model On Batch ", batch+1, " Trained Successfully\n")

        model.save(modelPath)

        del hr, lr2, train_hr, train_lr, val_hr, val_lr, model

        gc.collect()

        clear_output(wait=True)

In [10]:
def trainModel(iterations, epochs, b, dataBatch, device, modelType, modelPath, size):
    for i in range(iterations):
        iterState = "Running Iteration " + str(i+1) + " On Dataset Of " + str(dataBatch) + " Batches"
        trainer(iterState, dataBatch, epochs, b, device, modelType, modelPath, size)

In [1]:
def evaluateModel(modelType, modelPath, batch, size):

    if modelType=="vdsrcnn":
        model = load_model(modelPath, custom_objects={'psnr': vdsrcnnModel.psnr})
    else:
        model = load_model(modelPath, custom_objects={'psnr': srcnnModel.psnr})

    hr, lr2 = Utilities.getData(size, batch)
    hr, lr2= Utilities.shuffleDataset(hr, lr2)
    train_hr, val_hr, train_lr, val_lr = train_test_split(hr, lr2, test_size=0.2, random_state=42)

    if Utilities.check_gpu_availability(8000):
        with tf.device("/GPU:0"):
            predictions = model.predict(val_lr)
    else:
        with tf.device("/CPU:0"):
            predictions = model.predict(val_lr)
            
    lowResPSNRs = []
    reconstructedPSNRs = []

    for index in range(len(predictions)):
        lowResImage = val_lr[index]
        reconstructedImage = predictions[index]
        highResImage = val_hr[index]

        psnrLowRes = psnr(lowResImage, highResImage)
        psnrReconstructed = psnr(reconstructedImage, highResImage)

        lowResPSNRs.append(psnrLowRes)
        reconstructedPSNRs.append(psnrReconstructed)

    return lowResPSNRs, reconstructedPSNRs


In [12]:
def evaluate(batches, modelType, modelPath, size):
    modelReport = {}
    for batch in range(0, batches):
        lowRes, reconstrucetd = Utilities.evaluateModel(modelType, modelPath, batch, size)
        modelReport[batch+1] = [lowRes, reconstrucetd]
    return modelReport

In [13]:
def analyzeModelReport(modelReport):
    batches = list(modelReport.keys())
    low = []
    rec = []
    for batch in batches:
        lowRes = modelReport[batch][0]
        reconstructed = modelReport[batch][1]
        lowPSNR = sum(lowRes)/len(lowRes)
        reconstructedPSNR = sum(reconstructed)/len(reconstructed)
        low.append(lowPSNR)
        rec.append(reconstructedPSNR)

        print("Results on batch ", batch)
        print("                    Low Res Image     Reconstructed Image")
        print("Mininum PSNR       ", "{:.2f}".format(min(lowRes)), "           ", "{:.2f}".format(min(reconstructed)))
        print("Maximum PSNR       ", "{:.2f}".format(max(lowRes)), "           ", "{:.2f}".format(max(reconstructed)))
        print("Average PSNR       ", "{:.2f}".format(sum(lowRes)/len(lowRes)), "           ", "{:.2f}".format(sum(reconstructed)/len(reconstructed)))
        print("\n")
    
    print("Average PSNR of Low-Res Images : ", sum(low)/len(low))
    print("Average PSNR of Reconstructed Images : ", sum(rec)/len(rec))
    


In [14]:
def inferOnLoadedData(modelType, modelPath, size, batches):

    if modelType=="vdsrcnn":
        model = load_model(modelPath, custom_objects={'psnr': vdsrcnnModel.psnr})
    else:
        model = load_model(modelPath, custom_objects={'psnr': srcnnModel.psnr})

    count = 0
    
    for batch in range(0, batches):
        hr, lr2 = Utilities.getData(size, batch)
        hr, lr2= Utilities.shuffleDataset(hr, lr2)
        train_hr, val_hr, train_lr, val_lr = train_test_split(hr, lr2, test_size=0.2, random_state=42)

        del hr, lr2, train_hr, train_lr

        index = random.randint(0, (len(val_hr)-1))

        highResImage = val_hr[index]
        lowResImage = val_lr[index]
        with tf.device("/GPU:0"):
                       reconstructedImage = model.predict(val_lr)[index]

        lowPSNR = Utilities.psnr(highResImage, lowResImage, 1.0)
        reconstructedPSNR = Utilities.psnr(highResImage, reconstructedImage, 1.0)

        fig, axes = plt.subplots(1, 3, figsize=(15, 5), sharey=True, gridspec_kw={'hspace': 0.8})

        if lowPSNR<reconstructedPSNR:
                count+=1
                fig.set_facecolor("lightgreen")
        else:
                fig.set_facecolor("pink")
        
        lowResImage = cv2.cvtColor(lowResImage, cv2.COLOR_BGR2RGB)
        highResImage = cv2.cvtColor(highResImage, cv2.COLOR_BGR2RGB)
        reconstructedImage = cv2.cvtColor(reconstructedImage, cv2.COLOR_BGR2RGB)
        
        print("Batch ", str(batch+1), " Of ", str(batches), " Batches")

        axes[0].imshow(lowResImage)
        axes[0].set_title('Low-Res Image', fontsize=15)
        axes[0].set_xlabel('PSNR : ' + str(round(lowPSNR, 2)), fontsize=15)

        axes[1].imshow(reconstructedImage)
        axes[1].set_title('Reconstructed Image', fontsize=15)
        axes[1].set_xlabel('PSNR : ' + str(round(reconstructedPSNR, 2)), fontsize=15)

        axes[2].imshow(highResImage)
        axes[2].set_title('High-Res Image', fontsize=15)
        axes[2].set_xlabel('PSNR : INF', fontsize=15)
    
        plt.show()

    accuracy = round((count/batches)*100, 2)
    print("The Model Improved Image Resolution Of Random Images From ", count, " Batches Out Of The ", batches, " Batches.\n")
    print("Model's Accuracy : ", str(accuracy) + "%")

    return count

In [1]:
def inferOnRawData(modelType, modelPath, size, samples, highResPath, lowResPath):

    if modelType=="vdsrcnn":
        model = load_model(modelPath, custom_objects={'psnr': vdsrcnnModel.psnr})
    else:
        model = load_model(modelPath, custom_objects={'psnr': srcnnModel.psnr})

    count = 0

    highResImagePaths = os.listdir(highResPath)
    lowResImagePaths = os.listdir(lowResPath)
    
    for sample in range(0, samples):
        index = random.randint(0, (len(highResImagePaths)-1))

        highResImagePath = highResPath + "\\" + highResImagePaths[index]
        lowResImagePath = lowResPath + "\\" + lowResImagePaths[index]

        highResImage = Utilities.processImage(highResImagePath, None, 1, 0)
        width = highResImage.shape[1]
        height = highResImage.shape[0]
        lowResImage = Utilities.processImage(lowResImagePath, (width, height), 1, 0)

        inputImage = Utilities.processImage(lowResImagePath, (size, size), 1, 1)
        with tf.device("/GPU:0"):
                       reconstructedImage = model.predict(inputImage)[0]
        
        reconstructedImage = Utilities.processImage(reconstructedImage, (width, height), 0, 0)



        lowPSNR = Utilities.psnr(highResImage, lowResImage, 1.0)
        reconstructedPSNR = Utilities.psnr(highResImage, reconstructedImage, 1.0)

        lowResImage = cv2.cvtColor(lowResImage, cv2.COLOR_BGR2RGB)
        highResImage = cv2.cvtColor(highResImage, cv2.COLOR_BGR2RGB)
        reconstructedImage = cv2.cvtColor(reconstructedImage, cv2.COLOR_BGR2RGB)

        fig, axes = plt.subplots(1, 3, figsize=(15, 5), sharey=True, gridspec_kw={'hspace': 0.8})

        if lowPSNR<reconstructedPSNR:
                count+=1
                fig.set_facecolor("lightgreen")
        else:
                fig.set_facecolor("pink")
        
        print("Batch ", str(sample+1), " Of ", str(samples), " Samples")

        axes[0].imshow(lowResImage)
        axes[0].set_title('Low-Res Image', fontsize=15)
        axes[0].set_xlabel('PSNR : ' + str(round(lowPSNR, 2)), fontsize=15)

        axes[1].imshow(reconstructedImage)
        axes[1].set_title('Reconstructed Image', fontsize=15)
        axes[1].set_xlabel('PSNR : ' + str(round(reconstructedPSNR, 2)), fontsize=15)

        axes[2].imshow(highResImage)
        axes[2].set_title('High-Res Image', fontsize=15)
        axes[2].set_xlabel('PSNR : INF', fontsize=15)
    
        plt.show()

    accuracy = round((count/samples)*100, 2)
    print("The Model Improved Image Resolution Of Random Images From ", count, " Samples Out Of The ", samples, " Samples.\n")
    print("Model's Accuracy : ", str(accuracy) + "%")

    return count

In [1]:
def prepareInferenceData(inputDir, outputDir, size, start, end):
    os.makedirs(outputDir, exist_ok=True)
    images = os.listdir(inputDir)[start:end]
    for image in images:
        path = inputDir + "\\" + image
        img = Utilities.processImage(path, (size, size), 0, 0)
        savePath = outputDir + "\\" + image
        cv2.imwrite(savePath, img)