# Learning to See in the Dark

In [1]:
! pip install rawpy
! pip install imageio

Collecting rawpy
  Downloading rawpy-0.17.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m0m
Installing collected packages: rawpy
Successfully installed rawpy-0.17.3
[0m

In [2]:
import tensorflow as tf
import numpy as np
import PIL.Image as Image
import cv2
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, MaxPooling2D, Concatenate, Lambda
import tensorboard
import os
import random
import rawpy
import imageio
import pandas as pd

In [3]:
train_dir = '../input/sid-sony/Sony/long/'
test_dir = '../input/sid-sony/Sony/short/'
BATCH_SIZE = 16
EPOCH = 60

In [4]:
def function_to_depth_to_space(x):
    x = tf.compat.v1.depth_to_space(x, 2)
    return x

def function_to_remove_negs(x):
    return tf.minimum(tf.maximum(x, 0), 1)

In [5]:
class Network:

    def __init__(self):
        self.SIZE = (512, 512)
        self.CHANNELS = 4
        self.crops_per_image = 8
        self.k1=1
        self.k2=-1
        self.model=None
        self.lr=1e-4
        self.batch_size=8
        self.losses=[]
    
    def my_model(self):
        inp = Input(shape=[self.SIZE[0] // 2, self.SIZE[1] // 2, 4])
        out1 = Conv2D(32, [3, 3], activation='relu', padding='same')(inp)
        out2 = Conv2D(32, [3, 3], activation='relu', padding='same')(out1)
        out3 = MaxPooling2D([2, 2], [2, 2], padding='same')(out2)

        out4 = Conv2D(64, [3, 3], activation='relu', padding='same')(out3)
        out5 = Conv2D(64, [3, 3], activation='relu', padding='same')(out4)
        out6 = MaxPooling2D([2, 2], [2, 2], padding='same')(out5)

        out7 = Conv2D(128, [3, 3], activation='relu', padding='same')(out6)
        out8 = Conv2D(128, [3, 3], activation='relu', padding='same')(out7)
        out9 = MaxPooling2D([2, 2], [2, 2], padding='same')(out8)

        out10 = Conv2D(256, [3, 3], activation='relu', padding='same')(out9)
        out11 = Conv2D(256, [3, 3], activation='relu', padding='same')(out10)
        out12 = MaxPooling2D([2, 2], [2, 2], padding='same')(out11)

        out13 = Conv2D(512, [3, 3], activation='relu', padding='same')(out12)
        out14 = Conv2D(512, [3, 3], activation='relu', padding='same')(out13)

        out15 = Conv2DTranspose(256, [2, 2], [2, 2], padding='same')(out14)
        out16 = Concatenate()([out11, out15])

        out17 = Conv2D(256, [3, 3], activation='relu', padding='same')(out16)
        out18 = Conv2D(256, [3, 3], activation='relu', padding='same')(out17)

        out19 = Conv2DTranspose(128, [2, 2], [2, 2], padding='same')(out18)
        out20 = Concatenate()([out8, out19])

        out21 = Conv2D(128, [3, 3], activation='relu', padding='same')(out20)
        out22 = Conv2D(128, [3, 3], activation='relu', padding='same')(out21)

        out23 = Conv2DTranspose(64, [2, 2], [2, 2], padding='same')(out22)
        out24 = Concatenate()([out5, out23])

        out25 = Conv2D(64, [3, 3], activation='relu', padding='same')(out24)
        out26 = Conv2D(64, [3, 3], activation='relu', padding='same')(out25)

        out27 = Conv2DTranspose(32, [2, 2], [2, 2], padding='same')(out26)
        out28 = Concatenate()([out2, out27])

        out29 = Conv2D(32, [3, 3], activation='relu', padding='same')(out28)
        out30 = Conv2D(32, [3, 3], activation='relu', padding='same')(out29)

        out31 = Conv2D(12, (1, 1), activation='relu', padding='same')(out30)
        out32 = Lambda(function_to_depth_to_space)(out31)
        out33 = Lambda(function_to_remove_negs)(out32)

        self.model = Model(inputs=inp, outputs=out33, name='LTSITD_Model')
        return self.model

    def loss(self, y_true, y_pred):

        im1 = tf.image.convert_image_dtype(y_true, tf.float32)
        im2 = tf.image.convert_image_dtype(y_pred, tf.float32)

        # psnr loss
        loss1 = tf.image.psnr(im1, im2, max_val=1.0)

        # ssim loss
        loss2 = tf.image.ssim(im1, im2, max_val=1.0)

        # canny edge detection

    #     edges1=tf.image.sobel_edges(y_true)
    #     edges2=tf.image.sobel_edges(y_pred)
    #     loss3 = tf.reduce_mean(tf.keras.losses.mean_squared_error(edges1, edges2))

        loss = tf.math.subtract(tf.math.scalar_mul(self.k2, loss2), tf.math.scalar_mul(self.k1, loss1))
        self.losses.append(loss)
        return loss

#   def randomcrop(self, img1, img2, size=[512, 512]):
#     assert img1.shape[0] >= size[0]
#     assert img1.shape[1] >= size[1]
#     assert img2.shape[0] >= size[0]
#     assert img2.shape[1] >= size[1]
#     x = random.randint(0, img1.shape[1] - size[0])
#     y = random.randint(0, img1.shape[0] - size[1])

#     img1 = img1[y:y + size[0], x:x + size[1]]
#     img2 = img2[y:y + size[0], x:x + size[1]]
#     return img1, img2

    def pack_raw(self, raw):
        # pack Bayer image to 4 channels
        im = raw
        im = np.maximum(im - 512, 0) / (16383 - 512)  # subtract the black level

        im = np.expand_dims(im, axis=2)
        img_shape = im.shape
        H = img_shape[0]
        W = img_shape[1]

        out = np.concatenate((im[0:H:2, 0:W:2, :],
                              im[0:H:2, 1:W:2, :],
                              im[1:H:2, 1:W:2, :],
                              im[1:H:2, 0:W:2, :]), axis=2)
        return out
    
    def PSNR(self, y_true, y_pred):
        return tf.convert_to_tensor(20 * tf.experimental.numpy.log10(tf.reduce_max(tf.reduce_max(y_pred)) / tf.math.sqrt(tf.math.reduce_mean(tf.keras.metrics.mean_squared_error(y_true, y_pred)))))

    def SSIM(self, y_true, y_pred):
        return tf.image.ssim(y_true, y_pred, 1)
    
    def summary(self):
        print(self.model.summary())

    def model_compile(self):
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.lr, beta_1=0.9,
                                              beta_2=0.999, epsilon=None,
                                              decay=0.0, amsgrad=False)

        self.model.compile(loss=self.loss, optimizer=optimizer, metrics=['mse', 'mae', self.PSNR, self.SSIM])

#     def train(self, in_image, out_image):
#         self.model.train_on_batch(in_image, out_image)

    def save_model(self, count):
        self.model.save('LTSITD_model_' + str(count) + '.h5')

In [6]:
mod = Network()
mod.model = tf.keras.models.load_model('/kaggle/input/model140/LTSITD_model_140.h5', custom_objects={'loss': mod.loss, 'PSNR': mod.PSNR, 'SSIM': mod.SSIM})
mod.model_compile()
mod.summary()

2022-11-30 01:13:22.931075: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.


Model: "LTSITD_Model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 256, 256, 4) 0                                            
__________________________________________________________________________________________________
conv2d_19 (Conv2D)              (None, 256, 256, 32) 1184        input_2[0][0]                    
__________________________________________________________________________________________________
conv2d_20 (Conv2D)              (None, 256, 256, 32) 9248        conv2d_19[0][0]                  
__________________________________________________________________________________________________
max_pooling2d_4 (MaxPooling2D)  (None, 128, 128, 32) 0           conv2d_20[0][0]                  
_______________________________________________________________________________________

In [None]:
# tf.keras.utils.plot_model(mod.model, to_file='model.jpeg', show_shapes=False)

In [10]:
with open('../input/sid-sony/Sony_train_list.txt') as f:
    lines = f.readlines()

map_imgs = {'short': [], 'long': []}
for line in lines:
    count = 0
    for i in range(len(line)):
        if line[i] == 'W' and count == 0:
            first = i
            count = 1
        elif line[i] == 'W' and count == 1:
            second = i
            count = 2
    map_imgs['short'].append('../input/sid-sony' + line[1:first + 1])
    map_imgs['long'].append('../input/sid-sony' + line[first + 3:second + 1])
map_imgs_train = pd.DataFrame(map_imgs)
map_imgs_train.shape

(1865, 2)

In [9]:
train = map_imgs_train
X = []
Y = []
z = 0
# for ep in range(EPOCH):
#     print('Epoch {} / {}:'.format(ep + 1, EPOCH))
for row in range(1000):

    inp = train.iat[row, 0]
    out = train.iat[row, 1]

    if inp[-8:-5] == '0.04':
        amp_ratio = 250
    else: 
        amp_ratio = 100
    try:
        in_img = rawpy.imread(inp).raw_image_visible.astype(np.float32)

        out_img = rawpy.imread(out).postprocess(use_camera_wb=True, half_size=False, no_auto_bright=True, output_bps=16)

    except:
        continue

    in_img1, out_img1 = cv2.resize(in_img, (512, 512)), cv2.resize(out_img, (512, 512))
    in_image = np.expand_dims(mod.pack_raw(in_img1), axis=0) * amp_ratio
    out_image = np.expand_dims(np.float32(out_img1 / 65535.0), axis=0)
    X.append(in_image)
    Y.append(out_image)
    if row % 100 == 0:
        print("finished", row , "rows")
print(len(X), len(Y))        
#         print(in_image)
#         print(out_image * 255)
#         break
#     break
#         print(in_image.shape, out_image.shape)
        
#         mod.train(in_image, out_image)
#         del in_img, out_img, in_img1, out_img1, in_image, out_image
#     z += 1 
#     mod.save_model(z)
#     print("Done with", str(z), "epochs")
# print("Full Training Done")

finished 0 rows
finished 100 rows
finished 200 rows
finished 300 rows
finished 400 rows
finished 500 rows
finished 600 rows
finished 700 rows
finished 800 rows
finished 900 rows
1000 1000


In [10]:
print(len(X), len(Y)) 
X = np.array(X)
Y = np.array(Y)
X.shape, Y.shape

1000 1000


((1000, 1, 256, 256, 4), (1000, 1, 512, 512, 3))

In [11]:
X = X.reshape((1000, 256, 256, 4))
Y = Y.reshape((1000, 512, 512, 3))

In [12]:
X.shape, Y.shape

((1000, 256, 256, 4), (1000, 512, 512, 3))

In [13]:
mod.model_compile()
history = mod.model.fit(X, Y, validation_split=0.2, epochs=EPOCH, batch_size=BATCH_SIZE)
mod.save_model(200)

2022-11-29 15:58:59.504026: W tensorflow/core/framework/cpu_allocator_impl.cc:80] Allocation of 2516582400 exceeds 10% of free system memory.
2022-11-29 15:59:00.925897: W tensorflow/core/framework/cpu_allocator_impl.cc:80] Allocation of 2516582400 exceeds 10% of free system memory.
2022-11-29 15:59:01.516487: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)


Epoch 1/60


2022-11-29 15:59:05.060658: I tensorflow/stream_executor/cuda/cuda_dnn.cc:369] Loaded cuDNN version 8005


Epoch 2/60
Epoch 3/60
Epoch 4/60
Epoch 5/60
Epoch 6/60
Epoch 7/60
Epoch 8/60
Epoch 9/60
Epoch 10/60
Epoch 11/60
Epoch 12/60
Epoch 13/60
Epoch 14/60
Epoch 15/60
Epoch 16/60
Epoch 17/60
Epoch 18/60
Epoch 19/60
Epoch 20/60
Epoch 21/60
Epoch 22/60
Epoch 23/60
Epoch 24/60
Epoch 25/60
Epoch 26/60
Epoch 27/60
Epoch 28/60
Epoch 29/60
Epoch 30/60
Epoch 31/60
Epoch 32/60
Epoch 33/60
Epoch 34/60
Epoch 35/60
Epoch 36/60
Epoch 37/60
Epoch 38/60
Epoch 39/60
Epoch 40/60
Epoch 41/60
Epoch 42/60
Epoch 43/60
Epoch 44/60
Epoch 45/60
Epoch 46/60
Epoch 47/60
Epoch 48/60
Epoch 49/60
Epoch 50/60
Epoch 51/60
Epoch 52/60
Epoch 53/60
Epoch 54/60
Epoch 55/60
Epoch 56/60
Epoch 57/60
Epoch 58/60
Epoch 59/60
Epoch 60/60


In [None]:
with open('../input/sid-sony/Sony_test_list.txt') as f:
    lines = f.readlines()

map_imgs = {'short': [], 'long': []}
for line in lines:
    count = 0
    for i in range(len(line)):
        if line[i] == 'W' and count == 0:
            first = i
            count = 1
        elif line[i] == 'W' and count == 1:
            second = i
            count = 2
    map_imgs['short'].append('../input/sid-sony' + line[1:first + 1])
    map_imgs['long'].append('../input/sid-sony' + line[first + 3:second + 1])
map_imgs_test = pd.DataFrame(map_imgs)
map_imgs_test.shape

In [20]:
# Testing the Model
test = map_imgs_train
inp = test.iat[9, 0]
out = test.iat[9, 1]
print(inp)
print(out)
# inp = '../input/btp-ltsitd-data/dark_image_1_1669010833670.raw'
# out = '../input/btp-ltsitd-data/original_image_1_1669010833670.raw'

if '0.04' in inp:
    amp_ratio = 250
else: 
    amp_ratio = 100

in_img = rawpy.imread(inp).raw_image_visible.astype(np.float32)

out_img = rawpy.imread(out).postprocess(use_camera_wb=True, half_size=False, no_auto_bright=True, output_bps=16)

in_img1, out_img1 = cv2.resize(in_img, (512, 512)), cv2.resize(out_img, (512, 512))
in_image = np.expand_dims(mod.pack_raw(in_img1), axis=0) * amp_ratio
out_image = np.expand_dims(np.float32(out_img1 / 65535.0), axis=0)
# in_image = X[890:891]
# out_image = Y[890:891]

# in_image.shape, out_image.shape
predicted_image = mod.model.predict(in_image)
predicted_image = predicted_image[0]
predicted_image = predicted_image * 255
predicted_image = predicted_image.astype('uint8')
# cv2.imwrite('./predicted_image_52.jpeg', predicted_image)
predicted_image_pil = Image.fromarray(predicted_image)
predicted_image_pil.save('./predicted_image_9_140.jpeg')

../input/sid-sony/Sony/short/00001_07_0.1s.ARW
../input/sid-sony/Sony/long/00001_00_10s.ARW


In [21]:
mod.model.evaluate(in_image, out_image)



[-27.754474639892578,
 0.002032056450843811,
 0.028739411383867264,
 26.92064094543457,
 0.8338328003883362]

In [22]:
orig_image = out_image[0]
orig_image = orig_image * 255
orig_image = orig_image.astype('uint8')
# cv2.imwrite('./actual_image_51.jpeg', out_image)
orig_image_pil = Image.fromarray(orig_image)
orig_image_pil.save('./orig_image_9_140.jpeg')