In [1]:
!git clone https://github.com/Emeika/image-deblur.git

Cloning into 'image-deblur'...
remote: Enumerating objects: 1802, done.[K
remote: Counting objects: 100% (1802/1802), done.[K
remote: Compressing objects: 100% (1795/1795), done.[K
remote: Total 1802 (delta 9), reused 1788 (delta 4), pack-reused 0[K
Receiving objects: 100% (1802/1802), 39.95 MiB | 17.40 MiB/s, done.
Resolving deltas: 100% (9/9), done.


In [2]:
cd image-deblur/

/content/image-deblur


***Import Modules***

In [17]:
import os
import numpy as np
import pandas as pd
import ast
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from sklearn.model_selection import train_test_split, KFold
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense, Reshape, Conv2D, UpSampling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import img_to_array, load_img, array_to_img

***Extract features into a feature vector and flatten them onto a csv***

In [4]:
# the flattened feature vector directly in the CSV file, resulting in a truncated representation but doesnt load properly
def extract_features(img_dir, label, size=(128, 128)):
    data = []
    for filename in os.listdir(img_dir):
        if filename.endswith(".jpg") or filename.endswith(".JPG"):
            img_path = os.path.join(img_dir, filename)
            img = load_img(img_path, target_size=size)
            img_array = img_to_array(img) / 255.0
            img_flat = img_array.flatten()
            data.append([img_path, img_flat, label])

    return data

# Extract features
blur_features = extract_features('/content/image-deblur/resized_dataset/blur', label=0)
sharp_features = extract_features('/content/image-deblur/resized_dataset/sharp', label=1)

# Combine and create DataFrame
columns = ['image_path', 'image_features', 'label']
features_df = pd.DataFrame(blur_features + sharp_features, columns=columns)

# Save to CSV
features_df.to_csv('image_features.csv', index=False)


In [13]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tqdm import tqdm

def load_and_preprocess_images(image_folder, label):
    data = []
    for file in tqdm(sorted(os.listdir(image_folder))):
        if any(file.endswith(extension) for extension in ['.jpg', '.JPG']):
            image_path = os.path.join(image_folder, file)
            image = tf.keras.preprocessing.image.load_img(image_path, target_size=(128, 128))
            image = tf.keras.preprocessing.image.img_to_array(image).astype('float32') / 255
            image_flattened = image.flatten()  # Flatten the image
            image_flattened_str = ','.join(map(str, image_flattened))  # Convert to a single string
            data.append([image_path, image_flattened_str, label])  # Add image path and label
    return data

# Extract features
blur_features = load_and_preprocess_images('/content/image-deblur/resized_dataset/blur', label=0)
sharp_features = load_and_preprocess_images('/content/image-deblur/resized_dataset/sharp', label=1)

# Combine data and create a DataFrame
columns = ['image_path', 'image_features', 'label']
data = sharp_features + blur_features
features_df = pd.DataFrame(data, columns=columns)

# Save to CSV
features_df.to_csv('image_data.csv', index=False)


100%|██████████| 350/350 [00:12<00:00, 27.81it/s]
100%|██████████| 350/350 [00:12<00:00, 28.24it/s]


***Load Data***

In [14]:
def load_images_from_csv(csv_file):
    df = pd.read_csv(csv_file)
    image_paths = df['image_path'].values
    labels = df['label'].values
    image_data = df['image_features'].apply(lambda x: np.fromstring(x, sep=',')).values
    images = np.array([img.reshape(128, 128, 3) for img in image_data])  # Reshape to original dimensions
    return images, image_paths, labels

# Load images from CSV
images_loaded, image_paths_loaded, labels_loaded = load_images_from_csv('image_data.csv')

print(images_loaded.shape)  # Should be (num_images, 128, 128, 3)
print(image_paths_loaded.shape)  # Should be (num_images,)
print(labels_loaded.shape)  # Should be (num_images,)


(700, 128, 128, 3)
(700,)
(700,)


In [None]:
# Step 1: Read the CSV file
# data = pd.read_csv('image_features.csv')


# # Step 2: Parse the numpy array strings into actual numpy arrays
# data['image_features'] = data['image_features'].apply(lambda x: np.fromstring(x[1:-1], sep=' '))

# # data['image_features'] = data['image_features'].apply(lambda x: x.reshape(128, 128, 3))

# # # Check that the features length is 49152
# num_samples = data.shape[0]
# feature_length = data['image_features'].apply(len).unique()
# assert len(feature_length) == 1 and feature_length[0] == 128 * 128 * 3, \
#     f"Expected features of length {128 * 128 * 3}, but got {feature_length}"

# Step 3: Reshape the data for training
# X = np.vstack(data['image_features'].values)
# y = data['label'].values
# print(X.shape)
# image_shape = (128, 128, 3)
# X_reshaped = X.reshape(-1, *image_shape)



***Train Model by Split into Training, Testing and Validation set using kfolds cross validation***

In [None]:
# Define the number of folds
num_folds = 5

# Initialize the KFold object
kf = KFold(n_splits=num_folds, shuffle=True, random_state=42)

# Initialize lists to store training and validation losses
train_losses = []
val_losses = []
image_shape = (128, 128, 3)

# Iterate over the folds
for fold, (train_index, val_index) in enumerate(kf.split(images_loaded)):
    print(f"Training Fold {fold + 1}")

    # Get the data for this fold
    X_train_fold, X_val_fold = images_loaded[train_index], images_loaded[val_index]

    # Reshape features to match image dimensions
    X_train_fold = X_train_fold.reshape(-1, *image_shape)
    X_val_fold = X_val_fold.reshape(-1, *image_shape)

    # Build the model
    model = Sequential([
        Conv2D(64, (3, 3), activation='relu', padding='same', input_shape=image_shape),
        Conv2D(64, (3, 3), activation='relu', padding='same', strides=2),
        Conv2D(128, (3, 3), activation='relu', padding='same'),
        Conv2D(128, (3, 3), activation='relu', padding='same', strides=2),
        UpSampling2D((2, 2)),
        Conv2D(128, (3, 3), activation='relu', padding='same'),
        UpSampling2D((2, 2)),
        Conv2D(64, (3, 3), activation='relu', padding='same'),
        Conv2D(3, (3, 3), activation='sigmoid', padding='same')
    ])

    model.compile(optimizer=Adam(), loss='mean_squared_error')
    # model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    # batch size = 32

    # Train the model
    history = model.fit(X_train_fold, X_train_fold, epochs=50, batch_size=16, validation_data=(X_val_fold, X_val_fold))

    # Save training and validation losses
    train_losses.append(history.history['loss'])
    val_losses.append(history.history['val_loss'])

    # Save the model
    model.save(f'deblur_model_fold_{fold}.h5')

# Print average training and validation losses across folds
print(f"Average Training Loss: {np.mean(train_losses, axis=0)}")
print(f"Average Validation Loss: {np.mean(val_losses, axis=0)}")


Training Fold 1
Epoch 1/50
Epoch 2/50
Epoch 3/50

In [None]:
# Define U-Net model
def unet_model(input_size=(128, 128, 3)):
    inputs = Input(input_size)

    # Encoding path
    conv1 = Conv2D(64, 3, activation='relu', padding='same')(inputs)
    conv1 = Conv2D(64, 3, activation='relu', padding='same')(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = Conv2D(128, 3, activation='relu', padding='same')(pool1)
    conv2 = Conv2D(128, 3, activation='relu', padding='same')(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = Conv2D(256, 3, activation='relu', padding='same')(pool2)
    conv3 = Conv2D(256, 3, activation='relu', padding='same')(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

    conv4 = Conv2D(512, 3, activation='relu', padding='same')(pool3)
    conv4 = Conv2D(512, 3, activation='relu', padding='same')(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(conv4)

    conv5 = Conv2D(1024, 3, activation='relu', padding='same')(pool4)
    conv5 = Conv2D(1024, 3, activation='relu', padding='same')(conv5)

    # Decoding path
    up6 = concatenate([UpSampling2D(size=(2, 2))(conv5), conv4], axis=-1)
    conv6 = Conv2D(512, 3, activation='relu', padding='same')(up6)
    conv6 = Conv2D(512, 3, activation='relu', padding='same')(conv6)

    up7 = concatenate([UpSampling2D(size=(2, 2))(conv6), conv3], axis=-1)
    conv7 = Conv2D(256, 3, activation='relu', padding='same')(up7)
    conv7 = Conv2D(256, 3, activation='relu', padding='same')(conv7)

    up8 = concatenate([UpSampling2D(size=(2, 2))(conv7), conv2], axis=-1)
    conv8 = Conv2D(128, 3, activation='relu', padding='same')(up8)
    conv8 = Conv2D(128, 3, activation='relu', padding='same')(conv8)

    up9 = concatenate([UpSampling2D(size=(2, 2))(conv8), conv1], axis=-1)
    conv9 = Conv2D(64, 3, activation='relu', padding='same')(up9)
    conv9 = Conv2D(64, 3, activation='relu', padding='same')(conv9)

    outputs = Conv2D(3, 1, activation='sigmoid')(conv9)

    model = Model(inputs=[inputs], outputs=[outputs])

    return model

# Compile model
model = unet_model()
model.compile(optimizer='adam', loss='mean_squared_error', metrics=['accuracy'])

# Train the model
model.fit(blur_images, sharp_images, batch_size=16, epochs=50, validation_split=0.1)

# Save the model
model.save('image_deblurring_model.h5')


Epoch 1/50
Epoch 2/50

In [None]:
import numpy as np
from sklearn.model_selection import KFold
from sklearn.model_selection import GridSearchCV
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate
from tensorflow.keras.optimizers import Adam

# Define U-Net model
def unet_model(input_size=(128, 128, 3)):
    inputs = Input(input_size)

    # Encoding path
    conv1 = Conv2D(64, 3, activation='relu', padding='same')(inputs)
    conv1 = Conv2D(64, 3, activation='relu', padding='same')(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = Conv2D(128, 3, activation='relu', padding='same')(pool1)
    conv2 = Conv2D(128, 3, activation='relu', padding='same')(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = Conv2D(256, 3, activation='relu', padding='same')(pool2)
    conv3 = Conv2D(256, 3, activation='relu', padding='same')(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

    conv4 = Conv2D(512, 3, activation='relu', padding='same')(pool3)
    conv4 = Conv2D(512, 3, activation='relu', padding='same')(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(conv4)

    conv5 = Conv2D(1024, 3, activation='relu', padding='same')(pool4)
    conv5 = Conv2D(1024, 3, activation='relu', padding='same')(conv5)

    # Decoding path
    up6 = concatenate([UpSampling2D(size=(2, 2))(conv5), conv4], axis=-1)
    conv6 = Conv2D(512, 3, activation='relu', padding='same')(up6)
    conv6 = Conv2D(512, 3, activation='relu', padding='same')(conv6)

    up7 = concatenate([UpSampling2D(size=(2, 2))(conv6), conv3], axis=-1)
    conv7 = Conv2D(256, 3, activation='relu', padding='same')(up7)
    conv7 = Conv2D(256, 3, activation='relu', padding='same')(conv7)

    up8 = concatenate([UpSampling2D(size=(2, 2))(conv7), conv2], axis=-1)
    conv8 = Conv2D(128, 3, activation='relu', padding='same')(up8)
    conv8 = Conv2D(128, 3, activation='relu', padding='same')(conv8)

    up9 = concatenate([UpSampling2D(size=(2, 2))(conv8), conv1], axis=-1)
    conv9 = Conv2D(64, 3, activation='relu', padding='same')(up9)
    conv9 = Conv2D(64, 3, activation='relu', padding='same')(conv9)

    outputs = Conv2D(3, 1, activation='sigmoid')(conv9)

    model = Model(inputs=[inputs], outputs=[outputs])

    return model

# Data
images = np.concatenate((blur_images, sharp_images), axis=0)  # Concatenate blur and sharp images
labels = np.concatenate((np.zeros(blur_images.shape[0]), np.ones(sharp_images.shape[0])))  # Labels (0 for blur, 1 for sharp)

# Initialize k-fold cross-validation
kf = KFold(n_splits=5, shuffle=True, random_state=42)

# Define parameters for grid search
param_grid = {
    'epochs': [50, 100],
    'batch_size': [16, 32],
    'optimizer': ['adam', 'rmsprop']
}

# Grid search
best_model = GridSearchCV(estimator=unet_model(), param_grid=param_grid, cv=kf, scoring='accuracy', verbose=1, n_jobs=-1)
best_model.fit(images, labels)

# Print best parameters and score
print("Best Parameters: ", best_model.best_params_)
print("Best Score: ", best_model.best_score_)

# Save the best model
best_model.best_estimator_.save('best_deblur_model.h5')


# **Inference**

In [None]:
model = load_model('deblur_model.h5')

def deblur_image(img_path, output_path, size=(128, 128)):
    img = load_img(img_path, target_size=size)
    img_array = img_to_array(img) / 255.0
    img_flat = img_array.flatten()
    img_flat = np.expand_dims(img_flat, axis=0)

    deblurred_flat = model.predict(img_flat)[0]
    deblurred_array = deblurred_flat.reshape(size + (3,))
    deblurred_img = array_to_img(deblurred_array)
    deblurred_img.save(output_path)

# Example usage
deblur_image('path/to/blurry/image.jpg', 'path/to/output/image.jpg')
