In [1]:
%pip install -q "openvino>=2023.1.0"
%pip install -q opencv-python
%pip install -q "pytube>=12.1.0"

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [1]:
import time
from pathlib import Path

import cv2
import numpy as np
from IPython.display import (
    HTML,
    FileLink,
    Pretty,
    ProgressBar,
    Video,
    clear_output,
    display,
)
import openvino as ov
from pytube import YouTube

In [2]:
def download_file(url: str, path: Path) -> None:
    """Download ``url`` to ``path`` without leaving a partial file behind.

    The payload is first written to a sibling ``<name>.part`` file and only
    moved onto ``path`` after the transfer finished. An interrupted or
    truncated download therefore cannot be mistaken for a complete one by a
    later ``path.exists()`` check.

    Args:
        url: Location of the resource; any scheme ``urllib`` can open.
        path: Destination file. Missing parent directories are created.

    Raises:
        urllib.error.URLError: If the resource cannot be opened.
        urllib.error.ContentTooShortError: If fewer bytes than announced
            were received.
        OSError: If reading the source or writing the destination fails.
    """
    import urllib.request

    path.parent.mkdir(parents=True, exist_ok=True)
    partial_path = path.with_name(path.name + ".part")
    try:
        urllib.request.urlretrieve(url, partial_path)
        # Same-directory rename; overwrites an already existing target.
        partial_path.replace(path)
    finally:
        # Still present only when the transfer or the rename failed.
        if partial_path.exists():
            partial_path.unlink()

In [3]:
import ipywidgets as widgets

# Build a dropdown listing every device the OpenVINO runtime reports, plus
# "AUTO", which is the preselected entry.
core = ov.Core()
device = widgets.Dropdown(
    options=core.available_devices + ["AUTO"],
    value='AUTO',
    description='Device:',
    disabled=False,
)

# Bare expression so the notebook renders the widget; `device.value` is read
# later when the model is compiled.
device

Dropdown(description='Device:', index=2, options=('CPU', 'GPU', 'AUTO'), value='AUTO')

In [4]:
# Name of the super resolution model; it also determines the file names and
# the download URL below.
model_name = 'single-image-super-resolution-1032'

base_model_dir = Path('./model').expanduser()

model_xml_name = f'{model_name}.xml'
model_bin_name = f'{model_name}.bin'

model_xml_path = base_model_dir / model_xml_name
model_bin_path = base_model_dir / model_bin_name

base_url = f'https://storage.openvinotoolkit.org/repositories/open_model_zoo/2023.0/models_bin/1/{model_name}/FP16/'
model_xml_url = base_url + model_xml_name
model_bin_url = base_url + model_bin_name

# Check every file on its own: if only the .xml were tested, a run that was
# interrupted between the two downloads would never fetch the missing .bin.
missing_downloads = [
    (file_url, file_path)
    for file_url, file_path in (
        (model_xml_url, model_xml_path),
        (model_bin_url, model_bin_path),
    )
    if not file_path.exists()
]

if missing_downloads:
    for file_url, file_path in missing_downloads:
        download_file(file_url, file_path)
else:
    print(f'{model_name} already downloaded to {base_model_dir}')

In [5]:
def convert_result_to_image(result) -> np.ndarray:
    """Convert a network result into an 8-bit image in height-width-channel order.

    The input is left untouched. ``squeeze`` and ``transpose`` only create
    views, so scaling and clamping in place would overwrite the caller's
    buffer and fail on read-only arrays; the work is done on a new array
    instead.

    Args:
        result: Array whose first axis has length 1 and is followed by three
            more axes that are reordered from (0, 1, 2) to (1, 2, 0).
            Values are multiplied by 255, so they are presumably expected in
            the [0, 1] range -- confirm against the model documentation.

    Returns:
        A ``uint8`` array with the leading axis removed, the remaining axes
        reordered, and values clamped to [0, 255].
    """
    image = result.squeeze(0).transpose(1, 2, 0)
    # `image * 255` allocates a new array, so the clamp never touches `result`.
    return np.clip(image * 255, 0, 255).astype(np.uint8)

In [6]:
# Read the model description from the downloaded .xml file and compile it for
# the device currently selected in the `device` dropdown.
core = ov.Core()
model = core.read_model(model=model_xml_path)
compiled_model = core.compile_model(model=model, device_name=device.value)

In [7]:
# The compiled model exposes two inputs: the first is fed the downscaled
# frame, the second the bicubic-upscaled frame. Its first output is the result.
original_image_key, bicubic_image_key = compiled_model.inputs
output_key = compiled_model.output(0)

# Entries from index 2 onwards of each input shape are taken as (height, width).
input_height, input_width = tuple(original_image_key.shape)[2:]
target_height, target_width = tuple(bicubic_image_key.shape)[2:]

upsample_factor = int(target_height / input_height)

print(f"The network expects inputs with a width of {input_width}, height of {input_height}")
print(f"The network returns images with a width of {target_width}, height of {target_height}")

# Each side grows by `upsample_factor`, so the pixel count grows by its square.
area_factor = upsample_factor**2
print(
    f"The image sides are upsampled by a factor of {upsample_factor}. "
    f"The new image is {area_factor} times as large as the original image"
)

The network expects inputs with a width of 480, height of 270
The network returns images with a width of 1920, height of 1080
The image sides are upsampled by a factor of 4. The new image is 16 times as large as the original image


In [8]:
# Directory that receives the downloaded video and the generated videos.
OUTPUT_DIR = "output"

Path(OUTPUT_DIR).mkdir(exist_ok=True)
# Maximum number of frames to process; 0 means the whole video is processed.
NUM_FRAMES = 100
# Four-character codec code handed to every cv2.VideoWriter created later.
FOURCC = cv2.VideoWriter_fourcc(*"vp09")

In [9]:
VIDEO_URL = "https://www.youtube.com/watch?v=V8yS3WIkOrA"
yt = YouTube(VIDEO_URL)

# `first()` yields None when the filter matches nothing; fail with a clear
# message instead of an AttributeError on the next line.
stream = yt.streams.filter(resolution="360p").first()
if stream is None:
    raise ValueError(f"No 360p stream is available for {VIDEO_URL}.")

# Drop non-ASCII characters and the extension from the suggested file name.
filename = Path(stream.default_filename.encode("ascii", "ignore").decode("ascii")).stem
stream.download(output_path=OUTPUT_DIR, filename=filename)
print(f"Video {filename} downloaded to {OUTPUT_DIR}")

video_path = Path(stream.get_file_path(filename, OUTPUT_DIR))

# Output files are named after the downloaded video.
superres_video_path = Path(f"{OUTPUT_DIR}/{video_path.stem}_superres.mp4")
bicubic_video_path = Path(f"{OUTPUT_DIR}/{video_path.stem}_bicubic.mp4")
comparison_video_path = Path(f"{OUTPUT_DIR}/{video_path.stem}_superres_comparison.mp4")

Video Leading Intel with CEO Pat Gelsinger downloaded to output


In [10]:
# Open the video once to read its first frame and basic properties. The
# capture is released in `finally` so it is not leaked when the read fails.
cap = cv2.VideoCapture(filename=str(video_path))
try:
    ret, image = cap.read()
    if not ret:
        raise ValueError(f"The video at '{video_path}' cannot be read.")
    fps = cap.get(cv2.CAP_PROP_FPS)
    # `cap.get` returns a float; the frame count is used as an integer count.
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
finally:
    cap.release()

# NUM_FRAMES == 0 means "process every frame of the video".
if NUM_FRAMES == 0:
    total_frames = frame_count
else:
    total_frames = min(frame_count, NUM_FRAMES)

original_frame_height, original_frame_width = image.shape[:2]

print(
    f"The input video has a frame width of {original_frame_width}, "
    f"frame height of {original_frame_height} and runs at {fps:.2f} fps"
)

The input video has a frame width of 640, frame height of 360 and runs at 29.97 fps


In [11]:
def _open_writer(output_path, frame_size):
    """Create a writer for ``output_path`` using the shared codec and frame rate."""
    return cv2.VideoWriter(
        filename=str(output_path),
        fourcc=FOURCC,
        fps=fps,
        frameSize=frame_size,
    )


# Super resolution and bicubic videos use the network's output size; the
# comparison video places two such frames side by side, hence twice the width.
superres_video = _open_writer(superres_video_path, (target_width, target_height))
bicubic_video = _open_writer(bicubic_video_path, (target_width, target_height))
comparison_video = _open_writer(comparison_video_path, (target_width * 2, target_height))

In [12]:
# Run super resolution frame by frame and write three videos: the network
# result, the bicubic baseline, and both side by side.
start_time = time.perf_counter()
frame_nr = 0
total_inference_duration = 0

progress_bar = ProgressBar(total=total_frames)
progress_bar.display()

cap = cv2.VideoCapture(filename=str(video_path))

try:
    # Checking the frame limit here avoids decoding a frame that would be discarded.
    while cap.isOpened() and frame_nr < total_frames:
        ret, image = cap.read()
        if not ret:
            break

        # First network input: the frame resized to the network's input size,
        # reordered from HWC to CHW with a leading batch axis.
        resized_image = cv2.resize(src=image, dsize=(input_width, input_height))
        input_image_original = np.expand_dims(resized_image.transpose(2, 0, 1), axis=0)

        # Second network input: the frame upscaled to the target size with
        # bicubic interpolation. It is also written out as the baseline video.
        bicubic_image = cv2.resize(
            src=image, dsize=(target_width, target_height), interpolation=cv2.INTER_CUBIC
        )
        input_image_bicubic = np.expand_dims(bicubic_image.transpose(2, 0, 1), axis=0)

        # Time only the inference call itself.
        inference_start_time = time.perf_counter()
        result = compiled_model(
            {
                original_image_key.any_name: input_image_original,
                bicubic_image_key.any_name: input_image_bicubic,
            }
        )[output_key]
        inference_stop_time = time.perf_counter()
        inference_duration = inference_stop_time - inference_start_time
        total_inference_duration += inference_duration

        result_frame = convert_result_to_image(result=result)

        superres_video.write(image=result_frame)
        bicubic_video.write(image=bicubic_image)

        # Bicubic on the left, super resolution on the right.
        stacked_frame = np.hstack((bicubic_image, result_frame))
        comparison_video.write(image=stacked_frame)

        frame_nr = frame_nr + 1

        progress_bar.progress = frame_nr
        progress_bar.update()
        # Refresh the textual status every 10 frames and on the last frame.
        if frame_nr % 10 == 0 or frame_nr == total_frames:
            clear_output(wait=True)
            progress_bar.display()
            display(
                Pretty(
                    f"Processed frame {frame_nr}. Inference time: "
                    f"{inference_duration:.2f} seconds "
                    f"({1/inference_duration:.2f} FPS)"
                )
            )

except KeyboardInterrupt:
    print("Processing interrupted.")
finally:
    # Release the reader as well as the writers on every exit path: end of
    # video, frame limit reached, interruption, or error.
    cap.release()
    superres_video.release()
    bicubic_video.release()
    comparison_video.release()
    end_time = time.perf_counter()
    duration = end_time - start_time
    # Guard the divisions so that a run without any processed frame still
    # prints its summary instead of raising ZeroDivisionError from `finally`.
    total_fps = frame_nr / duration if duration > 0 else 0.0
    inference_fps = (
        frame_nr / total_inference_duration if total_inference_duration > 0 else 0.0
    )
    print(f"Video's saved to {comparison_video_path.parent} directory.")
    print(
        f"Processed {frame_nr} frames in {duration:.2f} seconds. Total FPS "
        f"(including video processing): {total_fps:.2f}. "
        f"Inference FPS: {inference_fps:.2f}."
    )

Processed frame 100. Inference time: 0.17 seconds (5.80 FPS)

Video's saved to output directory.
Processed 100 frames in 229.46 seconds. Total FPS (including video processing): 0.44. Inference FPS: 5.07.


In [13]:
# Stop early when the comparison video was not produced.
if not comparison_video_path.exists():
    raise ValueError("The comparison video does not exist.")

# Download link shown as a fallback for browsers that cannot play the video.
video_link = FileLink(comparison_video_path)
video_link.html_link_str = "<a href='%s' download>%s</a>"
fallback_notice = (
    f"Showing side by side comparison. If you cannot see the video in "
    "your browser, please click on the following link to download "
    f"the video<br>{video_link._repr_html_()}"
)
display(HTML(fallback_notice))

# Embed the comparison video itself in the notebook output.
display(Video(comparison_video_path, width=800, embed=True))