<a href="https://colab.research.google.com/github/detektor777/colab_list_video/blob/main/sharpness.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

https://drive.google.com/drive

In [None]:
#@title ##**Select Video File** { display-mode: "form" }
import os
import ipywidgets as widgets
from IPython.display import display, clear_output
from google.colab import files
from google.colab import drive

upload_option = "Load from Google Drive Root"  #@param ["Upload from PC", "Load from Google Drive Root", "Load from Google Drive"]

file_name = None
last_selected_button = None

def reset_button_colors(buttons):
    for btn in buttons:
        btn.style.button_color = None

if upload_option == "Upload from PC":
    print("Please upload a video file.")
    uploaded = files.upload()
    if uploaded:
        file_name = list(uploaded.keys())[0]
    else:
        print("No file uploaded.")
        file_name = None

elif upload_option == "Load from Google Drive Root":
    drive.mount('/content/drive')
    root_dir = '/content/drive/MyDrive/'

    video_extensions = ['.mp4', '.mkv', '.avi', '.mov']
    files_list = []

    for f in os.listdir(root_dir):
        if os.path.isfile(os.path.join(root_dir, f)) and os.path.splitext(f)[1].lower() in video_extensions:
            files_list.append(f)

    if not files_list:
        print("No video files found in Google Drive root.")
        file_name = None
    else:
        print("Select a video file from Google Drive root:")

        output = widgets.Output()
        buttons = []

        def on_button_clicked(b):
            global file_name, last_selected_button
            with output:
                clear_output()
                reset_button_colors(buttons)
                selected_file = b.description
                file_name = os.path.join(root_dir, selected_file)

                if file_name and os.path.exists(file_name):
                    b.style.button_color = 'green'
                else:
                    b.style.button_color = 'red'

                last_selected_button = b
                print(f"Selected file: {file_name if file_name else 'None'}")

        for file in files_list:
            button = widgets.Button(description=file, layout=widgets.Layout(width='500px', overflow='hidden', text_overflow='ellipsis'))
            button.on_click(on_button_clicked)
            buttons.append(button)

        display(widgets.VBox(buttons), output)

elif upload_option == "Load from Google Drive":
    drive.mount('/content/drive')
    root_dir = '/content/drive/MyDrive/'

    video_extensions = ['.mp4', '.mkv', '.avi', '.mov']
    files_list = []

    for dirpath, _, filenames in os.walk(root_dir):
        for f in filenames:
            if os.path.splitext(f)[1].lower() in video_extensions:
                relative_path = os.path.relpath(os.path.join(dirpath, f), root_dir)
                files_list.append(relative_path)

    if not files_list:
        print("No video files found in Google Drive or its subfolders.")
        file_name = None
    else:
        print("Select a video file from Google Drive (including subfolders):")

        output = widgets.Output()
        buttons = []

        def on_button_clicked(b):
            global file_name, last_selected_button
            with output:
                clear_output()
                reset_button_colors(buttons)
                selected_file = b.description
                file_name = os.path.join(root_dir, selected_file)

                if file_name and os.path.exists(file_name):
                    b.style.button_color = 'green'
                else:
                    b.style.button_color = 'red'

                last_selected_button = b
                print(f"Selected file: {file_name if file_name else 'None'}")

        for file in files_list:
            button = widgets.Button(description=file, layout=widgets.Layout(width='500px', overflow='hidden', text_overflow='ellipsis'))
            button.on_click(on_button_clicked)
            buttons.append(button)

        display(widgets.VBox(buttons), output)

if file_name:
    print(f"Video file path set to: {file_name}")
else:
    print("Video file path not set. Please select a file.")

In [None]:
#@title ##**Config** { display-mode: "form" }
import os
from google.colab import files
import shutil
from google.colab import drive
output_folder = "google_drive" #@param ["google_drive","root"]

jpeg_quality = 95 #@param {type:"slider", min:0, max:100, step:1}
sharpness_factor = 3.8 #@param {type:"slider", min:0.0, max:7.0, step:0.1}
upscale_option = "height 720px" #@param ["No", "height 720px", "height 1080px", "height 2160px", "2x", "4x"]


upload_folder = 'upload'
result_folder = 'results'

if output_folder == "google_drive":
    if not os.path.exists('/content/drive'):
        print("Google Drive не подключён. Подключаем...")
        drive.mount('/content/drive')
    root_folder = '/content/drive/MyDrive/';
    real_output_folder = '/content/drive/MyDrive/real_output'
    real_input_folder = "/content/drive/MyDrive/real_input"
elif output_folder == "root":
    root_folder = '/content/';
    real_output_folder = '/content/real_output'
    real_input_folder = "/content/real_input"

if not os.path.exists(real_output_folder):
    os.makedirs(real_output_folder)

if not os.path.exists(real_input_folder):
    os.makedirs(real_input_folder)

#clear folders
clear_input_folder = False #@param {type:"boolean"}
up_to_frame = "" #@param {type:"string"}
from_frame = "" #@param {type:"string"}

def clean_folder(folder_path, up_to=None, from_frame=None):
    print(f"\nCurrent parameters:")
    print(f"Delete frames up to: {up_to if up_to else 'not specified'}")
    print(f"Delete frames after: {from_frame if from_frame else 'not specified'}")

    if not os.path.isdir(folder_path):
        print(f"\nFolder {folder_path} does not exist!")
        print("Creating a new folder...")
        os.makedirs(folder_path)
        return

    if not up_to and not from_frame:
        print("\nNo parameters specified - deleting all folder content...")
        shutil.rmtree(folder_path)
        os.makedirs(folder_path)
        print(f"Folder {folder_path} cleared and recreated")
        return

    print("\nStarting file processing...")
    files = os.listdir(folder_path)
    jpg_files = [f for f in files if f.endswith('.jpg')]

    if not jpg_files:
        print("No JPG files to process in the folder")
        return

    deleted_count = 0
    processed_count = 0

    for filename in jpg_files:
        try:
            frame_number = int(filename.split('.')[0])
            should_delete = False

            if up_to and from_frame:
                if frame_number < int(up_to) or frame_number > int(from_frame):
                    should_delete = True
            elif up_to:
                if frame_number < int(up_to):
                    should_delete = True
            elif from_frame:
                if frame_number > int(from_frame):
                    should_delete = True

            if should_delete:
                file_path = os.path.join(folder_path, filename)
                os.remove(file_path)
                deleted_count += 1
                if deleted_count <= 5:
                    print(f'File deleted: {filename}')
                elif deleted_count == 6:
                    print('...')
            else:
                processed_count += 1

        except ValueError:
            print(f'Skipped file with invalid name: {filename}')

    print(f'\nProcessing complete:')
    print(f'Total files: {len(jpg_files)}')
    print(f'Files deleted: {deleted_count}')
    print(f'Files retained: {processed_count}')

if clear_input_folder:
    up_to_frame = up_to_frame if up_to_frame != "0" else None
    from_frame = from_frame if from_frame != "0" else None
    clean_folder(real_input_folder, up_to_frame, from_frame)

clear_output_folder = False #@param {type:"boolean"}

if clear_output_folder:
    if os.path.isdir(real_output_folder):
        shutil.rmtree(real_output_folder)
    os.makedirs(real_output_folder)

In [None]:
#@title ##**Run sequence** { display-mode: "form" }
import cv2
import imageio
import os
import tqdm
import subprocess
import numpy as np
import time

library = "ffmpeg" #@param ["cv2","imageio","ffmpeg","skvideo","scipy","moviepy"]
delay = "0" #@param [0, 0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5]


if (library == "ffmpeg"):
    !pip install ffmpeg-python
    import ffmpeg
    path = root_folder
    full_path = os.path.join(path, file_name)

    probe = ffmpeg.probe(full_path)
    video_info = next(stream for stream in probe['streams'] if stream['codec_type'] == 'video')
    fps = video_info['r_frame_rate']
    duration = float(video_info['duration'])
    frame_count = int(video_info['nb_frames'])

    print("FPS: ", fps)
    print("Duration: ", duration)
    print("Frames: ", frame_count)

    pbar_ffmpeg = tqdm.tqdm(total=frame_count, ncols=100, position=0, leave=True)
    process = (
        ffmpeg
        .input(full_path)
        .output('pipe:', format='rawvideo', pix_fmt='rgb24', qscale=0)
        .run_async(pipe_stdout=True)
    )

    for i in range(frame_count):
        try:
            raw_video = process.stdout.read(video_info['width'] * video_info['height'] * 3)
            frame = np.frombuffer(raw_video, dtype='uint8').reshape((video_info['height'], video_info['width'], 3))
            frame_path = f"{real_input_folder}/{i:09d}.jpg"
            if os.path.isfile(frame_path):
              pbar_ffmpeg.update(1)
              continue
            imageio.imwrite(frame_path, frame)
        except Exception as e:
            print(f"Error writing to disk: {str(e)}. Retrying...")
            continue
        pbar_ffmpeg.update(1)
        time.sleep(float(delay))

    pbar_ffmpeg.close()
    process.wait()

#check
import os

def check_frames():
    frame_dir = real_input_folder
    frames = [int(f.split('.')[0].replace('frame', '')) for f in os.listdir(frame_dir) if f.endswith('.jpg')]
    min_frame = min(frames)
    max_frame = max(frames)
    print(min_frame)
    print(max_frame)

    missing_frames = []
    for i in range(min_frame, max_frame+1):
        if i not in frames:
            missing_frames.append(i)

    if len(missing_frames) > 0:
        print(f"Missing frames: {missing_frames}")
    else:
        print("All frames present")

attempts = 0
max_attempts = 10

while attempts < max_attempts:
    try:
        check_frames()
        break
    except Exception as e:
        attempts += 1
        print(f"Attempt {attempts} failed with error: {str(e)}")
        if attempts == max_attempts:
            print("Maximum attempts reached. Execution failed.")
        else:
            print("Retrying...")


In [None]:
#@title ##**Run enhance sharpness** { display-mode: "form" }
import shutil
from tqdm import tqdm
import os
import re
import cv2
import numpy as np
import matplotlib.pyplot as plt
import glob
from PIL import Image, ImageEnhance
import gc

gc.collect()

def check_frames():
    frame_dir = real_input_folder
    frames = [int(f.split('.')[0].replace('frame', '')) for f in os.listdir(frame_dir) if f.endswith('.jpg')]
    min_frame = min(frames)
    max_frame = max(frames)
    print(min_frame)
    print(max_frame)

    missing_frames = []
    for i in range(min_frame, max_frame+1):
        if i not in frames:
            missing_frames.append(i)

    if len(missing_frames) > 0:
        print(f"Missing frames: {missing_frames}")
    else:
        print("All frames present")

attempts = 0
max_attempts = 10

while attempts < max_attempts:
    try:
        check_frames()
        break
    except Exception as e:
        attempts += 1
        print(f"Attempt {attempts} failed with error: {str(e)}")
        if attempts == max_attempts:
            print("Maximum attempts reached. Execution failed.")
        else:
            print("Retrying...")

def enhance_sharpness(image, sharpness_factor=7.0, upscale_option="No"):
    original_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    enhancer = ImageEnhance.Sharpness(original_image)
    processed_image = enhancer.enhance(sharpness_factor)

    if upscale_option != "No":
        width, height = processed_image.size
        if upscale_option == "height 720px":
            new_height = 720
            new_width = int(width * (new_height / height))
        elif upscale_option == "height 1080px":
            new_height = 1080
            new_width = int(width * (new_height / height))
        elif upscale_option == "height 2160px":
            new_height = 2160
            new_width = int(width * (new_height / height))
        elif upscale_option == "2x":
            new_width = width * 2
            new_height = height * 2
        elif upscale_option == "4x":
            new_width = width * 4
            new_height = height * 4
        processed_image = processed_image.resize((new_width, new_height), Image.LANCZOS)

    return cv2.cvtColor(np.array(processed_image), cv2.COLOR_RGB2BGR)


file_list = os.listdir(real_input_folder)
file_list.sort()
frames = [int(f.split('.')[0].replace('', '')) for f in file_list if f.endswith('.jpg')]
min_frame = min(frames)

real_files = os.listdir(real_output_folder)
if real_files:
    real_frames = [int(re.findall(r'(\d+)\.jpg', f)[0]) for f in real_files if re.match(r'\d+\.jpg', f)]
    start_frame = max(real_frames) + 1
else:
    start_frame = min_frame

max_frame = frames[-1]
print(f"max frame: {max_frame}")
files_to_process = [f"{real_input_folder}/{frame:09d}.jpg" for frame in range(start_frame, max_frame+1) if f"{frame:09d}.jpg" in file_list]

total_files = len(files_to_process)
batch_size = 10
num_iterations = (total_files + batch_size - 1) // batch_size

print(f"start frame: {start_frame}")
print(f"min frame: {min_frame}")
print(f"total: {total_files}")
print(f"iterations: {num_iterations}")

with tqdm(total=num_iterations) as pbar:
    for i in range(0, total_files, batch_size):
        batch_files = files_to_process[i:i+batch_size]

        for img_path in batch_files:
            img = cv2.imread(img_path)
            if img is None:
                print(f"Failed to load image: {img_path}")
                continue

            output = enhance_sharpness(img, sharpness_factor, upscale_option)

            imgname = os.path.basename(img_path)
            save_path = os.path.join(real_output_folder, imgname)
            cv2.imwrite(save_path, output, [cv2.IMWRITE_JPEG_QUALITY, jpeg_quality])

        pbar.update(1)

print(f"Processing completed. Results saved in {real_output_folder}")

def check_frames():
    frame_dir = real_output_folder
    frames = [int(f.split('.')[0].replace('frame', '')) for f in os.listdir(frame_dir) if f.endswith('.jpg')]
    min_frame = min(frames)
    max_frame = max(frames)
    print(min_frame)
    print(max_frame)

    missing_frames = []
    for i in range(min_frame, max_frame+1):
        if i not in frames:
            missing_frames.append(i)

    if len(missing_frames) > 0:
        print(f"Missing frames: {missing_frames}")
    else:
        print("All frames present")

attempts = 0
max_attempts = 10

while attempts < max_attempts:
    try:
        check_frames()
        break
    except Exception as e:
        attempts += 1
        print(f"Attempt {attempts} failed with error: {str(e)}")
        if attempts == max_attempts:
            print("Maximum attempts reached. Execution failed.")
        else:
            print("Retrying...")

In [None]:
#@title ##**Create video** { display-mode: "form" }
import cv2
import os
import subprocess
import time
from tqdm.notebook import tqdm
import torch
import gc

gc.collect()


def log_time(start, message):
    elapsed = time.time() - start
    print(f"{message}: {elapsed:.2f} seconds")
    return time.time()

start_time = time.time()

upscaled_image = 100 #@param {type:"slider", min:0, max:100, step:1}

print(f"output_folder: {output_folder}")

if 'file_name' in locals() and os.path.exists(file_name):
    base_file_name = os.path.basename(file_name)
else:
    raise ValueError("file_name is not defined or the file does not exist")

if output_folder == "google_drive":
    save_path = '/content/drive/MyDrive/'
elif output_folder == "root":
    save_path = '/content/'
else:
    save_path = '/content/'

full_path = os.path.join(save_path, base_file_name) if not os.path.exists(file_name) else file_name
output_file_name = base_file_name.rsplit('.', 1)[0] + f'_sharpness_{upscaled_image}.mp4'
output_file = os.path.join(save_path, output_file_name)
temp_video = "/content/temp_video.mp4"

start_time = log_time(start_time, "Initial setup")

cap = cv2.VideoCapture(full_path)
fps_of_video = int(cap.get(cv2.CAP_PROP_FPS))
cap.release()

upscaled_img_files = [os.path.join(real_output_folder, img) for img in os.listdir(real_output_folder) if img.lower().endswith(('.png', '.jpg', '.jpeg'))]
upscaled_img_files.sort()

if upscaled_image < 100:
    original_img_files = [os.path.join(real_input_folder, img) for img in os.listdir(real_input_folder) if img.lower().endswith(('.png', '.jpg', '.jpeg'))]
    original_img_files.sort()
    if len(upscaled_img_files) != len(original_img_files):
        raise ValueError("Number of upscaled and original frames does not match")

if upscaled_img_files:
    first_frame = cv2.imread(upscaled_img_files[0], cv2.IMREAD_COLOR)
    height, width = first_frame.shape[:2]

    needs_resize = False
    for img in upscaled_img_files[:10]:
        frame = cv2.imread(img, cv2.IMREAD_COLOR)
        if frame.shape[:2] != (height, width):
            needs_resize = True
            break
        del frame
    del first_frame
else:
    raise ValueError("No images found in the upscaled frames folder")

start_time = log_time(start_time, "Frame list preparation")

def get_video_bitrate(file_path):
    cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=bit_rate', '-of', 'default=noprint_wrappers=1:nokey=1', file_path]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    bitrate = result.stdout.strip()
    try:
        return int(bitrate)
    except ValueError:
        return None

bitrate = get_video_bitrate(full_path)
if bitrate:
    bitrate = int(bitrate * 1.5)
    bitrate_str = f'{bitrate // 1000}k'
else:
    bitrate_str = '7500k'

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(temp_video, fourcc, fps_of_video, (width, height))

if upscaled_image == 100:
    for img_file in tqdm(upscaled_img_files, desc="Processing frames"):
        frame = cv2.imread(img_file, cv2.IMREAD_COLOR)
        if needs_resize and frame.shape[:2] != (height, width):
            frame = cv2.resize(frame, (width, height))
        out.write(frame)
        del frame
else:
    alpha = upscaled_image / 100.0
    beta = 1 - alpha
    for upscaled_img, original_img in tqdm(zip(upscaled_img_files, original_img_files), total=len(upscaled_img_files), desc="Processing frames"):
        upscaled_frame = cv2.imread(upscaled_img, cv2.IMREAD_COLOR)
        original_frame = cv2.imread(original_img, cv2.IMREAD_COLOR)

        if needs_resize and upscaled_frame.shape[:2] != (height, width):
            upscaled_frame = cv2.resize(upscaled_frame, (width, height))

        original_frame_resized = cv2.resize(original_frame, (width, height))

        blended_frame = cv2.addWeighted(upscaled_frame, alpha, original_frame_resized, beta, 0)

        out.write(blended_frame)
        del upscaled_frame, original_frame, original_frame_resized, blended_frame

out.release()
gc.collect()

start_time = log_time(start_time, "Frame processing and writing")


temp_converted = "/content/temp_converted.mp4"
cmd = ['ffmpeg', '-i', temp_video, '-c:v', 'libx264', '-b:v', bitrate_str, '-y', temp_converted]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
    print(f"FFmpeg conversion failed: {result.stderr}")
    raise RuntimeError("Conversion to libx264 failed")
os.remove(temp_video)
os.rename(temp_converted, temp_video)

start_time = log_time(start_time, "FFmpeg conversion to libx264")

cmd = ['ffmpeg', '-i', temp_video, '-i', full_path, '-map', '0:v', '-map', '1:a?', '-map', '1:s?', '-c', 'copy', '-y', output_file]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
    print(f"FFmpeg audio muxing failed: {result.stderr}")
    raise RuntimeError("Audio muxing failed")

start_time = log_time(start_time, "Final audio and subtitles muxing")

if os.path.exists(output_file):
    if os.path.exists(temp_video):
        os.remove(temp_video)
    print("Video created successfully")
    print(f"Video saved at: {output_file}")
else:
    print("Failed to create video")
    print(f"Expected save path: {output_file}")
    print(f"FFmpeg error output: {result.stderr}")

start_time = log_time(start_time, "Cleanup")

In [None]:
#@title ##**Compare videos side by side (optional)** { display-mode: "form" }
import os
from moviepy.editor import VideoFileClip, clips_array
from IPython.display import display, Video

if 'file_name' not in locals() or not os.path.exists(file_name):
    raise ValueError("file_name is not defined or the original video file does not exist")
if 'output_file' not in locals() or not os.path.exists(output_file):
    raise ValueError("output_file is not defined or the processed video file does not exist")

original_video_path = file_name
processed_video_path = output_file

original_clip = VideoFileClip(original_video_path)
processed_clip = VideoFileClip(processed_video_path)

min_duration = min(original_clip.duration, processed_clip.duration)
original_clip = original_clip.subclip(0, min_duration)
processed_clip = processed_clip.subclip(0, min_duration)

if original_clip.size != processed_clip.size:
    target_size = (max(original_clip.w, processed_clip.w), max(original_clip.h, processed_clip.h))
    original_clip = original_clip.resize(target_size)
    processed_clip = processed_clip.resize(target_size)

final_clip = clips_array([[original_clip, processed_clip]])

save_path = os.path.dirname(output_file)
comparison_file_name = os.path.basename(file_name).rsplit('.', 1)[0] + '_comparison.mp4'
comparison_file = os.path.join(save_path, comparison_file_name)

final_clip.write_videofile(comparison_file, codec='libx264', audio=True, fps=original_clip.fps)

if os.path.exists(comparison_file):
    print(f"Comparison video saved at: {comparison_file}")
    display(Video(comparison_file, embed=True, width=800))
else:
    print("Failed to create comparison video")

In [None]:
#@title ##**Download video** { display-mode: "form" }
import cv2
from google.colab import files
import os
import time
from tqdm.notebook import tqdm

if output_folder == "google_drive":
    path = '/content/drive/MyDrive/'
elif output_folder == "root":
    path = '/content/'

output_file_name = file_name.rsplit('.', 1)[0] + '_upscale.mp4'
output_file = os.path.join(path, output_file_name)

if not os.path.exists(output_file):
    raise FileNotFoundError(f"Video file {output_file} not found. Make sure the video creation cell was executed successfully.")

with tqdm(total=1, desc="Downloading video") as pbar:
    files.download(output_file)

    while os.path.exists(output_file) and not os.path.getsize(output_file) == 0:
        time.sleep(0.5)
    pbar.update(1)

print("Download completed")
