In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Recursively walk the Kaggle input directory and print the path of every file found.
for dirname, _, filenames in os.walk('/kaggle/input'):
    paths_in_dir = [os.path.join(dirname, filename) for filename in filenames]
    for path in paths_in_dir:
        print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
#Importing libraries
import cv2
import matplotlib.pyplot as plt
# Preview the first four low-light frames (1.png .. 4.png).
for i in range(1, 5):
    image = cv2.imread(f"../input/darkface/image/{i}.png")

    # A None result means the file could not be read; report it and move on.
    if image is None:
        print("Error: Unable to read the image.")
        continue

    # Swap the channel order from BGR to RGB before handing the array to matplotlib.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    plt.imshow(image)
    plt.axis("off")
    plt.show()

In [None]:
# Annotation file belonging to one frame of the darkface dataset.
file_path = "/kaggle/input/darkface/label/4710.txt"

# Read the whole file as text; the context manager closes the handle afterwards.
with open(file_path, mode='r') as file:
    contents = file.read()

# Show the raw annotation text. Per the notebook author, it holds the number of
# people in the frame followed by their bounding-box coordinates.
print(contents)


In [None]:
# Print the path of every file found under the enhanced-input dataset directory.
for dirname, _, filenames in os.walk('/kaggle/input/enhanced-input'):
    paths_in_dir = [os.path.join(dirname, filename) for filename in filenames]
    for path in paths_in_dir:
        print(path)

In [None]:
#Importing libraries
import cv2
import matplotlib.pyplot as plt
# Preview the first four low-light frames (1.png .. 4.png).
for i in range(1, 5):
    image = cv2.imread(f"../input/darkface/image/{i}.png")

    # A None result means the file could not be read; report it and move on.
    if image is None:
        print("Error: Unable to read the image.")
        continue

    # Swap the channel order from BGR to RGB before handing the array to matplotlib.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    plt.imshow(image)
    plt.axis("off")
    plt.show()

In [None]:
# Annotation file belonging to one frame of the darkface dataset.
file_path = "/kaggle/input/darkface/label/4710.txt"

# Read the whole file as text; the context manager closes the handle afterwards.
with open(file_path, mode='r') as file:
    contents = file.read()

# Show the raw annotation text. Per the notebook author, it holds the number of
# people in the frame followed by their bounding-box coordinates.
print(contents)



In [None]:
# Print the path of every file found under the enhanced-input dataset directory.
for dirname, _, filenames in os.walk('/kaggle/input/enhanced-input'):
    paths_in_dir = [os.path.join(dirname, filename) for filename in filenames]
    for path in paths_in_dir:
        print(path)

print("done")

In [None]:
import cv2
import matplotlib.pyplot as plt

# Preview the first four images of the enhanced-input dataset, logging each attempt.
for i in range(1, 5):
    filepath = f"/kaggle/input/enhanced-input/{i}.png"
    print("Trying to read:", filepath)
    image = cv2.imread(filepath)

    if image is not None:
        # Swap the channel order from BGR to RGB before plotting.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        plt.imshow(image)
        plt.axis("off")
        plt.show()
    else:
        print("Failed to read:", filepath)
print("done")

In [None]:
from glob import glob
import transformers
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

from PIL import Image
import numpy as np
from huggingface_hub import from_pretrained_keras

In [None]:
# Side length (pixels) every image is resized to, and the number of images per batch.
IMAGE_SIZE = 256
BATCH_SIZE = 10

# Scan and sort the directory once so that every split is a slice of the same
# ordering (previously the identical glob + sort ran three times).
_all_low_light_images = sorted(glob("/kaggle/input/darkface/image/*.png"))

train_low_light_images = _all_low_light_images[:400]
val_low_light_images = _all_low_light_images[400:500]
# Indices 500-599 are deliberately left out, exactly as in the original slicing.
test_low_light = _all_low_light_images[600:700]

def load_data(file_path):
    """Read one PNG file and return it as a resized, rescaled float image.

    Args:
        file_path: path of the PNG file to load.

    Returns:
        The image decoded with 3 channels, resized to IMAGE_SIZE x IMAGE_SIZE and
        divided by 255.0, or None if an exception was raised while loading.
    """
    try:
        png_bytes = tf.io.read_file(file_path)
        decoded = tf.image.decode_png(png_bytes, channels=3)
        resized = tf.image.resize(decoded, [IMAGE_SIZE, IMAGE_SIZE])
        return resized / 255.0
    except Exception as e:
        # Best effort: report the failure and signal it to the caller with None.
        print(f"Error loading image {file_path}: {e}")
        return None



def data_generator(low_light_images):
    """Build a batched tf.data pipeline from a collection of image paths.

    Each path is loaded with `load_data` (in parallel), elements whose loading
    fails are skipped, and the remainder is grouped into batches of BATCH_SIZE
    (an incomplete final batch is dropped).

    Args:
        low_light_images: sequence of image file paths.

    Returns:
        A `tf.data.Dataset` yielding batches of preprocessed images.

    Raises:
        ValueError: if an empty list/tuple of paths is supplied.
    """
    if isinstance(low_light_images, (list, tuple)) and len(low_light_images) == 0:
        raise ValueError(
            "data_generator received no image paths; check the dataset location."
        )

    dataset = tf.data.Dataset.from_tensor_slices(low_light_images)
    dataset = dataset.map(load_data, num_parallel_calls=tf.data.AUTOTUNE)
    # The previous `filter(lambda x: x is not None)` could never drop anything:
    # a mapped dataset element is a tensor, never None. Read/decode failures
    # surface as errors when the element is produced, so skip those instead.
    dataset = dataset.apply(tf.data.experimental.ignore_errors())
    dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
    return dataset

# Build the batched input pipelines used for training and validation.
train_dataset = data_generator(train_low_light_images)
val_dataset = data_generator(val_low_light_images)

# Printing a dataset shows its description, not the images themselves.
for split_name, split in (("TRAIN DATASET", train_dataset), ("VAL DATASET", val_dataset)):
    print(split_name, split)

In [None]:
def build_dce_net():
    """Create the curve-estimation network as a Keras functional model.

    Seven 3x3 convolutions with "same" padding: six 32-filter ReLU layers, with
    the inputs of layers 5, 6 and 7 formed by concatenating (conv4, conv3),
    (conv5, conv2) and (conv6, conv1) along the channel axis, and a final
    24-filter tanh layer.

    Returns:
        keras.Model mapping an (H, W, 3) image of arbitrary spatial size to a
        24-channel map of the same spatial size.
    """

    def relu_conv(tensor):
        # One 32-filter, 3x3, ReLU convolution that keeps the spatial size.
        return layers.Conv2D(32, (3, 3), activation="relu", padding="same")(tensor)

    input_img = keras.Input(shape=[None, None, 3])

    conv1 = relu_conv(input_img)
    conv2 = relu_conv(conv1)
    conv3 = relu_conv(conv2)
    conv4 = relu_conv(conv3)

    # Symmetric skip connections: each later layer also sees an earlier feature map.
    conv5 = relu_conv(layers.Concatenate(axis=-1)([conv4, conv3]))
    conv6 = relu_conv(layers.Concatenate(axis=-1)([conv5, conv2]))
    last_skip = layers.Concatenate(axis=-1)([conv6, conv1])

    x_r = layers.Conv2D(24, (3, 3), activation="tanh", padding="same")(last_skip)

    return keras.Model(inputs=input_img, outputs=x_r)
print("done")


In [None]:
# Loss functions for the DCE-Net.
# The color constancy loss is used to correct potential color deviations in the
# enhanced image.

def color_constancy_loss(x):
    """Penalise differences between the per-image mean values of the first three channels.

    Args:
        x: batch of images; indexed as a rank-4 tensor with channels on the last axis.

    Returns:
        Tensor with one value per image (trailing singleton axes are kept from
        the keepdims reduction).
    """
    # Mean of every channel over the two spatial axes.
    channel_means = tf.reduce_mean(x, axis=(1, 2), keepdims=True)
    mr = channel_means[:, :, :, 0]
    mg = channel_means[:, :, :, 1]
    mb = channel_means[:, :, :, 2]

    # Squared pairwise differences of the channel means.
    squared_gaps = (tf.square(mr - mg), tf.square(mr - mb), tf.square(mb - mg))
    d_rg, d_rb, d_gb = squared_gaps

    # The squared gaps are squared once more before the square root is taken.
    sum_of_fourth_powers = tf.square(d_rg) + tf.square(d_rb) + tf.square(d_gb)
    return tf.sqrt(sum_of_fourth_powers)
print("done")

In [None]:
# Exposure loss: measures the distance between the average intensity of a local
# region and a preset well-exposedness level (0.6 by default).

def exposure_loss(x, mean_val=0.6):
    """Mean squared deviation of local average intensity from `mean_val`.

    Args:
        x: batch of images with channels on axis 3.
        mean_val: target intensity level for every local region.

    Returns:
        Scalar tensor.
    """
    # Collapse the channels into a single intensity map.
    intensity = tf.reduce_mean(x, axis=3, keepdims=True)
    # Average over non-overlapping 16x16 regions.
    region_mean = tf.nn.avg_pool2d(intensity, ksize=16, strides=16, padding="VALID")
    deviation = region_mean - mean_val
    return tf.reduce_mean(tf.square(deviation))

print("done")

In [None]:
# Illumination smoothness loss.
# To preserve the monotonicity relations between neighbouring pixels, this loss
# is applied to the curve parameter maps.
def illumination_smoothness_loss(x):
    """Total-variation style penalty on the curve parameter maps.

    Args:
        x: rank-4 tensor of curve parameter maps.

    Returns:
        Scalar tensor: 2 * (h_tv / count_h + w_tv / count_w) / batch_size.
    """
    dims = tf.shape(x)
    batch_size = tf.cast(dims[0], dtype=tf.float32)
    h_x = dims[1]
    w_x = dims[2]

    # NOTE(review): the normalisation counts are built from axes 2 and 3, while the
    # differences below run along axes 1 and 2. This looks like a layout carried
    # over from a channels-first implementation — confirm before changing, since
    # the loss weight used by the caller depends on this scaling.
    count_h = tf.cast((dims[2] - 1) * dims[3], dtype=tf.float32)
    count_w = tf.cast(dims[2] * (dims[3] - 1), dtype=tf.float32)

    # Sum of squared differences between neighbours along axis 1 and axis 2.
    diff_along_h = x[:, 1:, :, :] - x[:, :h_x - 1, :, :]
    diff_along_w = x[:, :, 1:, :] - x[:, :, :w_x - 1, :]
    h_tv = tf.reduce_sum(tf.square(diff_along_h))
    w_tv = tf.reduce_sum(tf.square(diff_along_w))

    return 2 * (h_tv / count_h + w_tv / count_w) / batch_size

print("done")
    

In [None]:
import tensorflow as tf
from tensorflow import keras

class SpatialConsistencyLoss(keras.losses.Loss):
    """Compares neighbour-to-neighbour intensity differences of two images.

    Both inputs are reduced to a single intensity channel, average-pooled over
    4x4 regions, and convolved with four fixed difference kernels (left, right,
    up, down). The loss is the sum, over the four directions, of the squared
    difference between the two images' responses. No reduction is applied.
    """

    def __init__(self, **kwargs):
        # Keyword arguments are accepted but not forwarded; the reduction is
        # always 'none'.
        super(SpatialConsistencyLoss, self).__init__(reduction='none')

        def as_kernel(rows):
            # Wrap the 3 x 1 x 3 nested list in one more axis and store it as float32.
            return tf.constant([rows], dtype=tf.float32)

        self.left_kernel = as_kernel([[[0, 0, 0]], [[-1, 1, 0]], [[0, 0, 0]]])
        self.right_kernel = as_kernel([[[0, 0, 0]], [[0, 1, -1]], [[0, 0, 0]]])
        self.up_kernel = as_kernel([[[0, -1, 0]], [[0, 1, 0]], [[0, 0, 0]]])
        self.down_kernel = as_kernel([[[0, 0, 0]], [[0, 1, 0]], [[0, -1, 0]]])

    def call(self, y_true, y_pred):
        """Return the per-location spatial consistency map for the two inputs."""

        def pooled_intensity(images):
            # Channel mean followed by 4x4 non-overlapping average pooling.
            intensity = tf.reduce_mean(images, 3, keepdims=True)
            return tf.nn.avg_pool2d(intensity, ksize=4, strides=4, padding="VALID")

        def response(pooled, kernel):
            return tf.nn.conv2d(pooled, kernel, strides=[1, 1, 1, 1], padding="SAME")

        original_pool = pooled_intensity(y_true)
        enhanced_pool = pooled_intensity(y_pred)

        directions = (
            self.left_kernel,
            self.right_kernel,
            self.up_kernel,
            self.down_kernel,
        )
        squared_gaps = [
            tf.square(response(original_pool, kernel) - response(enhanced_pool, kernel))
            for kernel in directions
        ]

        # Accumulate left, right, up, down in that order.
        total = squared_gaps[0]
        for gap in squared_gaps[1:]:
            total = total + gap
        return total
print("done")

In [None]:
class ZeroDCE(keras.Model):
    """Keras model wrapping the DCE-Net together with its training logic.

    The wrapped network predicts a 24-channel map that is split into eight
    3-channel curve parameter maps; these are applied to the input image one
    after another to produce the enhanced image. Training minimises the sum of
    four losses (illumination smoothness, spatial constancy, color constancy
    and exposure), each tracked by its own running-mean metric.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # The curve-estimation network; gradients are taken w.r.t. its weights only.
        self.dce_model = build_dce_net()

    def compile(self, learning_rate, **kwargs):
        """Configure the Adam optimizer, the spatial loss object and the metric trackers.

        Args:
            learning_rate: learning rate handed to Adam.
            **kwargs: forwarded to `keras.Model.compile`.
        """
        super().compile(**kwargs)
        self.optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
        self.spatial_constancy_loss = SpatialConsistencyLoss(reduction="none")

        # One running mean per reported quantity; each tracker's name equals the
        # key of the matching entry returned by compute_losses().
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.illumination_smoothness_loss_tracker = keras.metrics.Mean(
            name="illumination_smoothness_loss"
        )
        self.spatial_constancy_loss_tracker = keras.metrics.Mean(
            name="spatial_constancy_loss"
        )
        self.color_constancy_loss_tracker = keras.metrics.Mean(
            name="color_constancy_loss"
        )
        self.exposure_loss_tracker = keras.metrics.Mean(name="exposure_loss")

    @property
    def metrics(self):
        """The five loss trackers, in reporting order."""
        return [
            self.total_loss_tracker,
            self.illumination_smoothness_loss_tracker,
            self.spatial_constancy_loss_tracker,
            self.color_constancy_loss_tracker,
            self.exposure_loss_tracker,
        ]

    def get_enhanced_image(self, data, output):
        """Apply the eight predicted curve maps to `data`, one after another.

        Args:
            data: input image batch.
            output: 24-channel network output for `data`.

        Returns:
            The image after eight updates of the form x <- x + r * (x^2 - x),
            where r is the next 3-channel slice of `output`.
        """
        x = data
        for first_channel in range(0, 24, 3):
            r = output[:, :, :, first_channel:first_channel + 3]
            x = x + r * (tf.square(x) - x)
        return x

    def call(self, data):
        """Enhance `data`: run the DCE-Net and apply the resulting curves."""
        dce_net_output = self.dce_model(data)
        return self.get_enhanced_image(data, dce_net_output)

    def compute_losses(self, data, output):
        """Compute the four weighted loss terms and their sum.

        Args:
            data: input image batch.
            output: DCE-Net output for `data`.

        Returns:
            dict with keys "total_loss", "illumination_smoothness_loss",
            "spatial_constancy_loss", "color_constancy_loss" and "exposure_loss".
        """
        enhanced_image = self.get_enhanced_image(data, output)

        # Weights: 200 (illumination), 1 (spatial), 5 (color), 10 (exposure).
        loss_illumination = 200 * illumination_smoothness_loss(output)
        spatial_map = self.spatial_constancy_loss(enhanced_image, data)
        loss_spatial_constancy = tf.reduce_mean(spatial_map)
        loss_color_constancy = 5 * tf.reduce_mean(color_constancy_loss(enhanced_image))
        loss_exposure = 10 * tf.reduce_mean(exposure_loss(enhanced_image))

        total_loss = (
            loss_illumination
            + loss_spatial_constancy
            + loss_color_constancy
            + loss_exposure
        )

        return {
            "total_loss": total_loss,
            "illumination_smoothness_loss": loss_illumination,
            "spatial_constancy_loss": loss_spatial_constancy,
            "color_constancy_loss": loss_color_constancy,
            "exposure_loss": loss_exposure,
        }

    def _update_trackers(self, losses):
        # Tracker names coincide with the keys of `losses`, so each tracker can
        # look up its own value; iteration follows the order of `metrics`.
        for tracker in self.metrics:
            tracker.update_state(losses[tracker.name])

    def train_step(self, data):
        """Run one optimisation step on a batch and return the running metric values."""
        with tf.GradientTape() as tape:
            output = self.dce_model(data)
            losses = self.compute_losses(data, output)

        weights = self.dce_model.trainable_weights
        gradients = tape.gradient(losses["total_loss"], weights)
        self.optimizer.apply_gradients(zip(gradients, self.dce_model.trainable_weights))

        self._update_trackers(losses)
        return {metric.name: metric.result() for metric in self.metrics}

    def test_step(self, data):
        """Evaluate the losses on a batch (no weight update) and return the running metric values."""
        output = self.dce_model(data)
        losses = self.compute_losses(data, output)

        self._update_trackers(losses)
        return {metric.name: metric.result() for metric in self.metrics}

    def save_weights(self, filepath, overwrite=True, save_format=None, options=None):
        """While saving the weights, we simply save the weights of the DCE-Net"""
        self.dce_model.save_weights(
            filepath,
            overwrite=overwrite,
            save_format=save_format,
            options=options,
        )

    def load_weights(self, filepath, by_name=False, skip_mismatch=False, options=None):
        """While loading the weights, we simply load the weights of the DCE-Net"""
        self.dce_model.load_weights(
            filepath=filepath,
            by_name=by_name,
            skip_mismatch=skip_mismatch,
            options=options,
        )


print('done')

In [None]:
# Train the Zero-DCE model on the low-light training split and validate it on
# the validation split.
zero_dce_model = ZeroDCE()
zero_dce_model.compile(learning_rate=1e-4)
history = zero_dce_model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=4,
)

print("done")

In [None]:
# Dump the raw values recorded by fit(); plot_result() below looks entries up in
# this mapping by metric name.
print(history.history)

In [None]:
def plot_result(item):
    """Plot one recorded training metric, plus its validation counterpart if present.

    Reads the values from the notebook-level `history` object.

    Args:
        item: key to look up in `history.history`; the validation series is
            looked up under "val_" + item.
    """
    recorded = history.history

    # Guard clause: nothing to draw for an unknown key.
    if item not in recorded:
        print(f"'{item}' not found in history.")
        return

    plt.plot(recorded[item], label=item)

    val_item = "val_" + item
    if val_item in recorded:
        plt.plot(recorded[val_item], label=val_item)

    plt.xlabel("Epochs")
    plt.ylabel(item)
    plt.title("Train and Validation {} Over Epochs".format(item), fontsize=14)
    plt.legend()
    plt.grid()
    plt.show()

    
# Plot every tracked loss. The keys must match the metric names registered in
# ZeroDCE.compile(); the previous spellings "illumination_smootheness_loss" and
# "spatial_consistency_loss" matched nothing, so those two plots were skipped.
for loss_name in (
    "total_loss",
    "illumination_smoothness_loss",
    "spatial_constancy_loss",
    "color_constancy_loss",
    "exposure_loss",
):
    plot_result(loss_name)

In [None]:
def infer(original_image):
    """Enhance a single image with the trained `zero_dce_model`.

    Args:
        original_image: image accepted by `keras.utils.img_to_array`.

    Returns:
        PIL.Image built from the model output scaled by 255 and cast to uint8.
    """
    # Scale pixel values by 1/255 and add a leading batch axis of size one.
    pixels = keras.utils.img_to_array(original_image).astype("float32") / 255.0
    batch = np.expand_dims(pixels, axis=0)

    enhanced_batch = zero_dce_model(batch)

    # Take the only image of the batch and convert it back to 8-bit values.
    enhanced_uint8 = tf.cast((enhanced_batch[0, :, :, :] * 255), dtype=np.uint8)
    return Image.fromarray(enhanced_uint8.numpy())
print("done")

In [None]:
# NOTE(review): the triple-quoted string below is an earlier draft of an inference
# helper kept as a string literal, so none of it executes. It would not run as
# written: it returns `ouput_image` while the value is assigned to `output_image`,
# and it calls `post_process`, which is not defined in the visible notebook.
# The working version is `infer` above.
'''
# create inference function
def inference(o_image):
# preprocess for the model
    image = keras.preprocessing.image.ing_to_array(o_image)
    image = image.astype("float32") / 255.0
    image = np. expand_dims (image, axis=0) # create batch of 1 image
    print (image. shape) # should be (1, H, w, C)
    output_image = model (image) # run the image through model
    output_image = post_process (image, output_image) # will implement this later
    output_image = tf.cast((output_image[0, :, :, :] * 255), dtype=np.uint8) # processing for PIL
    output_image = Image. fromarray (output_image.numpy())
    return ouput_image
'''

In [None]:
from PIL import Image 
import numpy as np

def plot_results(images, titles, figure_size=(12, 12)):
    """Show several images side by side in a single row.

    Args:
        images: sequence of images accepted by `plt.imshow`.
        titles: sequence of titles; titles[i] labels images[i].
        figure_size: (width, height) passed to `plt.figure`.
    """
    fig = plt.figure(figsize=figure_size)
    column_count = len(images)
    for index, picture in enumerate(images):
        # Subplot positions are 1-based.
        axes = fig.add_subplot(1, column_count, index + 1)
        axes.set_title(titles[index])
        _ = plt.imshow(picture)
        plt.axis("off")
    plt.show()

# Enhance a slice of the low-light images, show each one next to its enhanced
# version and compute the per-pixel difference between the two.
test_low_light_images = sorted(glob("/kaggle/input/darkface/image/*.png"))[100:150]
for val_image_file in test_low_light_images:
    original_image = Image.open(val_image_file)
    enhanced_image = infer(original_image)
    plot_results(
        [original_image, enhanced_image],
        ["Original", "Enhanced"],
        (20, 12)
    )

    # Convert PIL Images to numpy arrays
    original_array = np.array(original_image)
    enhanced_array = np.array(enhanced_image)

    # Both arrays are uint8, so a direct subtraction wraps around modulo 256
    # wherever the enhanced pixel is brighter than the original. Compute the
    # difference in a signed type and keep its magnitude, which fits in uint8 again.
    signed_difference = original_array.astype(np.int16) - enhanced_array.astype(np.int16)
    subtracted_image_array = np.abs(signed_difference).astype(np.uint8)

    # Convert the result back to PIL Image
    subtracted_image = Image.fromarray(subtracted_image_array)

    # Display or save the subtracted image as needed
    print(subtracted_image_array)


## Low-light image enhancement using MIRNet

In [None]:
def selective_kernel_feature_fusion(
    multi_scale_feature_1, multi_scale_feature_2, multi_scale_feature_3
):
    """Fuse three equally shaped feature maps using learned per-channel weights.

    The three inputs are summed and globally average-pooled, squeezed by a 1x1
    convolution to channels // 8, then expanded by three separate 1x1 softmax
    convolutions into one descriptor per input. Each input is multiplied by its
    descriptor and the three products are added.

    Returns:
        The aggregated feature map.
    """
    branches = [multi_scale_feature_1, multi_scale_feature_2, multi_scale_feature_3]
    channels = list(multi_scale_feature_1.shape)[-1]

    # Global statistics of the summed branches, reshaped to (1, 1, channels).
    combined_feature = layers.Add()(branches)
    gap = layers.GlobalAveragePooling2D()(combined_feature)
    channel_wise_statistics = layers.Reshape((1, 1, channels))(gap)

    compact_feature_representation = layers.Conv2D(
        filters=channels // 8, kernel_size=(1, 1), activation="relu"
    )(channel_wise_statistics)

    # One independent descriptor per branch, created in branch order.
    descriptors = [
        layers.Conv2D(channels, kernel_size=(1, 1), activation="softmax")(
            compact_feature_representation
        )
        for _ in branches
    ]

    weighted_branches = [
        branch * descriptor for branch, descriptor in zip(branches, descriptors)
    ]
    return layers.Add()(weighted_branches)
print("done")

In [None]:
class ChannelPooling(layers.Layer):
    """Reduce a feature map to two channels: its mean and its max along `axis`.

    Args:
        axis: axis that is pooled over and along which the two pooled maps are
            concatenated (default: the last axis).
    """

    def __init__(self, axis=-1, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.axis = axis
        self.concat = layers.Concatenate(axis=self.axis)

    def call(self, inputs):
        """Return the mean and max pooled maps concatenated along `self.axis`."""
        # Pool along the configured axis (previously hard-coded to -1, which only
        # agreed with the concatenation axis for the default value). keepdims
        # retains a size-1 axis, which is what the old expand_dims produced.
        average_pooling = tf.reduce_mean(inputs, axis=self.axis, keepdims=True)
        max_pooling = tf.reduce_max(inputs, axis=self.axis, keepdims=True)
        return self.concat([average_pooling, max_pooling])

    def get_config(self):
        """Return the layer configuration, including `axis`, for serialization."""
        config = super().get_config()
        config.update({"axis": self.axis})
        # The config must be returned; without this the method yielded None.
        return config


def spatial_attention_block(input_tensor):
    """Scale every spatial location of `input_tensor` by a learned weight.

    The input is reduced to mean/max maps by ChannelPooling, mapped to a single
    channel by a 1x1 convolution, passed through a sigmoid and multiplied back
    onto the input.
    """
    pooled_maps = ChannelPooling(axis=-1)(input_tensor)
    attention_logits = layers.Conv2D(1, kernel_size=(1, 1))(pooled_maps)
    attention_map = keras.activations.sigmoid(attention_logits)
    return input_tensor * attention_map


def channel_attention_block(input_tensor):
    """Scale every channel of `input_tensor` by a learned weight.

    Global average pooling gives one value per channel; two 1x1 convolutions
    (ReLU down to channels // 8, then sigmoid back to channels) turn these into
    per-channel weights that are multiplied onto the input.
    """
    channels = list(input_tensor.shape)[-1]

    pooled = layers.GlobalAveragePooling2D()(input_tensor)
    descriptor = layers.Reshape((1, 1, channels))(pooled)

    squeezed = layers.Conv2D(
        filters=channels // 8, kernel_size=(1, 1), activation="relu"
    )(descriptor)
    channel_weights = layers.Conv2D(
        filters=channels, kernel_size=(1, 1), activation="sigmoid"
    )(squeezed)

    return input_tensor * channel_weights


def dual_attention_unit_block(input_tensor):
    """Residual block combining channel attention and spatial attention.

    Two 3x3 convolutions produce a feature map that is passed through both
    attention blocks; the results are concatenated, projected back to the input
    channel count by a 1x1 convolution and added to the input.
    """
    channels = list(input_tensor.shape)[-1]

    features = layers.Conv2D(
        channels, kernel_size=(3, 3), padding="same", activation="relu"
    )(input_tensor)
    features = layers.Conv2D(channels, kernel_size=(3, 3), padding="same")(features)

    # Channel attention is built before spatial attention.
    channel_branch = channel_attention_block(features)
    spatial_branch = spatial_attention_block(features)

    merged = layers.Concatenate(axis=-1)([channel_branch, spatial_branch])
    projected = layers.Conv2D(channels, kernel_size=(1, 1))(merged)
    return layers.Add()([input_tensor, projected])
print("done")

In [None]:
# Recursive Residual Modules


def down_sampling_module(input_tensor):
    """Downsample with max pooling while doubling the number of channels.

    A main branch (1x1 ReLU conv, 3x3 ReLU conv, max pooling, 1x1 conv to
    2 * channels) is added to a skip branch (max pooling, 1x1 conv to
    2 * channels).
    """
    channels = list(input_tensor.shape)[-1]
    doubled = channels * 2

    # Main branch.
    main = layers.Conv2D(channels, kernel_size=(1, 1), activation="relu")(input_tensor)
    main = layers.Conv2D(
        channels, kernel_size=(3, 3), padding="same", activation="relu"
    )(main)
    main = layers.MaxPooling2D()(main)
    main = layers.Conv2D(doubled, kernel_size=(1, 1))(main)

    # Skip branch.
    skip = layers.MaxPooling2D()(input_tensor)
    skip = layers.Conv2D(doubled, kernel_size=(1, 1))(skip)

    return layers.Add()([skip, main])


def up_sampling_module(input_tensor):
    """Upsample with UpSampling2D while halving the number of channels.

    A main branch (1x1 ReLU conv, 3x3 ReLU conv, upsampling, 1x1 conv to
    channels // 2) is added to a skip branch (upsampling, 1x1 conv to
    channels // 2).
    """
    channels = list(input_tensor.shape)[-1]
    halved = channels // 2

    # Main branch.
    main = layers.Conv2D(channels, kernel_size=(1, 1), activation="relu")(input_tensor)
    main = layers.Conv2D(
        channels, kernel_size=(3, 3), padding="same", activation="relu"
    )(main)
    main = layers.UpSampling2D()(main)
    main = layers.Conv2D(halved, kernel_size=(1, 1))(main)

    # Skip branch.
    skip = layers.UpSampling2D()(input_tensor)
    skip = layers.Conv2D(halved, kernel_size=(1, 1))(skip)

    return layers.Add()([skip, main])


# MRB Block
def multi_scale_residual_block(input_tensor, channels):
    """Process the input at three resolutions and fuse the results residually.

    The input and two successively downsampled copies each pass through a dual
    attention unit. Every level is then fused with the other two (resampled to
    its resolution), passed through a second dual attention unit, brought back
    to the input resolution and fused once more. A 3x3 convolution with
    `channels` filters follows and its output is added to the input.

    Args:
        input_tensor: feature map to process.
        channels: number of filters of the final 3x3 convolution.
    """
    # Features at three resolutions.
    level1 = input_tensor
    level2 = down_sampling_module(input_tensor)
    level3 = down_sampling_module(level2)

    # First round of dual attention units.
    level1_dau = dual_attention_unit_block(level1)
    level2_dau = dual_attention_unit_block(level2)
    level3_dau = dual_attention_unit_block(level3)

    # Fusion at level-1 resolution.
    level2_at_level1 = up_sampling_module(level2_dau)
    level3_at_level1 = up_sampling_module(up_sampling_module(level3_dau))
    level1_skff = selective_kernel_feature_fusion(
        level1_dau, level2_at_level1, level3_at_level1
    )

    # Fusion at level-2 resolution.
    level1_at_level2 = down_sampling_module(level1_dau)
    level3_at_level2 = up_sampling_module(level3_dau)
    level2_skff = selective_kernel_feature_fusion(
        level1_at_level2, level2_dau, level3_at_level2
    )

    # Fusion at level-3 resolution.
    level1_at_level3 = down_sampling_module(down_sampling_module(level1_dau))
    level2_at_level3 = down_sampling_module(level2_dau)
    level3_skff = selective_kernel_feature_fusion(
        level1_at_level3, level2_at_level3, level3_dau
    )

    # Second round of dual attention units, each brought back to level-1 resolution.
    level1_dau_2 = dual_attention_unit_block(level1_skff)
    level2_dau_2 = up_sampling_module(dual_attention_unit_block(level2_skff))
    level3_dau_2 = up_sampling_module(
        up_sampling_module(dual_attention_unit_block(level3_skff))
    )

    # Final fusion, convolution and residual connection.
    fused = selective_kernel_feature_fusion(level1_dau_2, level2_dau_2, level3_dau_2)
    refined = layers.Conv2D(channels, kernel_size=(3, 3), padding="same")(fused)
    return layers.Add()([input_tensor, refined])
print("done")

In [None]:
def recursive_residual_group(input_tensor, num_mrb, channels):
    """Chain `num_mrb` multi-scale residual blocks between two 3x3 convolutions.

    Args:
        input_tensor: feature map to process.
        num_mrb: number of multi-scale residual blocks in the group.
        channels: number of filters used by the convolutions and the blocks.

    Returns:
        The output of the closing convolution added to `input_tensor`.
    """
    features = layers.Conv2D(channels, kernel_size=(3, 3), padding="same")(input_tensor)

    remaining_blocks = num_mrb
    while remaining_blocks > 0:
        features = multi_scale_residual_block(features, channels)
        remaining_blocks -= 1

    closing = layers.Conv2D(channels, kernel_size=(3, 3), padding="same")(features)
    return layers.Add()([closing, input_tensor])


def mirnet_model(num_rrg, num_mrb, channels):
    """Assemble the MIRNet model.

    A 3x3 convolution lifts the 3-channel input to `channels` features, which
    pass through `num_rrg` recursive residual groups. A final 3x3 convolution
    maps back to 3 channels and the result is added to the input image.

    Args:
        num_rrg: number of recursive residual groups.
        num_mrb: number of multi-scale residual blocks per group.
        channels: feature width used throughout the network.

    Returns:
        keras.Model taking and returning (H, W, 3) images of arbitrary spatial size.
    """
    input_tensor = keras.Input(shape=[None, None, 3])

    features = layers.Conv2D(channels, kernel_size=(3, 3), padding="same")(input_tensor)
    for _group_index in range(num_rrg):
        features = recursive_residual_group(features, num_mrb, channels)

    residual = layers.Conv2D(3, kernel_size=(3, 3), padding="same")(features)
    output_tensor = layers.Add()([input_tensor, residual])
    return keras.Model(input_tensor, output_tensor)


# Three recursive residual groups of two multi-scale residual blocks each, 64 features wide.
model = mirnet_model(num_rrg=3, num_mrb=2, channels=64)


In [None]:
import tensorflow as tf
import cv2

def charbonnier_loss(y_true, y_pred):
    """Element-wise Charbonnier penalty sqrt((y_true - y_pred)^2 + (1e-3)^2).

    Args:
        y_true: reference tensor (any numeric dtype).
        y_pred: tensor to compare, broadcast-compatible with `y_true`.

    Returns:
        float32 tensor of per-element penalties (no reduction is applied).
    """
    # Work in float32: with integer inputs such as uint8 images the subtraction
    # would wrap around and the float epsilon / square root could not be applied.
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)
    return tf.sqrt(tf.square(y_true - y_pred) + tf.square(1e-3))

# Compare one low-light frame with its enhanced counterpart using the Charbonnier loss.
x = cv2.imread("/kaggle/input/darkface/image/1.png")
y = cv2.imread("/kaggle/input/enhanced-input/1.png")

# cv2.imread returns None instead of raising when a file cannot be read.
if x is None or y is None:
    raise FileNotFoundError("Could not read one of the images needed for the loss check.")
if x.shape != y.shape:
    raise ValueError(f"Image shapes differ: {x.shape} vs {y.shape}")

# Convert to float32 in [0, 1]: the loss mixes the pixel difference with a float
# epsilon, which does not work on uint8 tensors (and uint8 subtraction wraps around).
x_tensor = tf.cast(x, tf.float32) / 255.0
y_tensor = tf.cast(y, tf.float32) / 255.0

l = charbonnier_loss(x_tensor, y_tensor)
print(l)
print("mean charbonnier loss:", float(tf.reduce_mean(l)))

# NOTE(review): everything inside the triple-quoted string below is disabled
# MIRNet training/plotting code kept as a string literal; none of it executes.
# Before re-enabling it, note that it compiles the model with
# SparseCategoricalCrossentropy rather than charbonnier_loss, and that it fits on
# train_dataset / val_dataset, which data_generator builds from low-light images
# only (no target images) — presumably paired targets are required; confirm.
'''
def peak_signal_noise_ratio(y_true, y_pred):
    psnr = tf.image.psnr(y_pred, y_true, max_val=255.0)
    print("peak_signal_noise_ratio:", psnr)
    if(psnr==None):
        print(1)
    return psnr


optimizer = keras.optimizers.Adam(learning_rate=1e-4)
model.compile(
    optimizer=optimizer,
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=[peak_signal_noise_ratio],
)

history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=1,
    callbacks=[
        keras.callbacks.ReduceLROnPlateau(
            monitor="val_peak_signal_noise_ratio",
            factor=0.5,
            patience=5,
            verbose=1,
            min_delta=1e-7,
            mode="max",
        )
    ],
)


def plot_history(value, name):
    plt.plot(history.history[value], label=f"train_{name.lower()}")
    plt.plot(history.history[f"val_{value}"], label=f"val_{name.lower()}")
    plt.xlabel("Epochs")
    plt.ylabel(name)
    plt.title(f"Train and Validation {name} Over Epochs", fontsize=14)
    plt.legend()
    plt.grid()
    plt.show()


plot_history("loss", "Loss")
plot_history("peak_signal_noise_ratio", "PSNR")
'''

## Low-light image enhancement using RetinexNet

In [None]:
import tensorflow as tf


In [None]:
def RelightNet(input_L, input_R):
    """Predict a single-channel illumination adjustment map.

    The two inputs are concatenated along the channel axis and passed through an
    encoder of three stride-2 convolutions, a decoder that resizes back up step by
    step with additive skip connections, and a fusion stage that gathers all
    decoder scales at full resolution.

    This version uses TensorFlow 2 / Keras layers. The previous body could not
    run: `tf.concat` was called without its `axis`, `concat` was undefined, and
    `tf.variable_scope`, `tf.layers.conv2d` and
    `tf.image.resize_nearest_neighbor` are TensorFlow 1 symbols.

    Args:
        input_L: illumination batch, channels last.
        input_R: reflectance batch, channels last, same spatial size as `input_L`.

    Returns:
        Tensor with one channel and the spatial size of the inputs.
    """
    keras_layers = tf.keras.layers

    # The convolution layers are created once and reused by later calls, which
    # mirrors the weight sharing requested by reuse=tf.AUTO_REUSE in the old code.
    shared_layers = getattr(RelightNet, "_shared_layers", None)
    if shared_layers is None:
        shared_layers = {}
        RelightNet._shared_layers = shared_layers

    def conv(name, filters, kernel_size, strides=1, activation=None):
        # Fetch (or lazily create) the named convolution; the name keeps the
        # "RelightNet" prefix so its variables remain identifiable by name.
        layer_name = "RelightNet_" + name
        if layer_name not in shared_layers:
            shared_layers[layer_name] = keras_layers.Conv2D(
                filters,
                kernel_size,
                strides=strides,
                padding="same",
                activation=activation,
                name=layer_name,
            )
        return shared_layers[layer_name]

    def resize_like(tensor, reference):
        # Nearest-neighbour resize of `tensor` to the spatial size of `reference`.
        # Wrapped in a Lambda layer so it also works on symbolic Keras inputs.
        return keras_layers.Lambda(
            lambda pair: tf.image.resize(
                pair[0], tf.shape(pair[1])[1:3], method="nearest"
            )
        )([tensor, reference])

    input_im = keras_layers.Concatenate(axis=-1)([input_R, input_L])

    # Encoder.
    conv0 = conv("conv0", 64, 3)(input_im)
    conv1 = conv("conv1", 64, 3, strides=2, activation="relu")(conv0)
    conv2 = conv("conv2", 64, 3, strides=2, activation="relu")(conv1)
    conv3 = conv("conv3", 64, 3, strides=2, activation="relu")(conv2)

    # Decoder with additive skip connections.
    up1 = resize_like(conv3, conv2)
    deconv1 = conv("deconv1", 64, 3, activation="relu")(up1) + conv2
    up2 = resize_like(deconv1, conv1)
    deconv2 = conv("deconv2", 64, 3, activation="relu")(up2) + conv1
    up3 = resize_like(deconv2, conv0)
    deconv3 = conv("deconv3", 64, 3, activation="relu")(up3) + conv0

    # Multi-scale fusion at full resolution.
    deconv1_resize = resize_like(deconv1, deconv3)
    deconv2_resize = resize_like(deconv2, deconv3)
    feature_gather = keras_layers.Concatenate(axis=-1)(
        [deconv1_resize, deconv2_resize, deconv3]
    )
    feature_fusion = conv("fusion", 64, 1)(feature_gather)
    output = conv("output", 1, 3)(feature_fusion)
    return output

In [None]:
import tensorflow as tf
import numpy as np
import time
from tqdm import tqdm
import random

class lowlight_enhance_2:
    """Trains and evaluates RelightNet on reflectance / illumination maps.

    This is a TensorFlow 2 (eager) port. The previous version mixed Keras inputs
    with TensorFlow 1 session code (`self.sess` was never created, and
    `tf.trainable_variables` / `tf.train.Saver` do not exist in TF2), so it
    could not be constructed or run.

    Args:
        train_high_data: indexable collection of target images.
        Reflectance: indexable collection of reflectance maps, each (H, W, 3).
        Illuminance: indexable collection of illumination maps, each (H, W, 3).
        Reflectance_test: stored as `R_low_test` (not used by this class).
        Illuminance_test: stored as `I_low_test` (not used by this class).
        batch_size: number of samples per training batch.
        patch_size: stored but not used by this class.
        epoch: number of training epochs.
        learning_rate: a single value, or a sequence indexed by epoch.
        train_phase: label printed in the training log.

    The weights live in `self.relight_model` (a Keras model), whose own
    save/load methods can be used for checkpoints.
    """

    def __init__(self, train_high_data,  Reflectance, Illuminance, Reflectance_test, Illuminance_test, batch_size, patch_size, epoch, learning_rate, train_phase):

        self.train_high_data = train_high_data

        self.R_low = Reflectance
        self.I_low = Illuminance

        self.R_low_test = Reflectance_test
        self.I_low_test = Illuminance_test

        self.batch_size = batch_size
        self.patch_size = patch_size
        self.epoch = epoch
        self.learning_rate = learning_rate
        self.train_phase = train_phase

        # Fixed finite-difference kernels used by gradient().
        self.smooth_kernel_x = tf.reshape(tf.constant([[0, 0], [-1, 1]], tf.float32), [2, 2, 1, 1])
        self.smooth_kernel_y = tf.transpose(self.smooth_kernel_x, [1, 0, 2, 3])

        # Symbolic inputs; wrapping RelightNet in a Keras model gives access to its
        # trainable variables without any graph-collection or session machinery.
        self.R_low_ph = tf.keras.Input(shape=(None, None, 3), name='R_low_ph')
        self.I_low_ph = tf.keras.Input(shape=(None, None, 3), name='I_low_ph')
        I_delta = RelightNet(self.I_low_ph, self.R_low_ph)
        self.relight_model = tf.keras.Model(
            inputs=[self.R_low_ph, self.I_low_ph], outputs=I_delta
        )
        self.var_Relight = self.relight_model.trainable_variables

        self.optimizer = tf.keras.optimizers.Adam(
            learning_rate=self._learning_rate_for(0)
        )

        print("[*] Initialize model successfully...")

    def _learning_rate_for(self, epoch):
        # `learning_rate` may be one number or a per-epoch sequence.
        if np.ndim(self.learning_rate) == 0:
            return float(self.learning_rate)
        return float(self.learning_rate[epoch])

    def _relight(self, input_R, input_I, training):
        # Run the network and return (output_S, I_delta_3, I_delta): the relit
        # image, the adjustment map repeated to 3 channels, and the raw map.
        input_R = tf.cast(input_R, tf.float32)
        input_I = tf.cast(input_I, tf.float32)
        I_delta = self.relight_model([input_R, input_I], training=training)
        I_delta_3 = tf.concat([I_delta, I_delta, I_delta], axis=-1)
        return input_R * I_delta_3, I_delta_3, I_delta

    def _relight_loss(self, input_R, input_I, target_high):
        # L1 reconstruction loss plus 3x the reflectance-weighted smoothness loss.
        input_R = tf.cast(input_R, tf.float32)
        target_high = tf.cast(target_high, tf.float32)
        output_S, _, I_delta = self._relight(input_R, input_I, training=True)
        relight_loss = tf.reduce_mean(tf.abs(output_S - target_high))
        Ismooth_loss_delta = self.smooth(I_delta, input_R)
        return relight_loss + 3 * Ismooth_loss_delta

    def gradient(self, input_tensor, direction):
        """Absolute finite difference of a single-channel map along "x" or "y".

        Raises:
            ValueError: if `direction` is neither "x" nor "y".
        """
        if direction == "x":
            kernel = self.smooth_kernel_x
        elif direction == "y":
            kernel = self.smooth_kernel_y
        else:
            raise ValueError(f"direction must be 'x' or 'y', got {direction!r}")
        return tf.abs(tf.nn.conv2d(input_tensor, kernel, strides=[1, 1, 1, 1], padding='SAME'))

    def ave_gradient(self, input_tensor, direction):
        """3x3 average-pooled version of `gradient`."""
        return tf.nn.avg_pool2d(self.gradient(input_tensor, direction), ksize=3, strides=1, padding='SAME')

    def smooth(self, input_I, input_R):
        """Smoothness of `input_I`, down-weighted where the grayscale `input_R` has strong gradients."""
        input_R = tf.image.rgb_to_grayscale(input_R)
        return tf.reduce_mean(self.gradient(input_I, "x") * tf.exp(-10 * self.ave_gradient(input_R, "x")) + self.gradient(input_I, "y") * tf.exp(-10 * self.ave_gradient(input_R, "y")))

    def evaluate_train(self):
        """Run RelightNet on every training sample.

        Returns:
            (R_low_hat, I_low_hat): two lists of numpy arrays, one entry per sample,
            holding the relit image and the 3-channel adjustment map (each with a
            leading batch axis of size one).
        """
        print("Evaluating for train data")
        R_low_hat = []
        I_low_hat = []

        for idx in tqdm(range(len(self.R_low))):
            input_R_low = np.expand_dims(self.R_low[idx], axis=0)
            input_I_low = np.expand_dims(self.I_low[idx], axis=0)

            output_S, output_I_delta, _ = self._relight(input_R_low, input_I_low, training=False)

            R_low_hat.append(output_S.numpy())
            I_low_hat.append(output_I_delta.numpy())

        return R_low_hat, I_low_hat

    def train(self):
        """Train RelightNet for `self.epoch` epochs of 30 batches each.

        Raises:
            ValueError: if no training samples were provided.
        """
        numBatch = 30

        num_images = len(self.R_low)
        if num_images == 0:
            raise ValueError("No training samples were provided.")

        # Visiting order of the samples. Shuffling one shared index list keeps the
        # reflectance, illumination and target of a sample together; the old code
        # shuffled a temporary copy, so the order never actually changed.
        order = list(range(num_images))
        image_id = 0

        start_time = time.time()

        for epoch in range(self.epoch):
            self.optimizer.learning_rate = self._learning_rate_for(epoch)

            for batch_id in range(numBatch):
                # Pick the samples of this batch, reshuffling after each full pass.
                picked = []
                for _ in range(self.batch_size):
                    picked.append(order[image_id])
                    image_id = (image_id + 1) % num_images
                    if image_id == 0:
                        random.shuffle(order)

                # Stack instead of copying into fixed 128x128 buffers, so any
                # (uniform) sample size works.
                batch_R_low = np.stack([np.asarray(self.R_low[i], dtype="float32") for i in picked])
                batch_I_low = np.stack([np.asarray(self.I_low[i], dtype="float32") for i in picked])
                batch_input_high = np.stack([np.asarray(self.train_high_data[i], dtype="float32") for i in picked])

                # One optimisation step.
                with tf.GradientTape() as tape:
                    loss = self._relight_loss(batch_R_low, batch_I_low, batch_input_high)
                variables = self.relight_model.trainable_variables
                gradients = tape.gradient(loss, variables)
                self.optimizer.apply_gradients(zip(gradients, variables))

                print("%s Epoch: [%2d] [%4d/%4d] time: %4.4f, loss: %.6f" \
                      % (self.train_phase, epoch + 1, batch_id + 1, numBatch, time.time() - start_time, float(loss)))

    

In [None]:
# NOTE(review): the positional arguments map to (train_high_data, Reflectance,
# Illuminance, Reflectance_test, Illuminance_test, batch_size, patch_size, epoch,
# learning_rate, train_phase). Reflectance / Illuminance are passed as single
# numbers here, but evaluate_train() takes len() of them and indexes them per
# sample, so they presumably need to be collections of image arrays produced by a
# decomposition step that is not part of this notebook — confirm.
# Also verify that the train_phase value "RelightNet" is the phase name the class expects.
model2 = lowlight_enhance_2(train_dataset, 0.12, 0.5, 0.9, 0.67, 10,5 , 10, 0.01, "RelightNet")
Output_S, Output_I_delta = model2.evaluate_train()