In [2]:
!git clone https://github.com/jantic/DeOldify.git DeOldify

fatal: destination path 'DeOldify' already exists and is not an empty directory.


In [3]:
cd DeOldify

/content/DeOldify


In [4]:
#NOTE:  This must be the first call in order to work properly!
from deoldify import device
from deoldify.device_id import DeviceId
#choices:  CPU, GPU0...GPU7
device.set(device=DeviceId.GPU0)

import torch

if not torch.cuda.is_available():
    print('GPU not available.')

GPU not available.


In [5]:
!pip install -r requirements-colab.txt



In [16]:
from deoldify.visualize import get_image_colorizer

In [6]:
import os
import cv2
import numpy as np
from skimage import restoration, color, filters
import torch
from torch import nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import tensorflow as tf
from flask import Flask, request, render_template, send_file
import fastai

2024-08-11 09:07:59.100559: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-08-11 09:07:59.127693: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-08-11 09:07:59.135461: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-08-11 09:07:59.154887: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
INFO:numexpr.utils:NumExpr defaulting to 2 threads.


NumExpr defaulting to 2 threads.


In [7]:
# Define paths and global variables
UPLOAD_FOLDER = '/content/Upload_folder'
RESULT_FOLDER = '/content/Result_folder'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'}

In [8]:
# Initialize Flask app
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['RESULT_FOLDER'] = RESULT_FOLDER

In [9]:
# Utility function: Check allowed file extensions
def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

In [13]:
image_path = '/content/Image/Image.png'

In [14]:
# Utility function: Preprocess images
def preprocess_image(image_path):
    # Load the image
    image = cv2.imread(image_path)

    # Convert to grayscale if necessary
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Normalize the image
    norm_image = cv2.normalize(gray_image, None, 0, 255, cv2.NORM_MINMAX)

    # Noise reduction
    denoised_image = restoration.denoise_bilateral(norm_image, sigma_color=0.05, sigma_spatial=15)

    # Enhance contrast
    contrast_image = cv2.equalizeHist(denoised_image)

    # Resize image if needed
    resized_image = cv2.resize(contrast_image, (256, 256), interpolation=cv2.INTER_AREA)

    return resized_image

In [17]:
# Sophisticated GAN Architectures
class DeOldifyColorizer:
    def __init__(self):
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.colorizer = get_image_colorizer(artistic=False)

    def colorize_image(self, image_path):
        return self.colorizer.get_transformed_image(path=image_path, render_factor=35, post_process=True)

CycleGAN Implementation

In [18]:
# CycleGAN Implementation
class ResidualBlock(nn.Module):
    def __init__(self, in_features):
        super(ResidualBlock, self).__init__()
        self.block = nn.Sequential(
            nn.Conv2d(in_features, in_features, kernel_size=3, stride=1, padding=1),
            nn.InstanceNorm2d(in_features),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_features, in_features, kernel_size=3, stride=1, padding=1),
            nn.InstanceNorm2d(in_features)
        )

    def forward(self, x):
        return x + self.block(x)

In [20]:
class Generator(nn.Module):
    def __init__(self, input_nc, output_nc, n_residual_blocks=9):
        super(Generator, self).__init__()

        # Initial Convolution Block
        model = [
            nn.Conv2d(input_nc, 64, kernel_size=7, stride=1, padding=3),
            nn.InstanceNorm2d(64),
            nn.ReLU(inplace=True)
        ]

        # Downsampling
        in_features = 64
        out_features = in_features * 2
        for _ in range(2):
            model += [
                nn.Conv2d(in_features, out_features, kernel_size=3, stride=2, padding=1),
                nn.InstanceNorm2d(out_features),
                nn.ReLU(inplace=True)
            ]
            in_features = out_features
            out_features = in_features * 2

        # Residual blocks
        for _ in range(n_residual_blocks):
            model += [ResidualBlock(in_features)]

        # Upsampling
        out_features = in_features // 2
        for _ in range(2):
            model += [
                nn.ConvTranspose2d(in_features, out_features, kernel_size=3, stride=2, padding=1, output_padding=1),
                nn.InstanceNorm2d(out_features),
                nn.ReLU(inplace=True)
            ]
            in_features = out_features
            out_features = in_features // 2

        # Output layer
        model += [nn.Conv2d(64, output_nc, kernel_size=7, stride=1, padding=3)]
        model += [nn.Tanh()]

        self.model = nn.Sequential(*model)

    def forward(self, x):
        return self.model(x)

In [21]:
class Discriminator(nn.Module):
    def __init__(self, input_nc):
        super(Discriminator, self).__init__()

        model = [
            nn.Conv2d(input_nc, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True)
        ]

        model += [
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.InstanceNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True)
        ]

        model += [
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.InstanceNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True)
        ]

        model += [
            nn.Conv2d(256, 512, kernel_size=4, stride=1, padding=1),
            nn.InstanceNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True)
        ]

        model += [nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=1)]
        self.model = nn.Sequential(*model)

    def forward(self, x):
        return self.model(x)

In [22]:
class CycleGAN(nn.Module):
    def __init__(self):
        super(CycleGAN, self).__init__()
        self.generator_A2B = Generator(input_nc=3, output_nc=3)
        self.generator_B2A = Generator(input_nc=3, output_nc=3)
        self.discriminator_A = Discriminator(input_nc=3)
        self.discriminator_B = Discriminator(input_nc=3)

    def forward(self, real_A, real_B):
        fake_B = self.generator_A2B(real_A)
        recon_A = self.generator_B2A(fake_B)
        fake_A = self.generator_B2A(real_B)
        recon_B = self.generator_A2B(fake_A)

        return fake_B, recon_A, fake_A, recon_B

In [23]:
class CycleGANEnhancer:
    def __init__(self, model_path=None):
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model = CycleGAN().to(self.device)
        if model_path:
            self.model.load_state_dict(torch.load(model_path))

    def enhance_image(self, image):
        with torch.no_grad():
            image_tensor = torch.from_numpy(image).permute(2, 0, 1).float().unsqueeze(0).to(self.device)
            fake_B, _, _, _ = self.model(image_tensor, image_tensor)
            return fake_B.squeeze().cpu().numpy().transpose(1, 2, 0)


In [24]:
# Image repair and enhancement
def repair_image(image):
    # Edge detection using Canny
    edges = cv2.Canny(image, 100, 200)

    # Inpainting using skimage
    inpainted_image = restoration.inpaint_biharmonic(image, edges, multichannel=True)

    return inpainted_image

In [25]:
# Model training and deployment (for illustration purposes)
def train_gan(training_data, model_save_path):
    model = CycleGAN().to('cuda' if torch.cuda.is_available() else 'cpu')
    optimizer_G = optim.Adam(
        list(model.generator_A2B.parameters()) + list(model.generator_B2A.parameters()), lr=0.0002, betas=(0.5, 0.999))
    optimizer_D_A = optim.Adam(model.discriminator_A.parameters(), lr=0.0002, betas=(0.5, 0.999))
    optimizer_D_B = optim.Adam(model.discriminator_B.parameters(), lr=0.0002, betas=(0.5, 0.999))

    criterion_GAN = nn.MSELoss()
    criterion_cycle = nn.L1Loss()

    for epoch in range(100):  # Example epoch count
        for real_A, real_B in training_data:
            real_A, real_B = real_A.to('cuda'), real_B.to('cuda')

            # Generator A2B and B2A
            fake_B, recon_A, fake_A, recon_B = model(real_A, real_B)

            loss_GAN_A2B = criterion_GAN(model.discriminator_B(fake_B), torch.ones_like(fake_B))
            loss_GAN_B2A = criterion_GAN(model.discriminator_A(fake_A), torch.ones_like(fake_A))

            loss_cycle_A = criterion_cycle(recon_A, real_A)
            loss_cycle_B = criterion_cycle(recon_B, real_B)

            loss_G = loss_GAN_A2B + loss_GAN_B2A + 10 * (loss_cycle_A + loss_cycle_B)

            optimizer_G.zero_grad()
            loss_G.backward()
            optimizer_G.step()

            # Discriminator A
            loss_D_A = criterion_GAN(model.discriminator_A(real_A), torch.ones_like(real_A)) + \
                       criterion_GAN(model.discriminator_A(fake_A.detach()), torch.zeros_like(fake_A))

            optimizer_D_A.zero_grad()
            loss_D_A.backward()
            optimizer_D_A.step()

            # Discriminator B
            loss_D_B = criterion_GAN(model.discriminator_B(real_B), torch.ones_like(real_B)) + \
                       criterion_GAN(model.discriminator_B(fake_B.detach()), torch.zeros_like(fake_B))

            optimizer_D_B.zero_grad()
            loss_D_B.backward()
            optimizer_D_B.step()

        # Save model periodically
        if (epoch + 1) % 10 == 0:
            torch.save(model.state_dict(), os.path.join(model_save_path, f'cyclegan_epoch_{epoch + 1}.pth'))

    # Post-training optimizations
    torch.quantization.quantize_dynamic(model, {nn.Linear}, dtype=torch.qint8)
    torch.save(model.state_dict(), os.path.join(model_save_path, 'cyclegan_quantized.pth'))

In [26]:
# Flask Routes for User Interaction
@app.route('/')
def upload_image():
    return render_template('upload.html')

@app.route('/restore', methods=['POST'])
def restore_image():
    if 'file' not in request.files:
        return 'No file part'

    file = request.files['file']

    if file.filename == '':
        return 'No selected file'

    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        file.save(file_path)

        # Preprocess the image
        preprocessed_image = preprocess_image(file_path)

        # Restore and colorize the image
        deoldify = DeOldifyColorizer()
        colorized_image = deoldify.colorize_image(file_path)

        # Enhance the image with CycleGAN
        cyclegan = CycleGANEnhancer()
        enhanced_image = cyclegan.enhance_image(colorized_image)

        # Repair the image
        repaired_image = repair_image(enhanced_image)

        # Save the result
        result_path = os.path.join(app.config['RESULT_FOLDER'], filename)
        cv2.imwrite(result_path, repaired_image)

        return send_file(result_path, mimetype='image/jpeg')

    return 'File not allowed'

if __name__ == '__main__':
    app.run(debug=True)

 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on http://127.0.0.1:5000


 * Running on http://127.0.0.1:5000


INFO:werkzeug:[33mPress CTRL+C to quit[0m


[33mPress CTRL+C to quit[0m


INFO:werkzeug: * Restarting with stat


 * Restarting with stat
