<a href="https://colab.research.google.com/github/Nick088Official/Real-ESRGAN_Pytorch/blob/main/Real_ESRGAN_Pytorch_Inference_NO_UI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Real-ESRGAN Pytorch Inference NO UI

[![arXiv](https://img.shields.io/badge/arXiv-Paper-<COLOR>.svg)](https://arxiv.org/abs/2107.10833)
[![GitHub Stars](https://img.shields.io/github/stars/Nick088Official/Real-ESRGAN_Pytorch?style=social)](https://github.com/Nick088Official/Real-ESRGAN_Pytorch)

This is a **Practical Image Restoration Demo** of the Pytorch ai-forever custom pretrained Real-ESRGAN models.
We extend the powerful ESRGAN to a practical restoration application (namely, Real-ESRGAN), which is trained with pure synthetic data. <br>
The following figure shows some real-life examples.

Low quality image:

![](https://github.com/Nick088Official/Real-ESRGAN-Pytorch/blob/main/inputs/lr_lion.png?raw=1)

Real-ESRGAN_Pytorch result:

![](https://github.com/Nick088Official/Real-ESRGAN-Pytorch/blob/main/results/sr_lion.png?raw=1)

**Note that Real-ESRGAN may still fail in some cases as the real-world degradations are really too complex.**<br>
Moreover, it **may not** perform well on **human faces, you could try [GFPGAN](https://colab.research.google.com/github/Nick088Official/GFPGAN-Fix/blob/master/GFPGAN_fix_inference.ipynb) for that
<br>

This is the Pytorch Implementation of https://github.com/ai-forever/Real-ESRGAN

**Credits:** [Nick088](https://linktr.ee/Nick088), Xinntao, Tencent, Geeve George, ai-forever

In [None]:
#@title Installation
#@markdown Before running, make sure that you choose
#@markdown * Runtime Type = Python 3
#@markdown * Hardware Accelerator = GPU (Faster, free daily limit of gpu around 12 hours) or CPU (very slow)

#@markdown in the **Runtime** menu -> **Change runtime type**

#@markdown By running this, we clone the repository, set up the envrironment.


from IPython.display import clear_output
import torch

if torch.cuda.is_available():
    device = "cuda"
    print("Using GPU")
else:
    device = "cpu"
    print("Using CPU")

!pip install git+https://github.com/sberbank-ai/Real-ESRGAN.git
%cd Real-ESRGAN
!pip install ffmpeg-python
from RealESRGAN import RealESRGAN
from PIL import Image
import numpy as np
import ffmpeg
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model2 = RealESRGAN(device, scale=2)
model2.load_weights('weights/RealESRGAN_x2.pth', download=True)
model4 = RealESRGAN(device, scale=4)
model4.load_weights('weights/RealESRGAN_x4.pth', download=True)
model8 = RealESRGAN(device, scale=8)
model8.load_weights('weights/RealESRGAN_x8.pth', download=True)
clear_output()
print(f'Installed with all its models on {"GPU" if torch.cuda.is_available() else "CPU"}!')

# Inference Images

In [None]:
#@title Upload & Inference Images
import os
from google.colab import files
import shutil
from io import BytesIO
import io


model_scale = "4" #@param ["2", "4", "8"] {allow-input: false}

model = RealESRGAN(device, scale=int(model_scale))
model.load_weights(f'weights/RealESRGAN_x{model_scale}.pth', download=False)


upload_folder = 'inputs'
result_folder = 'results'

os.makedirs(upload_folder, exist_ok=True)
os.makedirs(result_folder, exist_ok=True)

IMAGE_FORMATS = ('.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif')


def process_input(filename):
  result_image_path = os.path.join('results/', os.path.basename(filename))
  image = Image.open(filename).convert('RGB')
  sr_image = model.predict(np.array(image))
  sr_image.save(result_image_path)
  print(f'Finished! Image saved to {result_image_path}')

# upload files
uploaded = files.upload()
for filename in uploaded.keys():
    dst_path = os.path.join(upload_folder, filename)
    shutil.move(filename, dst_path)
    print(f'moved input {filename} to {dst_path}')
    print('Processing:', filename)
    process_input(f"inputs/{filename}")

In [None]:
#@title Visualize Input vs Output Image
# utils for visualization
import cv2
import matplotlib.pyplot as plt
def display(img1, img2):
  fig = plt.figure(figsize=(25, 10))
  ax1 = fig.add_subplot(1, 2, 1)
  plt.title('Input image', fontsize=16)
  ax1.axis('off')
  ax2 = fig.add_subplot(1, 2, 2)
  plt.title(f'Real-ESRGAN Pytorch x{model_scale} output', fontsize=16)
  ax2.axis('off')
  ax1.imshow(img1)
  ax2.imshow(img2)
def imread(img_path):
  img = cv2.imread(img_path)
  img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
  return img

# display each image in the upload folder
import os
import glob

input_folder = 'inputs'
result_folder = 'results'
input_list = sorted(glob.glob(os.path.join(input_folder, '*')))
output_list = sorted(glob.glob(os.path.join(result_folder, '*')))
for input_path, output_path in zip(input_list, output_list):
  img_input = imread(input_path)
  img_output = imread(output_path)
  display(img_input, img_output)

In [None]:
#@title Download the results
zip_filename = f'Real-ESRGAN-Pytorch-x{model_scale}_result.zip'
if os.path.exists(zip_filename):
  os.remove(zip_filename)
os.system(f"zip -r -j {zip_filename} results/*")
files.download(zip_filename)

In [None]:
#@title Delete Inputs & Outputs Image
#@markdown You need to run this to clear the previous inputs and ouputs after you finished inferencing the images you wanted and downloaded the results, only after this you can inference again.

shutil.rmtree(upload_folder)
shutil.rmtree(result_folder)
print("Deleted previous Inputs & Outputs Images, now you can inference again.")

# Inference Videos

In [None]:
#@title Upload Input Video
import os
from google.colab import files
import shutil
import cv2
from google.colab.patches import cv2_imshow

upload_folder = 'upload'
result_folder = 'results'
video_folder = 'videos'
video_result_folder = 'results_videos'
video_mp4_result_folder = 'results_mp4_videos'
result_restored_imgs_folder = 'restored_imgs'

if os.path.isdir(upload_folder):
  print(upload_folder+" exists")
else :
  os.mkdir(upload_folder)

if os.path.isdir(video_folder):
  print(video_folder+" exists")
else :
  os.mkdir(video_folder)

if os.path.isdir(video_result_folder):
  print(video_result_folder+" exists")
else :
  os.mkdir(video_result_folder)

if os.path.isdir(video_mp4_result_folder):
  print(video_mp4_result_folder+" exists")
else :
  os.mkdir(video_mp4_result_folder)

if os.path.isdir(result_folder):
  print(result_folder+" exists")
else :
  os.mkdir(result_folder)

%cd results
if os.path.isdir(result_restored_imgs_folder):
  print(result_restored_imgs_folder+" exists")
else :
  os.mkdir(result_restored_imgs_folder)
%cd ..

if os.path.isdir(video_folder):
    shutil.rmtree(video_folder)
os.mkdir(video_folder)

%cd videos
# upload images
uploaded = files.upload()
for filename in uploaded.keys():
  dst_path = os.path.join(video_folder, filename)
  cap = cv2.VideoCapture(filename)
  fps = cap.get(cv2.CAP_PROP_FPS)
  name_video_file_input = f'/content/videos/{filename}'
  print(f'moved {filename} of {fps} fps to {dst_path}')
%cd ..

In [None]:
#@title Inference Videos
import cv2
import numpy as np
import glob
from os.path import isfile, join
import subprocess
from IPython.display import clear_output
import os
from google.colab import files
import shutil
from io import BytesIO
import io
import ffmpeg

IMAGE_FORMATS = ('.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif')

model_scale = "2" #@param ["2", "4", "8"] {allow-input: false}

model = RealESRGAN(device, scale=int(model_scale))
model.load_weights(f'weights/RealESRGAN_x{model_scale}.pth', download=False)

def process_input(filename):
  result_image_path = os.path.join('results/restored_imgs', os.path.basename(filename))
  image = Image.open(filename).convert('RGB')
  sr_image = model.predict(np.array(image))
  sr_image.save(result_image_path)
  print(f'Finished Frame of the Video, saved to {result_image_path}!')

# assign directory
directory = 'videos' #PATH_WITH_INPUT_VIDEOS
zee = 0

def convert_frames_to_video(pathIn,pathOut,fps):
    frame_array = []
    files = [f for f in os.listdir(pathIn) if isfile(join(pathIn, f))]
    #for sorting the file names properly
    files.sort(key = lambda x: int(x[5:-4]))
    size2 = (0,0)

    for i in range(len(files)):
        filename=pathIn + files[i]
        #reading each files
        img = cv2.imread(filename)
        height, width, layers = img.shape
        size = (width,height)
        size2 = size
        print(filename)
        #inserting the frames into an image array
        frame_array.append(img)
    out = cv2.VideoWriter(pathOut,cv2.VideoWriter_fourcc(*'DIVX'), fps, size2)
    for i in range(len(frame_array)):
        # writing to a image array
        out.write(frame_array[i])
    out.release()

for filename in os.listdir(directory):
    f = os.path.join(directory, filename)
    # checking if it is a file
    if os.path.isfile(f):
        print("PROCESSING :"+str(f)+"\n")
        # Read the video from specified path

        # Check if the input video has an audio stream
        probe = ffmpeg.probe(f)
        has_audio = any(stream['codec_type'] == 'audio' for stream in probe['streams'])

        if has_audio:
            # Extract audio from the input video
            audio_file = f.replace(".mp4", ".wav")
            ffmpeg.input(f).output(audio_file, format='wav', ac=1).run(overwrite_output=True)

        # video to frames
        cam = cv2.VideoCapture(str(f))

        try:
            # PATH TO STORE VIDEO FRAMES
            if not os.path.exists('upload'):
                os.makedirs('upload')

        # if not created then raise error
        except OSError:
            print ('Error: Creating directory of data')

        # frame
        currentframe = 0

        while(True):
            # reading from frame
            ret,frame = cam.read()

            if ret:
                # if video is still left continue creating images
                name = 'upload/frame' + str(currentframe) + '.jpg'

                # writing the extracted images
                cv2.imwrite(name, frame)

                # increasing counter so that it will
                # show how many frames are created
                currentframe += 1
                print(currentframe)
            else:
                #deletes all the videos you uploaded for upscaling
                #for f in os.listdir(video_folder):
                #  os.remove(os.path.join(video_folder, f))

                break

        # Release all space and windows once done
        cam.release()
        cv2.destroyAllWindows()

        #apply super-resolution on all frames of a video

        # Specify the directory path
        all_frames_path = "upload"

        # Get a list of all files in the directory
        file_names = os.listdir(all_frames_path)

        # process the files
        for file_name in file_names:
            process_input(f"upload/{file_name}")

        #convert super res frames to .avi
        pathIn = 'results/restored_imgs/'

        zee = zee+1
        fName = "video"+str(zee)
        filenameVid = f"{fName}.avi"

        pathOut = "results_videos/"+filenameVid

        convert_frames_to_video(pathIn, pathOut, fps)

        # Re-encode the video with the modified audio
        ffmpeg.input(pathOut).output(pathOut.replace(".avi", ".mp4"), vcodec='libx264', acodec='aac', audio_bitrate='320k').run(overwrite_output=True)

        if has_audio:
            # Replace the original audio with the upscaled audio
            ffmpeg.input(audio_file).output(pathOut.replace(".avi", ".mp4"), acodec='aac', audio_bitrate='320k').run(overwrite_output=True)

        #convert .avi to .mp4
        src = 'results_videos/'
        dst = 'results_mp4_videos/'

        for root, dirs, filenames in os.walk(src, topdown=False):
            #print(filenames)
            for filename in filenames:
                print('[INFO] 1',filename)
                try:
                    _format = ''
                    if ".flv" in filename.lower():
                        _format=".flv"
                    if ".mp4" in filename.lower():
                        _format=".mp4"
                    if ".avi" in filename.lower():
                        _format=".avi"
                    if ".mov" in filename.lower():
                        _format=".mov"

                    inputfile = os.path.join(root, filename)
                    print('[INFO] 1',inputfile)
                    outputfile = os.path.join(dst, filename.lower().replace(_format, ".mp4"))
                    subprocess.call(['ffmpeg', '-i', inputfile, outputfile])
                    name_video_file_output = f'/content/{outputfile}'
                    clear_output()
                    print("Successfully processed the video!")
                except:
                    print("An exception occurred")

In [None]:
#@title Display Input vs Output Video

from IPython.display import HTML
from base64 import b64encode

# Read the input video file
mp4_input = open(f'{name_video_file_input}', 'rb').read()

# Read the output video file (modify this path accordingly)
mp4_output = open(f'{name_video_file_output}', 'rb').read()

# Create data URLs for both videos
data_url_input = "data:video/mp4;base64," + b64encode(mp4_input).decode()
data_url_output = "data:video/mp4;base64," + b64encode(mp4_output).decode()

# Display the videos side by side
HTML(f"""
<div style="display: flex; align-items: center; justify-content: center;">
    <div style="text-align: center;">
        <h2>Input</h2>
        <video width=400 controls>
            <source src='{data_url_input}' type='video/mp4'>
        </video>
    </div>
    <div style="font-size: 24px; margin: 0 20px;">vs</div>
    <div style="text-align: center;">
        <h2>Output</h2>
        <video width=400 controls>
            <source src='{data_url_output}' type='video/mp4'>
        </video>
    </div>
</div>
""")

In [None]:
#@title Download Video & Frames Results
#@markdown It will download you a .zip, unzip it however you want

#@markdown It will have 2 folders:

#@markdown 1. 'results' , which has inside 'restored_imgs' = the results of the frames of the video

#@markdown 2. 'results_mp4_videos' = the results of the video after the face has been restored (so basically all the frames together)

# download the result
!ls results
print('Download results')
os.system(f'zip -r download.zip results/restored_imgs results_mp4_videos')
files.download("download.zip")

In [None]:
#@title Delete Inputs & Outputs Videos
#@markdown You need to run this to clear the previous inputs and ouputs after you finished inferencing the videos you wanted and downloaded the results, only after this you can inference again.

shutil.rmtree(upload_folder)
shutil.rmtree(result_folder)
shutil.rmtree(video_folder)
shutil.rmtree(video_result_folder)
shutil.rmtree(video_mp4_result_folder)

print("Deleted previous Inputs & Outputs Videos, now you can inference again.")