# RealESRGAN upscaler

Edited from https://github.com/boomb0om/Real-ESRGAN-colab

Paint/wikiart models from https://archive.org/details/hr-painting-upscaling  
x2/x4 models from https://drive.google.com/drive/folders/16PlVKhTNkSyWFx52RPb2hXPIQveNGbxS

In [None]:
#@title Setup [run once]

!git clone https://github.com/eps696/Real-ESRGAN
%cd Real-ESRGAN
!pip install -r requirements.txt

models = {
  'paint_x4': 'hr-paintings_g.pth',
  # 'wiki_x4': 'wikiart_g.pth',
  'sber_x2': 'RealESRGAN_x2.pth',
  'sber_x4': 'RealESRGAN_x4.pth',
  # 'sber_x8': 'RealESRGAN_x8.pth',
  }

!ln -sf /opt/bin/nvidia-smi /usr/bin/nvidia-smi
!pip install gputil psutil humanize
import os, psutil, humanize
import GPUtil as GPU
from IPython.display import clear_output

import os
import numpy as np
import imageio
from imageio import imsave, imread
from base64 import b64encode
from IPython import display
from IPython.display import HTML, clear_output # , Image, display
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import ipywidgets as ipy
from google.colab import output, files, drive

# G Drive
drive.mount('/G', force_remount=True)
gdir = '/G/MyDrive'

from RealESRGAN import RealESRGAN
from PIL import Image
import numpy as np
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# !if [ ! -f "RealESRGAN_x2.pth" ]; then wget https://www.dropbox.com/s/txhxihlzj56sn49/RealESRGAN_x2.pth; fi
# !if [ ! -f "RealESRGAN_x4.pth" ]; then wget https://www.dropbox.com/s/gwzoiqxduyl2syc/RealESRGAN_x4.pth; fi
# !if [ ! -f "RealESRGAN_x8.pth" ]; then wget https://www.dropbox.com/s/cvronh24k4bwr9c/RealESRGAN_x8.pth; fi
!if [ ! -f "hr-paintings_g.pth" ]; then wget https://www.dropbox.com/s/cm7oqo03tbvzdwq/hr-paintings_g.pth; fi
# !if [ ! -f "wikiart_g.pth" ]; then wget https://www.dropbox.com/s/6uoe2cmcmv9ezm5/wikiart_g.pth; fi

# # # functions # # #

def basename(file):
    return os.path.splitext(os.path.basename(file))[0]

def img_list(path, subdir=None):
  if subdir is True:
    files = [os.path.join(dp, f) for dp, dn, fn in os.walk(path) for f in fn]
  else:
    files = [os.path.join(path, f) for f in os.listdir(path)]
  files = [f for f in files if os.path.splitext(f.lower())[1][1:] in ['jpg', 'jpeg', 'png', 'ppm', 'tif']]
  return sorted([f for f in files if os.path.isfile(f)])

def makevid(seq_dir, size=None):
  out_sequence = seq_dir + '/%0' + str(len(basename(img_list(seq_dir)[0]))) + 'd.jpg'
  out_video = seq_dir + '.mp4'
  print('.. generating video ..')
  !ffmpeg -y -v warning -i $out_sequence -crf 20 $out_video
  data_url = "data:video/mp4;base64," + b64encode(open(out_video,'rb').read()).decode()
  wh = '' if size is None else 'width=%d height=%d' % (size, size)
  return """<video %s controls><source src="%s" type="video/mp4"></video>""" % (wh, data_url)

# # # progress bar # # #

import time

class ProgressBar(object):
  def __init__(self, task_num=10):
    self.pbar = ipy.IntProgress(min=0, max=task_num, bar_style='') # (value=0, min=0, max=max, step=1, description=description, bar_style='')
    self.labl = ipy.Label()
    display.display(ipy.HBox([self.pbar, self.labl]))
    self.task_num = task_num
    self.completed = 0
    self.start()

  def start(self, task_num=None):
    if task_num is not None:
      self.task_num = task_num
    if self.task_num > 0:
      self.labl.value = '0/{}'.format(self.task_num)
    else:
      self.labl.value = 'completed: 0, elapsed: 0s'
    self.start_time = time.time()

  def upd(self, *p, **kw):
    self.completed += 1
    elapsed = time.time() - self.start_time + 0.0000000000001
    fps = self.completed / elapsed if elapsed>0 else 0
    if self.task_num > 0:
      finaltime = time.asctime(time.localtime(self.start_time + self.task_num * elapsed / float(self.completed)))
      fin = ' end %s' % finaltime[11:16]
      percentage = self.completed / float(self.task_num)
      eta = int(elapsed * (1 - percentage) / percentage + 0.5)
      self.labl.value = '{}/{}, rate {:.3g}s, time {}s, left {}s, {}'.format(self.completed, self.task_num, 1./fps, shortime(elapsed), shortime(eta), fin)
    else:
      self.labl.value = 'completed {}, time {}s, {:.1f} steps/s'.format(self.completed, int(elapsed + 0.5), fps)
    self.pbar.value += 1
    if self.completed == self.task_num: self.pbar.bar_style = 'success'
    return
    # return self.completed

def time_days(sec):
  return '%dd %d:%02d:%02d' % (sec/86400, (sec/3600)%24, (sec/60)%60, sec%60)
def time_hrs(sec):
  return '%d:%02d:%02d' % (sec/3600, (sec/60)%60, sec%60)
def shortime(sec):
  if sec < 60:
    time_short = '%d' % (sec)
  elif sec < 3600:
    time_short  = '%d:%02d' % ((sec/60)%60, sec%60)
  elif sec < 86400:
    time_short  = time_hrs(sec)
  else:
    time_short = time_days(sec)
  return time_short

gpu = GPU.getGPUs()[0]
process = psutil.Process(os.getpid())
clear_output()
!nvidia-smi -L
print("Gen RAM Free: " + humanize.naturalsize( psutil.virtual_memory().available ), " | Proc size: " + humanize.naturalsize( process.memory_info().rss))
print("GPU RAM Free: {0:.0f}MB | Used: {1:.0f}MB | Util {2:3.0f}% | Total {3:.0f}MB".format(gpu.memoryFree, gpu.memoryUsed, gpu.memoryUtil*100, gpu.memoryTotal))

In [None]:
#@title Process

#@markdown Source can be an image sequence (folder or archive with `zip` extension) or a videofile. <br>
#@markdown Results will be saved as sequence folder + videofile + zip archive. <br>
#@markdown Paths are relative to the Google Drive root.

source  = '' #@param {type:"string"}
out_path = '' #@param {type:"string"}
source     = os.path.join(gdir, source)
out_path    = os.path.join(gdir, out_path)
os.makedirs(out_path, exist_ok=True)

# unfold source
assert os.path.exists(source), "NOT FOUND %s" % source
if os.path.isfile(source):
  source_dir = os.path.join('/content', basename(source))
  os.makedirs(source_dir, exist_ok=True)
  if source.split('.')[-1] == 'zip':
    !unzip -o -q $source -d $source_dir
  else:
    in_ff = os.path.join(source_dir, '%06d.jpg')
    !ffmpeg -y -v warning -i $source $in_ff
  source = source_dir

# get source files
files_in = img_list(source)
count = len(files_in)
print(count, 'images')

# get args
model_type = "paint_x4" #@param ["paint_x4", "sber_x2", "sber_x4"]
# model_type = "paint_x4" #@param ["paint_x4", "wiki_x4", "sber_x2", "sber_x4", "sber_x8"]
pre_downscale = True #@param {type:"boolean"}
scale = int(model_type[-1])

# load model
model = RealESRGAN(device, scale=scale)
model.load_weights(models[model_type])

# process
pbar = ProgressBar(count)
for i, file_in in enumerate(files_in):
  img = Image.open(file_in).convert('RGB')
  if pre_downscale:
    img = img.resize([s//2 for s in img.size], Image.LANCZOS)
  file_out = os.path.join(out_path, os.path.basename(file_in))
  output = model.predict(np.array(img))
  output.save(file_out)
  pbar.upd()

# make output files
try:
  HTML(makevid(out_path))
except: pass
if os.path.exists(out_path + '.mp4'):
  files.download(out_path + '.mp4')
out_zip = out_path + '.zip'
out_path += '/*'
!zip -q $out_zip -j $out_path
if os.path.exists(out_zip):
  files.download(out_zip)

print('DONE')