In [1]:
import torch
from torchvision.io.video import read_video
from torchvision.models.optical_flow import raft_large, Raft_Large_Weights, raft_small, Raft_Small_Weights
from torchvision.transforms.functional import resize
from torchvision.utils import flow_to_image
import matplotlib.pyplot as plt
import cv2
from pathlib import Path
import numpy as np

### Display video file

In [23]:
# Location of the sample clip to preview.
dataset_path = Path('H:/Datasets/Celeb_DF/YouTube-real')
video_path = "00000.mp4"
data_path = dataset_path / video_path
# data_path = r"H:\Datasets\Celeb_DF\YouTube-real\00273.mp4"

print(data_path)
# Load the video using torchvision (str() keeps older torchvision/PyAV versions happy with Path inputs)
vid, _, _ = read_video(str(data_path), output_format="TCHW", pts_unit='sec')  # TCHW: Time, Channels, Height, Width
# vid = vid[:32]  # Optionally, shorten the duration if needed (e.g., first 32 frames)
print(vid.shape)

# Play back the decoded input frames in an OpenCV window
try:
    for frame in vid:
        # read_video yields RGB frames in CHW layout; OpenCV displays BGR frames in HWC layout.
        frame_hwc = np.ascontiguousarray(frame.numpy().transpose(1, 2, 0))
        fr = cv2.cvtColor(frame_hwc, cv2.COLOR_RGB2BGR)
        # print(fr.shape) # 500,892,3
        cv2.imshow('input', fr)
        # Wait for 25ms before moving to the next frame
        if cv2.waitKey(25) & 0xFF == ord('q'):
            break  # Exit the loop if 'q' is pressed
finally:
    # Always close the preview window, even if playback is interrupted by an error
    cv2.destroyAllWindows()

H:\Datasets\Celeb_DF\YouTube-real\00000.mp4
torch.Size([450, 3, 500, 892])


### Load RAFT model

In [2]:
# Run on the GPU when CUDA is available, otherwise on the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Instantiate RAFT-large with its default pre-trained weights, then move it to the chosen device
model = raft_large(weights=Raft_Large_Weights.DEFAULT)
model = model.to(device)

# Switch to evaluation mode; as the last expression of the cell this also displays the model
model.eval()

RAFT(
  (feature_encoder): FeatureEncoder(
    (convnormrelu): Conv2dNormActivation(
      (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
      (1): InstanceNorm2d(64, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
      (2): ReLU(inplace=True)
    )
    (layer1): Sequential(
      (0): ResidualBlock(
        (convnormrelu1): Conv2dNormActivation(
          (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
          (1): InstanceNorm2d(64, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
          (2): ReLU(inplace=True)
        )
        (convnormrelu2): Conv2dNormActivation(
          (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
          (1): InstanceNorm2d(64, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
          (2): ReLU(inplace=True)
        )
        (downsample): Identity()
        (relu): ReLU(inplace=True)
      )
      (1): ResidualBlock(
        (

In [2]:
# # Alternative: load the smaller RAFT-small model with pre-trained weights (uncomment this cell to use it instead of RAFT-large)
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# model = raft_small(weights=Raft_Small_Weights.DEFAULT).to(device)
# model.eval()

RAFT(
  (feature_encoder): FeatureEncoder(
    (convnormrelu): Conv2dNormActivation(
      (0): Conv2d(3, 32, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
      (1): InstanceNorm2d(32, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
      (2): ReLU(inplace=True)
    )
    (layer1): Sequential(
      (0): BottleneckBlock(
        (convnormrelu1): Conv2dNormActivation(
          (0): Conv2d(32, 8, kernel_size=(1, 1), stride=(1, 1))
          (1): InstanceNorm2d(8, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
          (2): ReLU(inplace=True)
        )
        (convnormrelu2): Conv2dNormActivation(
          (0): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
          (1): InstanceNorm2d(8, eps=1e-05, momentum=0.1, affine=False, track_running_stats=False)
          (2): ReLU(inplace=True)
        )
        (convnormrelu3): Conv2dNormActivation(
          (0): Conv2d(8, 32, kernel_size=(1, 1), stride=(1, 1))
          (1): Inst

In [3]:
# Preprocessing function (based on RAFT's input expectations)
def preprocess_frame(frame, device, size=(496, 888)):
    """Convert one decoded video frame into a RAFT-ready batch of one.

    Args:
        frame: Image tensor in (C, H, W) layout with values in [0, 255],
            e.g. one frame of ``read_video(..., output_format="TCHW")``.
        device: Torch device the resulting tensor is moved to.
        size: Target ``(height, width)`` after resizing. RAFT requires both
            to be divisible by 8. The default maps a 500x892 source frame
            to 496x888.

    Returns:
        Float tensor of shape ``(1, C, height, width)`` on ``device`` with
        values scaled to [-1, 1], the range torchvision's pre-trained RAFT
        weights expect.

    Raises:
        ValueError: If ``size`` is not divisible by 8 in both dimensions.
    """
    height, width = size
    if height % 8 or width % 8:
        raise ValueError(f"RAFT needs a height and width divisible by 8, got {height}x{width}")

    frame_tensor = frame.float() / 255.0  # Scale to [0, 1]
    frame_resized = resize(frame_tensor, size=[height, width])
    # The pre-trained weights were trained on inputs normalized with mean 0.5 / std 0.5,
    # i.e. [-1, 1]; feeding [0, 1] silently degrades the predicted flow.
    frame_normalized = frame_resized * 2.0 - 1.0
    return frame_normalized.unsqueeze(0).to(device)  # Add batch dimension and move to device

In [5]:
dataset_path = Path('H:/Datasets/Celeb_DF/YouTube-real')
video_path = "00000.mp4"
data_path = dataset_path / video_path
# data_path = r"H:\Datasets\Celeb_DF\YouTube-real\00273.mp4"

# Load the video using torchvision; `info` carries the stream metadata (e.g. the frame rate)
vid, _, info = read_video(str(data_path), output_format="TCHW", pts_unit='sec')  # TCHW: Time, Channels, Height, Width

# Write the flow video at the source frame rate (fall back to 30 if the metadata lacks it)
fps = info.get("video_fps", 30)
fourcc = cv2.VideoWriter_fourcc(*'MJPG')
# The writer is created lazily from the first flow image: cv2.VideoWriter silently
# drops frames whose size differs from the size it was opened with.
out = None

try:
    # Process the video one consecutive frame pair at a time
    for i in range(vid.shape[0] - 1):
        # Preprocess both frames of the pair
        prev_frame_tensor = preprocess_frame(vid[i], device)
        curr_frame_tensor = preprocess_frame(vid[i + 1], device)
        # For a 500x892 source each tensor is [1, 3, 496, 888]

        # Predict optical flow between the previous and current frames
        with torch.no_grad():
            flow_list = model(prev_frame_tensor, curr_frame_tensor)

        # The model returns one flow estimate per refinement iteration; keep the final one
        predicted_flow = flow_list[-1]  # (1, 2, H, W)

        # Convert optical flow to an RGB image for visualization
        flow_image = flow_to_image(predicted_flow.squeeze(0)).cpu().numpy().transpose(1, 2, 0)  # (H, W, 3)
        flow_image_bgr = cv2.cvtColor(
            np.ascontiguousarray(flow_image, dtype=np.uint8), cv2.COLOR_RGB2BGR
        )  # Convert to BGR for OpenCV

        if out is None:
            height, width = flow_image_bgr.shape[:2]
            out = cv2.VideoWriter('output_flow.avi', fourcc, fps, (width, height))
            if not out.isOpened():
                raise RuntimeError("Could not open 'output_flow.avi' for writing")

        # Save the flow image as part of the output video
        out.write(flow_image_bgr)
finally:
    # Release the video writer even if inference fails part-way, so the file is finalized
    if out is not None:
        out.release()
    cv2.destroyAllWindows()

In [12]:
# Load the video using torchvision; `info` carries the stream metadata (e.g. the frame rate)
dataset_path = Path('H:/Datasets/Celeb_DF/YouTube-real')
video_path = "00000.mp4"
data_path = dataset_path / video_path
vid, _, info = read_video(str(data_path), output_format="TCHW", pts_unit='sec')  # TCHW: Time, Channels, Height, Width

# Write the flow video at the source frame rate (fall back to 30 if the metadata lacks it)
fps = info.get("video_fps", 30)
fourcc = cv2.VideoWriter_fourcc(*'MJPG') #h264 MJPG avc1 mpv4
# The writer is created lazily from the first flow image: cv2.VideoWriter silently
# drops frames whose size differs from the size it was opened with.
out = None

# Parameters
batch_size = 6  # Number of frame pairs to process in a batch
num_frames = vid.shape[0]
num_pairs = num_frames - 1  # Each flow field needs a (previous, current) frame pair

try:
    # Process consecutive frame pairs in batches
    for start in range(0, num_pairs, batch_size):
        stop = min(start + batch_size, num_pairs)

        # Pair frame k with frame k + 1 for every k in [start, stop)
        batch_prev_tensor = torch.cat([preprocess_frame(f, device) for f in vid[start:stop]], dim=0)
        batch_curr_tensor = torch.cat([preprocess_frame(f, device) for f in vid[start + 1:stop + 1]], dim=0)

        # Predict optical flow between the previous and current frames
        with torch.no_grad():
            flow_list = model(batch_prev_tensor, batch_curr_tensor)

        # The model returns one flow estimate per refinement iteration; keep the final one
        predicted_flows = flow_list[-1]  # (batch, 2, H, W)

        # Convert each flow field separately so every frame is colour-normalized on its own
        for flow in predicted_flows:
            flow_image = flow_to_image(flow).cpu().numpy().transpose(1, 2, 0)  # (H, W, 3)
            flow_image_bgr = cv2.cvtColor(
                np.ascontiguousarray(flow_image, dtype=np.uint8), cv2.COLOR_RGB2BGR
            )  # Convert to BGR for OpenCV

            if out is None:
                height, width = flow_image_bgr.shape[:2]
                out = cv2.VideoWriter('output_flow2.avi', fourcc, fps, (width, height))
                if not out.isOpened():
                    raise RuntimeError("Could not open 'output_flow2.avi' for writing")

            # Save the flow image as part of the output video
            out.write(flow_image_bgr)
finally:
    # Release the video writer even if inference fails part-way, so the file is finalized
    if out is not None:
        out.release()
    cv2.destroyAllWindows()

In [14]:
import av

def compress_video(input_file, output_file, target_bitrate=500000):
    """Re-encode the first video stream of a file as H.264 at a target bitrate.

    The output keeps the source resolution and frame rate; only the codec and
    bitrate change.

    Args:
        input_file: Path of the video to read.
        output_file: Path of the compressed video to write; the container
            format is chosen by PyAV from the file extension.
        target_bitrate: Desired video bitrate in bits per second.
    """
    # Context managers close both containers even if decoding or encoding fails
    with av.open(input_file) as input_container, av.open(output_file, mode='w') as output_container:
        input_stream = input_container.streams.video[0]

        # Mirror the source frame rate instead of assuming 30 fps
        frame_rate = input_stream.average_rate or 30
        output_stream = output_container.add_stream('h264', rate=frame_rate)

        # A new PyAV stream defaults to 640x480; without these lines every frame
        # would be silently rescaled to that size during encoding.
        output_stream.width = input_stream.codec_context.width
        output_stream.height = input_stream.codec_context.height
        output_stream.pix_fmt = 'yuv420p'

        # Set the bitrate for compression
        output_stream.bit_rate = target_bitrate  # Bitrate in bits per second

        for frame in input_container.decode(video=0):
            # Re-encode the frame; the encoder may emit zero or more packets per frame
            for packet in output_stream.encode(frame):
                output_container.mux(packet)

        # Flush any packets still buffered inside the encoder
        for packet in output_stream.encode():
            output_container.mux(packet)

    print(f"Video compressed successfully to: {output_file}")

# Example usage: compress the flow video written by the batched RAFT cell.
# That cell writes 'output_flow2.avi', so read that file rather than a '.mp4'
# that a fresh Restart & Run All never creates.
input_video = 'output_flow2.avi'
output_video = 'output_flow_compressed2.mp4'

compress_video(input_video, output_video, target_bitrate=1500000)

Video compressed successfully to: output_flow_compressed2.mp4
