In [1]:
import cv2
import os
import numpy as np

def convert_to_frames_array(video_path, output_folder):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video file {video_path}")
        return None
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    frames = np.zeros((frame_count, height, width, 3), dtype=np.uint8) 
    print(f"Total frames: {frame_count}, Width: {width}, Height: {height}")
    if output_folder is not None:
      os.makedirs(output_folder, exist_ok=True)
    i = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames[i] = frame 
        if output_folder:
            cv2.imwrite(os.path.join(output_folder, f"frame_{i:04d}.png"), frame)
        i += 1
        if i % 100 == 0:
            print(f"Processed {i} frames")

    cap.release()
    print("Video converted to frames array successfully!")
    return frames, fps

video_path = 'MyRecording_001.mp4'
output_folder = 'frames'
frames_array, fps = convert_to_frames_array(video_path, output_folder)

Total frames: 451, Width: 1920, Height: 1080
Processed 100 frames
Processed 200 frames
Processed 300 frames
Processed 400 frames
Video converted to frames array successfully!


In [2]:
def rgb_to_gray(rgb_frame):
    red = rgb_frame[:, :, 2] 
    green = rgb_frame[:, :, 1] 
    blue = rgb_frame[:, :, 0]
    gray = (0.2989 * red) + (0.5870 * green) + (0.1140 * blue)
    gray = gray.astype(np.uint8) 
    return gray

frames_C = 1
if frames_array is not None:
    print("Converting to Gray frames")
    gray_frames = []
    for rgb_frame in frames_array:
        frames_C +=1
        if frames_C % 100 == 0:
            print(f"Processed {frames_C} frames")
        gray_frame = rgb_to_gray(rgb_frame)
        gray_frames.append(gray_frame)
    gray_frames = np.array(gray_frames)
    print("Shape of gray_frames:", gray_frames.shape)


Converting to Gray frames
Processed 100 frames
Processed 200 frames
Processed 300 frames
Processed 400 frames
Shape of gray_frames: (451, 1080, 1920)


In [3]:
def frame_diff(frames):
    diff = []
    for i in range(len(frames)-1):
        diff.append(abs(frames[i]-frames[i+1]))
        if(i%100 ==0):
            print(f"Processed {i} frames")

    return diff

Diff = np.array(frame_diff(gray_frames))
print(Diff.shape)

Processed 0 frames
Processed 100 frames
Processed 200 frames
Processed 300 frames
Processed 400 frames
(450, 1080, 1920)


In [4]:
for i, diff_frame in enumerate(Diff):
    output_filename = os.path.join("diffFrames", f"diff_frame_{i:04d}.png") # or .jpg, .bmp, etc.
    cv2.imwrite(output_filename, diff_frame)

In [5]:
import numpy as np

def save_video(frames, output_path, fps=30):
    if len(frames) == 0:
        print("No frames to write.")
        return

    height, width = frames[0].size[1], frames[0].size[0]
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    for frame in frames:
        frame_np = np.array(frame)  # PIL to numpy array
        if frame_np.shape[2] == 4:  # Remove alpha if present
            frame_np = frame_np[:, :, :3]
        out.write(frame_np)  # Convert to BGR

    out.release()
    print(f"✅ Output video saved at: {output_path}")

In [6]:
image_folder =r'diffFrames'

images = [img for img in os.listdir(image_folder) if img.endswith(".png")]
images.sort()
if not images:
    print("No PNG images found in the folder.")
    exit()

first_image = cv2.imread(os.path.join(image_folder, images[0]))
height, width, layers = first_image.shape

output_filename = os.path.join(image_folder, 'output.mp4')
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Use mp4v codec for .mp4 format

out = cv2.VideoWriter(output_filename, fourcc, 24.0, (width, height))
for image in images:
    img_path = os.path.join(image_folder, image)
    frame = cv2.imread(img_path)
    out.write(frame)

out.release()
cv2.destroyAllWindows()

print(f"Video '{output_filename}' created successfully.")

Video 'diffFrames\output.mp4' created successfully.


In [7]:
def euclidean_dist(p1, p2):
    return np.linalg.norm(np.array(p1) - np.array(p2))


In [8]:
from PIL import Image, ImageDraw
from scipy.ndimage import label, find_objects
import numpy as np

def track_persistent_blobs(frames, diff_frames, min_area=20, min_duration=7, max_dist=25, max_area_diff=0.3):
    blob_tracks = {}  # {id: [ (frame_idx, centroid, area) ]}
    next_id = 0

    prev_blobs = []

    for i in range(len(diff_frames)):
        labeled_array, _ = label(diff_frames[i])
        slices = find_objects(labeled_array)

        current_blobs = []

        for sl in slices:
            y0, x0 = sl[0].start, sl[1].start
            y1, x1 = sl[0].stop,  sl[1].stop

            area = (x1 - x0) * (y1 - y0)
            if area < min_area:
                continue

            centroid = ((y0 + y1) / 2, (x0 + x1) / 2)
            matched = False

            # Try matching with prev frame blobs
            for blob_id, last_blob in prev_blobs:
                dist = euclidean_dist(centroid, last_blob['centroid'])
                area_ratio = abs(area - last_blob['area']) / last_blob['area']

                if dist < max_dist and area_ratio < max_area_diff:
                    blob_tracks[blob_id].append((i, centroid, area))
                    current_blobs.append((blob_id, {"centroid": centroid, "area": area}))
                    matched = True
                    break

            if not matched:
                # New blob
                blob_tracks[next_id] = [(i, centroid, area)]
                current_blobs.append((next_id, {"centroid": centroid, "area": area}))
                next_id += 1

        prev_blobs = current_blobs

    # Filter blobs with enough duration
    persistent_ids = [blob_id for blob_id, track in blob_tracks.items() if len(track) >= min_duration]

    # Draw these on frames
    output_frames = []
    for i in range(len(diff_frames)):
        base = Image.fromarray(frames[i + 1])  # skip first
        draw = ImageDraw.Draw(base)

        for blob_id in persistent_ids:
            track = [step for step in blob_tracks[blob_id] if step[0] == i]
            if track:
                _, centroid, _ = track[0]
                y, x = centroid
                r = 12
                draw.ellipse([x - r, y - r, x + r, y + r], outline=(255, 0, 0), width=2)
                draw.text((x + r + 2, y - r), f"ID {blob_id}", fill=(255, 0, 0))

        output_frames.append(base)

    return output_frames


In [9]:
output_frames = track_persistent_blobs(frames_array, Diff)
save_video(output_frames, "persistent_objects.mp4", fps=30)


✅ Output video saved at: persistent_objects.mp4


In [10]:
from scipy.ndimage import label, find_objects
from collections import defaultdict
import numpy as np

def compute_centroid(sl):
    y0, x0 = sl[0].start, sl[1].start
    y1, x1 = sl[0].stop,  sl[1].stop
    return ((x0 + x1) / 2, (y0 + y1) / 2), (x1 - x0) * (y1 - y0)

def track_objects_across_frames(diff_frames, max_dist=30, size_tol=0.4, min_frames=7):
    object_tracks = defaultdict(list)  # id -> list of (frame_index, (x,y), area)
    next_id = 0

    prev_objects = []

    for frame_idx, diff in enumerate(diff_frames):
        labeled_array, _ = label(diff)
        slices = find_objects(labeled_array)

        curr_objects = []
        for sl in slices:
            centroid, area = compute_centroid(sl)
            curr_objects.append((centroid, area))

        assigned = set()
        for i, (centroid, area) in enumerate(curr_objects):
            matched = False
            for obj_id, track in object_tracks.items():
                if len(track) == 0: continue
                prev_frame_idx, prev_pos, prev_area = track[-1]
                if frame_idx - prev_frame_idx > 2:  # Too long ago
                    continue
                dist = np.linalg.norm(np.array(prev_pos) - np.array(centroid))
                area_diff = abs(prev_area - area) / prev_area
                if dist < max_dist and area_diff < size_tol:
                    track.append((frame_idx, centroid, area))
                    matched = True
                    assigned.add(i)
                    break

            if not matched and i not in assigned:
                object_tracks[next_id].append((frame_idx, centroid, area))
                next_id += 1

        prev_objects = curr_objects

    # Filter objects seen in enough frames
    filtered_tracks = {k: v for k, v in object_tracks.items() if len(v) >= min_frames}
    return filtered_tracks


In [11]:
def compute_velocities(tracks):
    velocities = {}
    predictions = {}

    for obj_id, entries in tracks.items():
        positions = np.array([pos for _, pos, _ in entries])
        frames = np.array([f for f, _, _ in entries])

        if len(positions) < 2:
            velocities[obj_id] = (0, 0)
            continue

        dx = np.diff(positions[:, 0])
        dy = np.diff(positions[:, 1])
        dt = np.diff(frames)

        vx = np.mean(dx / dt)
        vy = np.mean(dy / dt)
        velocities[obj_id] = (vx, vy)

        # Predict next position
        last_frame, last_pos, _ = entries[-1]
        pred_x = last_pos[0] + vx
        pred_y = last_pos[1] + vy
        predictions[obj_id] = (pred_x, pred_y)

    return velocities, predictions


In [None]:
tracks = track_objects_across_frames(Diff)
velocities, predictions = compute_velocities(tracks)

for obj_id, vel in velocities.items():
    print(f"Asteroid {obj_id}: Average velocity = {vel}")
