In [1]:
import cv2
import os

# --- Input video path ---
video_path = "fingering__expt_video (1).mp4"

# --- Output directory for frames ---
output_dir = "frames_5fps"
os.makedirs(output_dir, exist_ok=True)

# --- Open the video ---
cap = cv2.VideoCapture(video_path)

# Get original FPS of video
fps = cap.get(cv2.CAP_PROP_FPS)
print(f"Original FPS: {fps}")

# Define extraction rate (5 fps)
target_fps = 5
frame_interval = int(fps /target_fps)  # pick every Nth frame

frame_count = 0
saved_count = 0

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Save every Nth frame
    if frame_count % frame_interval == 0:
        frame_name = os.path.join(output_dir, f"frame_{saved_count:04d}.jpg")
        cv2.imwrite(frame_name, frame)
        saved_count += 1

    frame_count += 1

cap.release()
print(f"✅ Done! Extracted {saved_count} frames at {target_fps} fps.")


Original FPS: 23.803304687336972
✅ Done! Extracted 207 frames at 5 fps.


In [2]:
import cv2
import os
import numpy as np

# --- Configuration ---
# Path to the folder containing the original frames
input_folder = 'frames_5fps'

# Path to the folder where processed frames will be saved
output_folder = 'output'

# --- ADJUSTED OUTPUT SIZE ---
# Increased the final dimensions to 512x512 for a larger, more detailed image.
IMG_WIDTH = 512
IMG_HEIGHT = 512

# --- Preprocessing Script ---

# 1. Create the output directory if it doesn't already exist
if not os.path.exists(output_folder):
    os.makedirs(output_folder)
    print(f"Created directory: {output_folder}")

# 2. Get a sorted list of all frame filenames from the input folder
try:
    files = sorted(os.listdir(input_folder))
except FileNotFoundError:
    print(f"Error: The input folder '{input_folder}' was not found.")
    print("Please make sure this script is in the same directory as your frames folder.")
    exit()

print(f"Starting LARGER preprocessing ({IMG_WIDTH}x{IMG_HEIGHT}) for {len(files)} frames...")

# 3. Loop through each file in the directory
for filename in files:
    # Ensure we're only processing image files
    if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
        # Construct the full path to the input image
        img_path = os.path.join(input_folder, filename)
        
        # Read the image from the file in color
        img = cv2.imread(img_path)
        if img is None:
            print(f"Warning: Could not read image {filename}. Skipping.")
            continue

        # --- Preprocessing Steps ---

        # a) Crop the image to capture the full circular dish
        height, width, _ = img.shape
        center_x, center_y = width // 2, height // 2
        crop_size = min(height, width, 950) 
        
        start_x = max(center_x - crop_size // 2, 0)
        end_x = min(center_x + crop_size // 2, width)
        start_y = max(center_y - crop_size // 2, 0)
        end_y = min(center_y + crop_size // 2, height)
        
        cropped_img = img[start_y:end_y, start_x:end_x]

        # b) Apply Denoising on the color image
        denoised_img = cv2.medianBlur(cropped_img, 5)

        # c) Resize to the new, larger standard dimensions
        resized_img = cv2.resize(denoised_img, (IMG_WIDTH, IMG_HEIGHT))

        # d) Save the final processed color image to the output folder
        output_path = os.path.join(output_folder, filename)
        cv2.imwrite(output_path, resized_img)

print("\nLarge-format preprocessing complete!")
print(f"All processed color frames have been saved as {IMG_WIDTH}x{IMG_HEIGHT} images in the '{output_folder}' folder.")



Starting LARGER preprocessing (512x512) for 207 frames...

Large-format preprocessing complete!
All processed color frames have been saved as 512x512 images in the 'output' folder.


In [3]:
%pip install tensorflow

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.1.2 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [6]:
import cv2
import numpy as np
import os

# Input and output directories
input_folder = "output"
mask_folder = "masks"
seg_folder = "segmented"

# Create folders if they don’t exist
os.makedirs(mask_folder, exist_ok=True)
os.makedirs(seg_folder, exist_ok=True)

for file_name in sorted(os.listdir(input_folder)):
    if file_name.endswith(".jpg") or file_name.endswith(".png"):
        file_path = os.path.join(input_folder, file_name)

        # Read image
        img = cv2.imread(file_path)

        # Convert to HSV
        hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

        # Define HSV range for reddish/pink fingers
        lower = np.array([130, 50, 50])   # adjust if needed
        upper = np.array([179, 255, 255]) # upper red/pink

        # Create mask
        mask = cv2.inRange(hsv, lower, upper)

        # Clean mask
        kernel = np.ones((3,3), np.uint8)
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel, iterations=2)
        mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel, iterations=1)

        # Apply mask
        segmented = cv2.bitwise_and(img, img, mask=mask)

        # Save mask (binary image)
        mask_path = os.path.join(mask_folder, f"mask_{file_name}")
        cv2.imwrite(mask_path, mask)

        # Save segmented image
        seg_path = os.path.join(seg_folder, f"seg_{file_name}")
        cv2.imwrite(seg_path, segmented)

print("✅ All masks saved in 'masks/' and segmented frames in 'segmented/'")


✅ All masks saved in 'masks/' and segmented frames in 'segmented/'


In [1]:
# ==============================
# PredRNN-lite Training Script
# ==============================

import os, glob, numpy as np
from PIL import Image
import torch, torch.nn as nn, torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision.utils import save_image

# ------------------------------
# CONFIG
# ------------------------------
DATA_ROOT = "segmented" # <-- change to "masks" if needed
IMG_SIZE = 128
SEQ_IN = 8
PRED_STEPS = 1
TRAIN_FRAC = 0.8
EPOCHS = 20
BATCH_SIZE = 8
LR = 1e-3
SAVE_DIR = "predrnn_outputs"

os.makedirs(SAVE_DIR, exist_ok=True)

# ------------------------------
# UTILITIES
# ------------------------------
def list_images(folder):
    exts = ("*.png","*.jpg","*.jpeg","*.bmp","*.tif","*.tiff")
    files = []
    for e in exts:
        files += glob.glob(os.path.join(folder, e))
    return sorted(files)

def to_tensor(img):
    arr = np.asarray(img).astype(np.float32) / 255.0
    if arr.ndim == 2:
        arr = arr[..., None]
    arr = np.transpose(arr, (2,0,1))  # HWC -> CHW
    return torch.from_numpy(arr)

def psnr(pred, target, eps=1e-8):
    mse = torch.mean((pred - target) ** 2)
    if mse.item() == 0:
        return torch.tensor(99.0)
    return 20.0 * torch.log10(1.0 / torch.sqrt(mse + eps))

def ssim(pred, target, C1=0.01**2, C2=0.03**2):
    mu_x = pred.mean(dim=(-2,-1), keepdim=True)
    mu_y = target.mean(dim=(-2,-1), keepdim=True)
    sigma_x = ((pred - mu_x)**2).mean(dim=(-2,-1), keepdim=True)
    sigma_y = ((target - mu_y)**2).mean(dim=(-2,-1), keepdim=True)
    sigma_xy = ((pred - mu_x)*(target - mu_y)).mean(dim=(-2,-1), keepdim=True)
    num = (2*mu_x*mu_y + C1) * (2*sigma_xy + C2)
    den = (mu_x**2 + mu_y**2 + C1) * (sigma_x + sigma_y + C2)
    return (num / (den + 1e-8)).mean()

def binarize(x, thresh=0.5):
    return (x >= thresh).float()

def iou_and_dice(pred, target):
    inter = (pred * target).sum()
    union = ((pred + target) > 0).float().sum()
    iou = (inter / (union + 1e-8)).item()
    dice = (2*inter / (pred.sum()+target.sum()+1e-8)).item()
    return iou, dice

# ------------------------------
# DATASET
# ------------------------------
class FrameSeqDataset(Dataset):
    def __init__(self, folder, img_size=128, seq_in=8, pred_steps=1, split="train", train_frac=0.8):
        files = list_images(folder)
        if len(files) < (seq_in + pred_steps + 1):
            raise RuntimeError("Not enough frames found.")

        # split by time
        n = len(files)
        split_idx = int(n * train_frac)
        if split == "train":
            files = files[:split_idx]
        else:
            files = files[split_idx:]

        self.seq_in, self.pred_steps, self.size = seq_in, pred_steps, img_size
        self.files = files
        self.is_gray = Image.open(files[0]).convert("RGB").mode != "RGB"  # auto-detect

        # windows
        self.windows = []
        for i in range(0, len(files) - (seq_in + pred_steps) + 1):
            idx = list(range(i, i + seq_in + pred_steps))
            self.windows.append(idx)

    def __len__(self): return len(self.windows)

    def __getitem__(self, i):
        idxs = self.windows[i]
        imgs = []
        for j in idxs:
            img = Image.open(self.files[j])
            if img.mode != "RGB":
                img = img.convert("L")
            else:
                img = img.convert("RGB")
            img = img.resize((self.size, self.size), Image.BILINEAR)
            imgs.append(to_tensor(img))
        imgs = torch.stack(imgs, dim=0)  # T,C,H,W
        return imgs[:self.seq_in], imgs[self.seq_in:self.seq_in+self.pred_steps]

# ------------------------------
# MODEL
# ------------------------------
class ConvLSTMCell(nn.Module):
    def __init__(self, in_channels, hidden_channels, kernel_size=3):
        super().__init__()
        padding = kernel_size // 2
        self.conv = nn.Conv2d(in_channels + hidden_channels, 4*hidden_channels, kernel_size, padding=padding)
        self.hidden_channels = hidden_channels

    def forward(self, x, h, c):
        combined = torch.cat([x, h], dim=1)
        gates = self.conv(combined)
        i, f, o, g = torch.chunk(gates, 4, dim=1)
        i = torch.sigmoid(i); f = torch.sigmoid(f); o = torch.sigmoid(o); g = torch.tanh(g)
        c_next = f*c + i*g
        h_next = o * torch.tanh(c_next)
        return h_next, c_next

class ConvLSTM(nn.Module):
    def __init__(self, in_ch, hidden_chs=[64,64], kernel_size=3):
        super().__init__()
        self.layers = nn.ModuleList()
        chs = [in_ch] + hidden_chs
        for i in range(len(hidden_chs)):
            self.layers.append(ConvLSTMCell(chs[i], hidden_chs[i], kernel_size))

    def forward(self, x_seq):
        B, C, H, W = x_seq.shape[1:]
        hs = [torch.zeros(B, cell.hidden_channels, H, W, device=x_seq.device) for cell in self.layers]
        cs = [torch.zeros_like(h) for h in hs]
        outputs = []
        for t in range(x_seq.shape[0]):
            inp = x_seq[t]
            for li, cell in enumerate(self.layers):
                h, c = hs[li], cs[li]
                h, c = cell(inp, h, c)
                hs[li], cs[li] = h, c
                inp = h
            outputs.append(inp)
        return outputs

class PredRNNLite(nn.Module):
    def __init__(self, in_ch):
        super().__init__()
        self.enc = ConvLSTM(in_ch, hidden_chs=[64,64])
        self.dec = ConvLSTM(in_ch, hidden_chs=[64,64])
        self.readout = nn.Conv2d(64, in_ch, 1)

    def forward(self, x_seq, pred_steps=1):
        enc_outs = self.enc(x_seq)
        prev = x_seq[-1]
        outs = []
        for _ in range(pred_steps):
            dec_in = torch.stack([prev], dim=0)
            dec_outs = self.dec(dec_in)
            y = torch.sigmoid(self.readout(dec_outs[-1]))
            outs.append(y)
            prev = y
        return torch.stack(outs, dim=0)

# ------------------------------
# TRAINING
# ------------------------------
device = "cuda" if torch.cuda.is_available() else "cpu"

# auto-detect channels
test_img = Image.open(list_images(DATA_ROOT)[0])
C_in = 1 if test_img.mode != "RGB" else 3

dtrain = FrameSeqDataset(DATA_ROOT, IMG_SIZE, SEQ_IN, PRED_STEPS, split="train", train_frac=TRAIN_FRAC)
dtest  = FrameSeqDataset(DATA_ROOT, IMG_SIZE, SEQ_IN, PRED_STEPS, split="test",  train_frac=TRAIN_FRAC)
train_loader = DataLoader(dtrain, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
test_loader  = DataLoader(dtest,  batch_size=1, shuffle=False)

model = PredRNNLite(in_ch=C_in).to(device)
opt = torch.optim.Adam(model.parameters(), lr=LR)

def train_epoch():
    model.train()
    total = 0.0
    for x,y in train_loader:
        x,y = x.to(device), y.to(device)
        opt.zero_grad()
        yhat = model(x, pred_steps=PRED_STEPS)
        l1 = F.l1_loss(yhat, y)
        ssim_loss = 1.0 - torch.mean(torch.stack([ssim(yhat[0][i], y[0][i]) for i in range(y.shape[1])]))
        loss = 0.7*l1 + 0.3*ssim_loss
        loss.backward()
        opt.step()
        total += loss.item()
    return total/len(train_loader)

best_ssim = -1
for epoch in range(1, EPOCHS+1):
    tr_loss = train_epoch()
    model.eval()
    with torch.no_grad():
        ssim_list, psnr_list, iou_list, dice_list = [], [], [], []
        for x,y in test_loader:
            x,y = x.to(device), y.to(device)
            yhat = model(x, pred_steps=PRED_STEPS)
            yh, yt = yhat[0,0], y[0,0]
            s = ssim(yh, yt).item(); p = psnr(yh, yt).item()
            ssim_list.append(s); psnr_list.append(p)
            # IoU/Dice only if grayscale
            yh_g = torch.mean(yh, dim=0, keepdim=True) if C_in==3 else yh
            yt_g = torch.mean(yt, dim=0, keepdim=True) if C_in==3 else yt
            yh_b, yt_b = binarize(yh_g, 0.5), binarize(yt_g, 0.5)
            iou,dice = iou_and_dice(yh_b, yt_b)
            iou_list.append(iou); dice_list.append(dice)
        m_ssim, m_psnr = np.mean(ssim_list), np.mean(psnr_list)
        m_iou, m_dice = np.mean(iou_list), np.mean(dice_list)
    if m_ssim > best_ssim:
        best_ssim = m_ssim
        torch.save(model.state_dict(), os.path.join(SAVE_DIR, "best_predrnn.pt"))
    print(f"Epoch {epoch:02d} | loss {tr_loss:.4f} | SSIM {m_ssim:.4f} | PSNR {m_psnr:.2f} | IoU {m_iou:.3f} | Dice {m_dice:.3f}")

# ------------------------------
# SAVE SAMPLES
# ------------------------------
os.makedirs(os.path.join(SAVE_DIR,"samples"), exist_ok=True)
model.load_state_dict(torch.load(os.path.join(SAVE_DIR,"best_predrnn.pt"), map_location=device))
model.eval()
with torch.no_grad():
    for k,(x,y) in enumerate(test_loader):
        x,y = x.to(device), y.to(device)
        yhat = model(x, pred_steps=PRED_STEPS)


  l1 = F.l1_loss(yhat, y)


Epoch 01 | loss 0.6032 | SSIM 0.1417 | PSNR 7.50 | IoU 0.000 | Dice 0.000


KeyboardInterrupt: 