E/19/039 <br>
N.P.M.P.Bandara <br>


Notebook With Outputs : https://colab.research.google.com/drive/1PwodgR__AOVB3AMuKRgN9DJx58Cno-AP?usp=sharing

# Imports

In [None]:
import os
import sys
import time
import numpy as np
import cv2
import matplotlib.pyplot as plt
from IPython.display import display, HTML

In [None]:
%pip install ffmpeg-python
import ffmpeg

# Task 1: Generating an H.264 bitstream

In [None]:
import ffmpeg

# Source clip and the raw H.264 stream (.h264) that is generated from it
input_video_path = "/content/Switzerland.mp4"
output_video_path = "/content/Switzerland_Output.h264"

# Encoding parameters forwarded to ffmpeg below
bitrate = 2000
crf = 23
gop = 9        # passed as -g
b_frames = 3   # passed as -bf

# Create an FFmpeg input stream
input_stream = ffmpeg.input(input_video_path)

# Encode with libx264 using the parameters above.
# NOTE(review): ffmpeg-python forwards keyword arguments it does not special-case verbatim,
# so `bitrate=` is emitted as `-bitrate`; its documented keyword for `-b:v` is `video_bitrate=`.
# Confirm which option is intended before relying on the bitrate setting.
output_stream = ffmpeg.output(
    input_stream,
    output_video_path,
    pix_fmt='yuv420p',
    vcodec='libx264',
    bitrate=bitrate,
    crf=crf,
    g=gop,
    bf=b_frames
)

# overwrite_output=True adds `-y`, so re-running this cell replaces an existing
# output file instead of stopping because the file is already there.
ffmpeg.run(output_stream, overwrite_output=True)


In [None]:
import ffmpeg

# Raw H.264 stream produced by the encode step and the MP4 file it is converted into
input_path = "/content/Switzerland_Output.h264"
output_video_path_mp4 = "/content/Switzerland_Output.mp4"

# Create an FFmpeg input for the raw stream
decoder = ffmpeg.input(input_path)

# Write the stream out as MP4, encoded with libx264
output_stream = decoder.output(
    output_video_path_mp4,
    pix_fmt="yuv420p",
    vcodec="libx264"
)

# Print the generated FFmpeg command (for debugging)
print("FFmpeg command:", " ".join(ffmpeg.compile(output_stream, overwrite_output=True)))

# ffmpeg writes its normal progress log to stderr, so a non-empty stderr does not
# indicate a failure. A failing run raises ffmpeg.Error, which carries the captured log.
try:
    out, err = ffmpeg.run(
        output_stream,
        capture_stdout=True,
        capture_stderr=True,
        overwrite_output=True,  # allow the cell to be re-run
    )
except ffmpeg.Error as e:
    print("FFmpeg Error:", e.stderr.decode("utf-8", errors="replace"))
    raise  # later cells need the MP4, so do not continue silently


In [None]:
# Frame geometry and the size of one frame computed as width * height * 3
# (three values per pixel — presumably packed 8-bit RGB; confirm against the consumer).
frame_width = 1920
frame_height = 1080
frame_size = frame_height * frame_width * 3

## Extract I Frames

In [None]:
# Folder that receives the extracted I-frames. It is defined here so that the cell
# runs on a fresh kernel instead of depending on a name assigned further down.
output_frames_folder = "/content/I_frames"

# Ensure the output directory exists
os.makedirs(output_frames_folder, exist_ok=True)

# Keep only frames whose picture type is I and write each one as a PNG
ffmpeg.input(output_video_path_mp4).output(
    os.path.join(output_frames_folder, "frame_%04d.png"),
    vf="select='eq(pict_type,PICT_TYPE_I)'",  # Select only I-frames
    vsync="vfr"
).run()

# Get list of extracted I-frames
frame_files = sorted(os.listdir(output_frames_folder))

# Plot the first 5 I-frames
num_frames_to_plot = min(5, len(frame_files))

if num_frames_to_plot == 0:
    print("No I-frames were found in the video.")
else:
    # squeeze=False always returns a 2-D array of axes, so indexing also works
    # when only a single frame is plotted.
    fig, axes_grid = plt.subplots(1, num_frames_to_plot, figsize=(15, 5), squeeze=False)
    axes = axes_grid[0]

    for i, frame_file in enumerate(frame_files[:num_frames_to_plot]):
        frame_path = os.path.join(output_frames_folder, frame_file)
        img = cv2.imread(frame_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB for correct display

        axes[i].imshow(img)
        axes[i].axis("off")
        axes[i].set_title(f"I-Frame {i+1}")

    plt.show()

## Extract B Frames

In [None]:
# Folder that receives the extracted B-frames
output_frames_folder = "/content/B_frames"

# Ensure the output directory exists
os.makedirs(output_frames_folder, exist_ok=True)

# Keep only frames whose picture type is B and write each one as a PNG
ffmpeg.input(output_video_path_mp4).output(
    os.path.join(output_frames_folder, "frame_%04d.png"),
    vf="select='eq(pict_type,PICT_TYPE_B)'",  # Select only B-frames
    vsync="vfr"
).run()

# Get list of extracted B-frames
frame_files = sorted(os.listdir(output_frames_folder))

# Plot the first 5 B-frames
num_frames_to_plot = min(5, len(frame_files))

if num_frames_to_plot == 0:
    print("No B-frames were found in the video.")
else:
    # squeeze=False always returns a 2-D array of axes, so indexing also works
    # when only a single frame is plotted.
    fig, axes_grid = plt.subplots(1, num_frames_to_plot, figsize=(15, 5), squeeze=False)
    axes = axes_grid[0]

    for i, frame_file in enumerate(frame_files[:num_frames_to_plot]):
        frame_path = os.path.join(output_frames_folder, frame_file)
        img = cv2.imread(frame_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB for correct display

        axes[i].imshow(img)
        axes[i].axis("off")
        axes[i].set_title(f"B-Frame {i+1}")

    plt.show()


## Extract P frames

In [None]:
# Only this many P-frames are plotted, so only this many are decoded into memory.
# A raw rgb24 frame takes width * height * 3 bytes, so decoding every P-frame of
# the video just to show five would hold far more data than needed.
max_p_frames = 5

# Select P-frames and stream them to stdout as raw RGB; .run() returns (stdout, stderr)
process = (
    ffmpeg.input(output_video_path_mp4)
    .filter("select", "eq(pict_type,PICT_TYPE_P)")  # Select only P-frames
    .output("pipe:", format="rawvideo", pix_fmt="rgb24", vframes=max_p_frames)
    .run(capture_stdout=True, capture_stderr=True)
)

# Read the frame dimensions from the first video stream reported by ffprobe
probe = ffmpeg.probe(output_video_path_mp4, select_streams="v")
width = int(probe["streams"][0]["width"])
height = int(probe["streams"][0]["height"])
frame_size = width * height * 3

# Slice the byte buffer into whole frames; a trailing partial frame is ignored
raw_frames = process[0]
frames = [np.frombuffer(raw_frames[i * frame_size : (i + 1) * frame_size], np.uint8).reshape((height, width, 3))
          for i in range(len(raw_frames) // frame_size)]

# Plot the first 5 P-frames
num_frames_to_plot = min(max_p_frames, len(frames))

if num_frames_to_plot == 0:
    print("No P-frames were found in the video.")
else:
    # squeeze=False always returns a 2-D array of axes, so indexing also works
    # when only a single frame is plotted.
    fig, axes_grid = plt.subplots(1, num_frames_to_plot, figsize=(15, 5), squeeze=False)
    axes = axes_grid[0]

    for i, frame in enumerate(frames[:num_frames_to_plot]):
        axes[i].imshow(frame)
        axes[i].axis("off")
        axes[i].set_title(f"P-Frame {i+1}")

    plt.show()


# View the decoded frames

In [None]:
from moviepy.editor import VideoFileClip
from IPython.display import display, Video

# Display the video in the notebook. The Video object embeds the file by itself,
# so no moviepy clip is opened here (the clip was never used and never closed).
display(Video("/content/Switzerland_Output.mp4", embed=True))


# Task 2: Adjusting the Quality of the decoded output

In [None]:
def encode_decode_video(input_video_path, bitrate, crf, gop, b_frames):
    """Encode a video to a raw H.264 stream with libx264, then convert that stream to MP4.

    Both files are written to ``/content`` and are named after the input file and
    the four settings, so each parameter combination gets its own pair of files.

    Args:
        input_video_path: Path of the source video.
        bitrate: Value forwarded to ffmpeg through the ``bitrate`` keyword (see note below).
        crf: Value forwarded as ``-crf``.
        gop: Value forwarded as ``-g``.
        b_frames: Value forwarded as ``-bf``.

    Returns:
        Tuple ``(output_h264_path, output_mp4_path)``.

    Raises:
        ffmpeg.Error: If either ffmpeg invocation fails.
    """
    # Generate file names for the output video
    input_file_name = os.path.basename(input_video_path)
    file_name_without_extension = os.path.splitext(input_file_name)[0]

    # Create output paths for .h264 (encoded) and .mp4 (decoded)
    settings_tag = f"{bitrate}_{crf}_{gop}_{b_frames}"
    output_h264_path = f"/content/{file_name_without_extension}_encoded_{settings_tag}.h264"
    output_mp4_path = f"/content/{file_name_without_extension}_decoded_{settings_tag}.mp4"

    # Encoding to .h264
    # NOTE(review): ffmpeg-python forwards keyword arguments it does not special-case verbatim,
    # so `bitrate=` is emitted as `-bitrate`; its documented keyword for `-b:v` is
    # `video_bitrate=`. Confirm which option is intended.
    # overwrite_output=True lets the same settings be re-run without failing on existing files.
    ffmpeg.input(input_video_path).output(
        output_h264_path,
        vcodec='libx264',
        bitrate=bitrate,
        crf=crf,
        g=gop,
        bf=b_frames,
        pix_fmt='yuv420p',
    ).run(overwrite_output=True)

    # Decoding from .h264 to .mp4
    ffmpeg.input(output_h264_path).output(
        output_mp4_path,
        vcodec='libx264',
        pix_fmt='yuv420p',
    ).run(overwrite_output=True)

    # Print the output file paths
    print(f"Encoded video saved to: {output_h264_path}")
    print(f"Decoded video saved to: {output_mp4_path}")

    return output_h264_path, output_mp4_path

##  [ Bit rate  = 1000 , CRF = 23 ]

In [None]:
# Encode/decode round trip with bitrate 1000, CRF 23, GOP 9 and 3 B-frames.
# The returned pair is kept under both sets of names.
encoded_path, decoded_path = encode_decode_video(input_video_path, 1000, 23, 9, 3)
output_h264_path, output_mp4_path = encoded_path, decoded_path
output_mp4_path

In [None]:
# Display the video in the notebook. The Video object embeds the file by itself,
# so no moviepy clip is opened here (the clip was never used and never closed).
display(Video(output_mp4_path, embed=True))

##  [ Bit rate  = 1000 , CRF = 51 ]

Max CRF is used

In [None]:
# Encode/decode round trip with bitrate 1000, CRF 51, GOP 9 and 3 B-frames.
# The returned pair is kept under both sets of names.
encoded_path, decoded_path = encode_decode_video(input_video_path, 1000, 51, 9, 3)
output_h264_path, output_mp4_path = encoded_path, decoded_path
output_mp4_path

In [None]:
# Display the video in the notebook. The Video object embeds the file by itself,
# so no moviepy clip is opened here (the clip was never used and never closed).
display(Video(output_mp4_path, embed=True))

##  [ Bit rate  = 2500 , CRF = 23 ]

In [None]:
# Encode/decode round trip with bitrate 2500, CRF 23, GOP 9 and 3 B-frames.
# The returned pair is kept under both sets of names.
encoded_path, decoded_path = encode_decode_video(input_video_path, 2500, 23, 9, 3)
output_h264_path, output_mp4_path = encoded_path, decoded_path
output_mp4_path

In [None]:
# Display the video in the notebook. The Video object embeds the file by itself,
# so no moviepy clip is opened here (the clip was never used and never closed).
display(Video(output_mp4_path, embed=True))

##  [ Bit rate  = 4000 , CRF = 23 ]

In [None]:
# Encode/decode round trip with bitrate 4000, CRF 23, GOP 9 and 3 B-frames.
# The returned pair is kept under both sets of names.
encoded_path, decoded_path = encode_decode_video(input_video_path, 4000, 23, 9, 3)
output_h264_path, output_mp4_path = encoded_path, decoded_path
output_mp4_path

In [None]:
# Display the video in the notebook. The Video object embeds the file by itself,
# so no moviepy clip is opened here (the clip was never used and never closed).
display(Video(output_mp4_path, embed=True))

## Observations

When the bitrate is decreased, the video file becomes smaller, but the quality also drops. If the CRF value is increased, the quality decreases and the file size is reduced further. On the other hand, lowering the CRF improves quality but increases the file size. So, adjusting both parameters controls how big the file is and how good the video looks.

# Frame drop

This code iterates through the bitstream byte-by-byte, searching for NAL unit start codes. It checks the drop rate and optionally the NAL unit type to determine if the unit should be dropped. If dropped, the corresponding bytes are skipped, and the remaining bytes are copied to the modified bitstream.

Remember to adjust the drop rate, targeted NAL unit types, and file paths as needed. This is a basic example, and further customization may be required depending on your specific needs and H.264 bitstream format.

In [None]:
# Paths for the frame-drop experiment: the encoded H.264 stream that is read, and
# the files derived from it.
# NOTE: `input` shadows the Python builtin of the same name; the name is kept
# because the cells that follow read it.
input = output_video_path
output, output_mp4 = "/content/output_dropV2.h264", "/content/output_dropV2.mp4"

In [None]:
import random

def drop_nal_units(bitstream, drop_rate, nal_unit_types_to_drop=None):
  """
  Simulates NAL unit drop in an H.264 byte stream.

  The stream is split at every start code (0x000001, optionally preceded by one
  extra zero byte, i.e. the 4-byte form 0x00000001). Each NAL unit is either
  copied whole or removed whole, start code included. Bytes that precede the
  first start code are always kept.

  Args:
    bitstream: Bytes-like object containing the H.264 byte stream.
    drop_rate: Float between 0 and 1, the probability of dropping a targeted NAL unit.
    nal_unit_types_to_drop: Optional list of integers selecting the units to target.
      A unit is targeted when either its 5-bit nal_unit_type (header & 0x1F) or its
      full header byte is in the list, so both [5, 7] and raw header values such as
      [103, 104] are accepted. When None or empty, every NAL unit is targeted.

  Returns:
    bytearray containing the modified stream with the dropped NAL units removed.
  """
  start_code = b'\x00\x00\x01'
  targets = set(nal_unit_types_to_drop) if nal_unit_types_to_drop else None

  # Record (unit_start, header_index) for every NAL unit in the stream.
  unit_marks = []
  pos = bitstream.find(start_code)
  while pos != -1:
    # A zero byte directly before 00 00 01 belongs to a 4-byte start code.
    unit_start = pos - 1 if pos > 0 and bitstream[pos - 1] == 0 else pos
    unit_marks.append((unit_start, pos + 3))
    pos = bitstream.find(start_code, pos + 3)

  # Everything before the first start code is copied unchanged.
  first_unit = unit_marks[0][0] if unit_marks else len(bitstream)
  modified_bitstream = bytearray(bitstream[:first_unit])
  dropped_bytes = 0

  for k, (unit_start, header_index) in enumerate(unit_marks):
    # A unit ends where the next one starts, or at the end of the stream.
    unit_end = unit_marks[k + 1][0] if k + 1 < len(unit_marks) else len(bitstream)

    # A start code with no header byte after it cannot be classified; keep it.
    if header_index < unit_end:
      header = bitstream[header_index]
      targeted = targets is None or header in targets or (header & 0x1F) in targets
      if targeted and random.random() < drop_rate:
        dropped_bytes += unit_end - unit_start
        continue

    modified_bitstream.extend(bitstream[unit_start:unit_end])

  print(f"Dropped {dropped_bytes} bytes, resulting in {len(modified_bitstream)} bytes remaining.")
  return modified_bitstream

In [None]:
# Example usage: read the encoded stream, call drop_nal_units with a drop rate
# of 0.1 targeting types 5 and 7, and write the result.
# The files are opened with `with` so the handles are closed deterministically.
with open(input, "rb") as src:
    bitstream = src.read()

modified_bitstream = drop_nal_units(bitstream, 0.1, [5, 7])

with open(output, "wb") as dst:
    dst.write(modified_bitstream)

In [None]:
# Create an FFmpeg input object from the modified stream
input_video = ffmpeg.input(output)

# Define the output file path
output_path = "/content/outputV3_NOerror.mp4"

# Set the output format and codec
output_video = input_video.output(output_path, pix_fmt="yuv420p", vcodec="libx264")

# Run the encoding process; overwrite_output=True adds `-y` so the cell can be
# re-run when the MP4 from a previous run already exists.
output_video.run(overwrite_output=True)


# When some NAL unit types are dropped, the decoder fails.

In [None]:
# Example usage: call drop_nal_units with a drop rate of 0.5 targeting the values
# 103, 104 and 105 (0x67, 0x68, 0x69 — these are full NAL header byte values,
# not 5-bit NAL unit types).
# The files are opened with `with` so the handles are closed deterministically.
with open(input, "rb") as src:
    bitstream = src.read()

modified_bitstream = drop_nal_units(bitstream, 0.5, [103, 104, 105])

with open(output, "wb") as dst:
    dst.write(modified_bitstream)

In [None]:
import ffmpeg

# Create an FFmpeg input for the modified stream
decoder_N = ffmpeg.input(output)

# Define output file, pixel format and codec
decoder_N_N = decoder_N.output("/content/outputV3_error.mp4", pix_fmt="yuv420p", vcodec="libx264")

# Run the decoding process. A decoder failure is the point of this section, so
# the error is caught and its log shown instead of stopping the notebook.
# capture_stderr=True is what makes ffmpeg.Error.stderr available;
# overwrite_output=True lets the cell be re-run.
try:
    decoder_N_N.run(capture_stderr=True, overwrite_output=True)
except ffmpeg.Error as e:
    print("FFmpeg Error:", e.stderr.decode("utf-8", errors="replace"))

# Task 3 : Drop only B and P frames at a rate of 20%.

In [None]:
# Remove the modified stream left over from the previous section.
# `output` holds the full path of output_dropV2.h264; the previously used
# `main_dir` is not defined anywhere in this notebook.
if os.path.exists(output):
    os.remove(output)



In [None]:
def P_and_B_Frames(n):
    """Return False when ``n`` is a multiple of 9 and True otherwise.

    NOTE(review): presumably ``n`` is a frame index and 9 mirrors the GOP length
    used for encoding, so multiples of 9 stand for I-frames — confirm with the caller.
    """
    return n % 9 != 0

def drop_nal_units_P_and_B(bitstream, drop_rate):
    """Randomly remove whole non-IDR slice NAL units from an H.264 byte stream.

    The stream is split at every start code (0x000001, optionally preceded by one
    extra zero byte). A unit whose nal_unit_type (header & 0x1F) is 1 or 2 is
    removed completely — start code, header and payload — with probability
    ``drop_rate``. All other units, and any bytes before the first start code,
    are copied unchanged.

    NAL unit types 1 and 2 are the non-IDR slice types of H.264 (coded slice and
    slice data partition A). That covers P and B slices; IDR pictures (type 5) and
    parameter sets (types 7 and 8) are never touched.
    NOTE(review): a non-IDR I slice also uses type 1 and would be eligible here;
    telling it apart requires parsing slice_type from the slice header.

    Args:
        bitstream: Bytes-like object containing the H.264 byte stream.
        drop_rate: Float between 0 and 1, the probability of dropping each eligible unit.

    Returns:
        bytearray containing the stream without the dropped units.
    """
    start_code = b'\x00\x00\x01'
    nal_types_to_drop = (1, 2)

    # Record (unit_start, header_index) for every NAL unit in the stream.
    unit_marks = []
    pos = bitstream.find(start_code)
    while pos != -1:
        # A zero byte directly before 00 00 01 belongs to a 4-byte start code.
        unit_start = pos - 1 if pos > 0 and bitstream[pos - 1] == 0 else pos
        unit_marks.append((unit_start, pos + 3))
        pos = bitstream.find(start_code, pos + 3)

    # Everything before the first start code is copied unchanged.
    first_unit = unit_marks[0][0] if unit_marks else len(bitstream)
    modified_bitstream = bytearray(bitstream[:first_unit])
    dropped_bytes = 0

    for k, (unit_start, header_index) in enumerate(unit_marks):
        # A unit ends where the next one starts, or at the end of the stream.
        unit_end = unit_marks[k + 1][0] if k + 1 < len(unit_marks) else len(bitstream)

        # A start code with no header byte after it cannot be classified; keep it.
        if header_index < unit_end:
            nal_unit_type = bitstream[header_index] & 0x1F  # Extract NAL unit type
            if nal_unit_type in nal_types_to_drop and random.random() < drop_rate:
                dropped_bytes += unit_end - unit_start
                continue

        modified_bitstream.extend(bitstream[unit_start:unit_end])

    print(f"Dropped {dropped_bytes} bytes, resulting in {len(modified_bitstream)} bytes remaining.")
    return modified_bitstream

In [None]:
output_BP_Dropped ="/content/output_BP_Dropped.h264"

# Read the encoded stream, drop P/B NAL units at a rate of 20%, and write the result.
# The result is written to `output` because the decode cell that follows reads
# from `output`; `output_BP_Dropped` is not used by it.
# The files are opened with `with` so the handles are closed deterministically.
with open(input, "rb") as src:
    bitstream = src.read()

modified_bitstream = drop_nal_units_P_and_B(bitstream, 0.20)

with open(output, "wb") as dst:
    dst.write(modified_bitstream)

In [None]:
decoder_N = ffmpeg.input(output)

# Define output file, pixel format and codec
decoder_N = decoder_N.output("/content/output_BP_Dropped.mp4", pix_fmt="yuv420p", vcodec="libx264" )

# Run the decoding process.
# capture_stderr=True is required for the handler below: without it
# ffmpeg.Error.stderr is None and calling .decode() on it raises AttributeError.
# overwrite_output=True lets the cell be re-run when the MP4 already exists.
try:
    decoder_N.run(capture_stderr=True, overwrite_output=True)
except ffmpeg.Error as e:
    print("FFmpeg Error:", e.stderr.decode("utf-8", errors="replace"))

In [None]:
# Show the decoded result with the same inline player used in the earlier sections.
# The absolute path matches the location the decode step writes to, so the cell
# does not depend on the current working directory.
display(Video("/content/output_BP_Dropped.mp4", embed=True))