In [3]:
import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import classification_report, accuracy_score
import pandas as pd
from IPython.display import display
import os

# =====================CONFIG======================
STEPS_PER_REP = 4
NUM_REPS = 3
FRAME_SKIP = 10
LSTM_HIDDEN = 128
NUM_CLASSES = 4  # step1-step4
THRESHOLD = 0.85
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# -----------------Transform---------------------
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

# -----------------Load CNN----------------------
cnn = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
cnn = nn.Sequential(*list(cnn.children())[:-1])  # remove final FC
cnn.eval().to(device)

# ----------------- Feature extraction -----------
def extract_segment_features(video_path):
    cap = cv2.VideoCapture(video_path)
    frames = []
    count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret: break
        if count % FRAME_SKIP == 0:
            img = transform(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)).unsqueeze(0).to(device)
            with torch.no_grad():
                feat = cnn(img).squeeze().cpu().numpy()
            frames.append(feat)
        count += 1
    cap.release()

    frames = np.array(frames)
    total_segments = NUM_REPS * STEPS_PER_REP
    segment_len = max(len(frames) // total_segments, 1)
    segments = []
    for i in range(total_segments):
        start = i * segment_len
        end = start + segment_len
        seg = frames[start:end]
        if len(seg) == 0:
            seg = frames[-1:].copy()  # repeat last frame if empty
        segments.append(seg)
    return segments

# -----------------LSTM Classifier-----------------
class LSTMClassifier(nn.Module):
    def __init__(self, input_size=512, hidden_size=LSTM_HIDDEN, num_classes=NUM_CLASSES):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        _, (h_n, _) = self.lstm(x)
        out = self.fc(h_n.squeeze(0))
        return out

# ----------------- Load trained LSTM -----------------
lstm_model = LSTMClassifier(input_size=512, hidden_size=LSTM_HIDDEN, num_classes=NUM_CLASSES).to(device)
lstm_path = r"C:\Users\sinch\ai\lstm_model.pth"

if os.path.exists(lstm_path):
    checkpoint = torch.load(lstm_path, map_location=device)
    # Load LSTM weights only
    filtered_dict = {k: v for k, v in checkpoint.items() if k.startswith("lstm.")}
    lstm_model.load_state_dict(filtered_dict, strict=False)
    lstm_model.eval()
    print("✅ Trained LSTM weights loaded successfully")
else:
    print("⚠️ LSTM weights not found — using random initialization")


✅ Trained LSTM weights loaded successfully


In [7]:
# ----------------- Encode video with LSTM -------
def encode_segments_with_lstm(segments):
    segment_features = []
    predicted_labels = []

    for seg in segments:
        seg = np.array(seg)
        seg_avg = seg.mean(axis=0)
        inp = torch.tensor(seg_avg, dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(device)
        with torch.no_grad():
            _, (h_n, _) = lstm_model.lstm(inp)
            feat = h_n.squeeze(0).cpu().numpy().reshape(-1)
            out = lstm_model.fc(torch.tensor(h_n.squeeze(0)).to(device))
            pred = torch.argmax(out, dim=1).item()
        segment_features.append(feat)
        predicted_labels.append(pred + 1)
    return np.array(segment_features), predicted_labels

# ----------------- Sequence & Step analysis -----------------
def check_sequence_order(predicted_labels):
    correct_order = []
    total_reps = len(predicted_labels) // STEPS_PER_REP
    for rep in range(total_reps):
        start = rep * STEPS_PER_REP
        seq = predicted_labels[start:start + STEPS_PER_REP]
        correct_order.append(seq == list(range(1, STEPS_PER_REP + 1)))
    return correct_order

def step_similarity_analysis(features_array):
    similarities = []
    for step in range(STEPS_PER_REP):
        idxs = [step + rep * STEPS_PER_REP for rep in range(NUM_REPS)]
        feats = features_array[idxs].reshape(len(idxs), -1)
        sims = cosine_similarity(feats)
        similarities.append(sims)
    return np.array(similarities)

def classify_steps(features_array, threshold=THRESHOLD):
    step_status = {}
    ground_truth = {}
    predictions = {}
    for step in range(STEPS_PER_REP):
        idxs = [step + rep * STEPS_PER_REP for rep in range(NUM_REPS)]
        feats = features_array[idxs].reshape(len(idxs), -1)
        sims = cosine_similarity(feats)
        avg_sim = np.mean([sims[i, j] for i in range(NUM_REPS) for j in range(i + 1, NUM_REPS)])
        step_status[step + 1] = 'Correct' if avg_sim >= threshold else 'Incorrect'
        ground_truth[step + 1] = 'Correct'
        predictions[step + 1] = step_status[step + 1]
    return step_status, ground_truth, predictions

# ----------------- Build reference from multiple videos -----------------
def build_reference_from_multiple(videos, save_path="reference_steps.npy"):
    all_refs = []
    for video_path in videos:
        segments = extract_segment_features(video_path)
        ref_feats, _ = encode_segments_with_lstm(segments)
        all_refs.append(ref_feats)
    all_refs = np.stack(all_refs)
    refs = []
    for step in range(STEPS_PER_REP):
        step_vecs = []
        for vid in range(all_refs.shape[0]):
            idxs = [step + rep * STEPS_PER_REP for rep in range(NUM_REPS)]
            step_vecs.extend(all_refs[vid, idxs])
        refs.append(np.mean(step_vecs, axis=0))
    reference_steps = np.array(refs)
    np.save(save_path, reference_steps)
    print(f"✅ Built reference from {len(videos)} videos -> {save_path}")
    return reference_steps

def build_reference_sequences(videos, save_path="reference_sequences.npy"):
    all_refs = []
    for video_path in videos:
        segments = extract_segment_features(video_path)
        seq = np.concatenate(segments, axis=0)
        inp = torch.tensor(seq, dtype=torch.float32).unsqueeze(0).to(device)
        with torch.no_grad():
            seq_feat, _ = lstm_model.lstm(inp)
            seq_feat = seq_feat.squeeze(0).cpu().numpy().mean(axis=0)  # average over sequence
        all_refs.append(seq_feat)
    all_refs = np.array(all_refs)
    np.save(save_path, all_refs)
    print(f"✅ Built sequence reference from {len(videos)} videos -> {save_path}")
    return all_refs

# ----------------- Example videos -----------------
videos = [
    r"C:\Users\sinch\ai\goldstandard1.mp4",
    r"C:\Users\sinch\ai\goldstandard2.mp4",
    r"C:\Users\sinch\ai\goldstandard3.mp4",
    r"C:\Users\sinch\ai\goldstandard4.mp4",
    r"C:\Users\sinch\ai\testvideo.mp4"
]

reference_steps = build_reference_from_multiple(videos)
reference_sequences = build_reference_sequences(videos)


  out = lstm_model.fc(torch.tensor(h_n.squeeze(0)).to(device))


✅ Built reference from 5 videos -> reference_steps.npy
✅ Built sequence reference from 5 videos -> reference_sequences.npy


In [15]:
import torch
# Save weights/model 
torch.save(lstm_model.state_dict(), r"C:\Users\sinch\ai\lstm_model.pth")
torch.save(cnn.state_dict(), r"C:\Users\sinch\ai\cnn_model.pth")

In [7]:
# seq
import cv2
import numpy as np
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from sklearn.metrics.pairwise import cosine_similarity

# ================= CONFIG ==============
FRAME_SKIP = 5          # sample every 5 frames
SEQ_LEN = 12            # number of frames per sequence
THRESHOLD = 0.75       # cosine similarity threshold
NUM_STEPS = 4           # total steps in the sequence

# ---------------- Device ----------------
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# ---------------- CNN Model ----------------
cnn_model = models.resnet18(pretrained=True)
cnn_model = torch.nn.Sequential(*list(cnn_model.children())[:-1])  # remove FC
cnn_model.eval().to(device)

# ---------------- LSTM Feature Extractor ----------------
class LSTMFeatureExtractor(torch.nn.Module):
    def __init__(self, input_size=512, hidden_size=128, num_layers=1):
        super().__init__()
        self.lstm = torch.nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

    def forward(self, x):
        _, (h_n, _) = self.lstm(x)
        return h_n.squeeze(0)

# Load LSTM weights
lstm_model = LSTMFeatureExtractor(input_size=512, hidden_size=128).to(device)
checkpoint = torch.load(r"C:\Users\sinch\ai\lstm_model.pth", map_location=device)
filtered_dict = {k: v for k, v in checkpoint.items() if not k.startswith("fc.")}
lstm_model.load_state_dict(filtered_dict, strict=False)
lstm_model.eval()
print("✅ LSTM model loaded successfully")

# ---------------- Reference Features ----------------
reference_features = np.load("reference_sequences.npy")  # shape: [num_steps, hidden_size]

# ---------------- Transform ----------------
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224,224)),
    transforms.ToTensor(),
])

# ---------------- Live Loop ----------------
url = r"rtsp://admin:admin%40123@192.168.0.171/cam/realmonitor?channel=1&subtype=0"
cap = cv2.VideoCapture(url, cv2.CAP_FFMPEG)
frame_buffer = []
frame_count = 0
current_step = 0  # index of current step (0..NUM_STEPS-1)
detected_steps = []

while True:
    ret, frame = cap.read()
    if not ret:
        break

    frame_count += 1
    if frame_count % FRAME_SKIP != 0:
        cv2.imshow("Live", frame)
        if cv2.waitKey(1) & 0xFF == 27:
            break
        continue

    # Preprocess frame
    img = transform(frame).unsqueeze(0).to(device)

    # CNN feature extraction
    with torch.no_grad():
        cnn_feat = cnn_model(img).squeeze().cpu().numpy()
    frame_buffer.append(cnn_feat)

    # Run LSTM when buffer is full
    if len(frame_buffer) >= SEQ_LEN:
        seq_input = torch.tensor(np.array(frame_buffer[-SEQ_LEN:]), dtype=torch.float32).unsqueeze(0).to(device)
        with torch.no_grad():
            live_feat = lstm_model(seq_input).cpu().numpy().reshape(-1)

        # Cosine similarity with reference steps
        sims = cosine_similarity(live_feat.reshape(1,-1), reference_features)
        best_step = sims.argmax()
        max_sim = sims.max()

        # Determine status
        if max_sim >= THRESHOLD:
            if best_step == current_step:
                status = f"Step {best_step+1} ✅"
                if best_step not in detected_steps:
                    detected_steps.append(best_step)
                current_step += 1 if current_step < NUM_STEPS-1 else 0
            else:
                status = f"❌ Incorrect sequence! Missed Step {current_step+1}"
        else:
            status = f"❌ Step unclear / sequence skipped"

        color = (0,255,0) if "✅" in status else (0,0,255)
        cv2.putText(frame, status, (50,50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)

        print(f"[DEBUG] Detected step: {best_step+1}, Max similarity: {max_sim:.3f}, Status: {status}")


    cv2.imshow("Live", frame)
    if cv2.waitKey(1) & 0xFF == 27:
        break

cap.release()
cv2.destroyAllWindows()




✅ LSTM model loaded successfully
[DEBUG] Detected step: 5, Max similarity: 0.774, Status: ❌ Incorrect sequence! Missed Step 1
[DEBUG] Detected step: 5, Max similarity: 0.775, Status: ❌ Incorrect sequence! Missed Step 1
[DEBUG] Detected step: 5, Max similarity: 0.776, Status: ❌ Incorrect sequence! Missed Step 1
[DEBUG] Detected step: 5, Max similarity: 0.778, Status: ❌ Incorrect sequence! Missed Step 1
[DEBUG] Detected step: 5, Max similarity: 0.779, Status: ❌ Incorrect sequence! Missed Step 1
[DEBUG] Detected step: 5, Max similarity: 0.781, Status: ❌ Incorrect sequence! Missed Step 1
[DEBUG] Detected step: 5, Max similarity: 0.783, Status: ❌ Incorrect sequence! Missed Step 1
[DEBUG] Detected step: 5, Max similarity: 0.785, Status: ❌ Incorrect sequence! Missed Step 1
[DEBUG] Detected step: 5, Max similarity: 0.787, Status: ❌ Incorrect sequence! Missed Step 1
[DEBUG] Detected step: 5, Max similarity: 0.789, Status: ❌ Incorrect sequence! Missed Step 1
[DEBUG] Detected step: 5, Max similar

In [1]:
# ---------------- Live Loop ----------------
url = r"rtsp://admin:admin%40123@192.168.0.171/cam/realmonitor?channel=1&subtype=0"
cap = cv2.VideoCapture(url, cv2.CAP_FFMPEG)
frame_buffer = []
frame_count = 0
expected_step = 0  # start from step 0
last_status_type = None  # "inorder", "outoforder", "unclear"

while True:
    ret, frame = cap.read()
    if not ret:
        break

    frame_count += 1
    if frame_count % FRAME_SKIP != 0:
        cv2.imshow("Live", frame)
        if cv2.waitKey(1) & 0xFF == 27:
            break
        continue

    # Preprocess frame
    img = transform(frame).unsqueeze(0).to(device)

    # CNN feature extraction
    with torch.no_grad():
        cnn_feat = cnn_model(img).squeeze().cpu().numpy()
    frame_buffer.append(cnn_feat)

    # Run LSTM when buffer is full
    if len(frame_buffer) >= SEQ_LEN:
        seq_input = torch.tensor(np.array(frame_buffer[-SEQ_LEN:]), dtype=torch.float32).unsqueeze(0).to(device)
        with torch.no_grad():
            live_feat = lstm_model(seq_input).cpu().numpy().reshape(-1)

        # Cosine similarity with reference steps
        sims = cosine_similarity(live_feat.reshape(1,-1), reference_features)
        best_step = sims.argmax()
        max_sim = sims.max()

        if max_sim >= THRESHOLD:
            if best_step == expected_step:
                status = f"✅ Step {best_step+1} detected in order"
                expected_step = (expected_step + 1) % NUM_STEPS
                last_status_type = "inorder"
            else:
                if last_status_type != "outoforder":
                    status = f"⚠️ Out of order: Detected Step {best_step+1}, Expected Step {expected_step+1}"
                    beep()
                    last_status_type = "outoforder"
                else:
                    status = f"Detected Step {best_step+1}"  # show only detection
        else:
            status = "❌ Unclear / No valid step"
            last_status_type = "unclear"

        color = (0,255,0) if "✅" in status else (0,0,255)
        cv2.putText(frame, status, (50,50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)

        print(f"[DEBUG] Detected: {best_step+1}, Similarity: {max_sim:.3f}, Status: {status}")

    cv2.imshow("Live", frame)
    if cv2.waitKey(1) & 0xFF == 27:  # ESC to exit
        break

cap.release()
cv2.destroyAllWindows()


NameError: name 'cv2' is not defined

In [5]:
# ---------------- CONFIG ----------------
FRAME_SKIP = 2         # sample every 2 frames (more frequent)
SEQ_LEN = 12           # number of frames per sequence
THRESHOLD = 0.75       # similarity threshold
MIN_CONSECUTIVE = 2    # frames needed to confirm a step

# ---------------- Live Loop ----------------
url = r"rtsp://admin:admin%40123@192.168.0.171/cam/realmonitor?channel=1&subtype=0"
cap = cv2.VideoCapture(url, cv2.CAP_FFMPEG)
frame_buffer = []
frame_count = 0
last_reported_step = None
current_detected = None
consecutive_count = 0

while True:
    ret, frame = cap.read()
    if not ret:
        break

    frame_count += 1
    if frame_count % FRAME_SKIP != 0:
        cv2.imshow("Live", frame)
        if cv2.waitKey(1) & 0xFF == 27:
            break
        continue

    # Preprocess frame
    img = transform(frame).unsqueeze(0).to(device)

    # CNN feature extraction
    with torch.no_grad():
        cnn_feat = cnn_model(img).squeeze().cpu().numpy()
    frame_buffer.append(cnn_feat)

    # Keep only last SEQ_LEN frames
    if len(frame_buffer) > SEQ_LEN:
        frame_buffer.pop(0)

    if len(frame_buffer) == SEQ_LEN:
        seq_input = torch.tensor(np.array(frame_buffer), dtype=torch.float32).unsqueeze(0).to(device)
        with torch.no_grad():
            live_feat = lstm_model(seq_input).cpu().numpy().reshape(-1)

        sims = cosine_similarity(live_feat.reshape(1,-1), reference_features)
        best_step = sims.argmax()
        max_sim = sims.max()

        # Consecutive detection logic
        if max_sim >= THRESHOLD:
            if current_detected == best_step:
                consecutive_count += 1
            else:
                current_detected = best_step
                consecutive_count = 1

            if consecutive_count >= MIN_CONSECUTIVE and current_detected != last_reported_step:
                status = f"✅ Step {current_detected+1} detected"
                last_reported_step = current_detected
                consecutive_count = 0
                color = (0,255,0)
                cv2.putText(frame, status, (50,50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
                print(f"[DEBUG] Detected: {current_detected+1}, Similarity: {max_sim:.3f}, Status: {status}")
        else:
            current_detected = None
            consecutive_count = 0

    cv2.imshow("Live", frame)
    if cv2.waitKey(1) & 0xFF == 27:
        break

cap.release()
cv2.destroyAllWindows()


In [3]:
import cv2

url = r"rtsp://admin:admin%40123@192.168.0.171/cam/realmonitor?channel=1&subtype=0"
cap = cv2.VideoCapture(url, cv2.CAP_FFMPEG)

if not cap.isOpened():
    print("❌ Cannot open RTSP")
else:
    print("✅ Stream opened")
    while True:
        ret, frame = cap.read()
        if not ret:
            print("❌ Failed to grab frame")
            break
        cv2.imshow("RTSP Stream", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

✅ Stream opened


In [None]:
# ---------------- Live Loop ----------------
import cv2
import numpy as np
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from sklearn.metrics.pairwise import cosine_similarity

# ================= CONFIG ==============
FRAME_SKIP = 5          # sample every 5 frames
SEQ_LEN = 12            # number of frames per sequence
THRESHOLD = 0.80       # cosine similarity threshold
NUM_STEPS = 4           # total steps in the sequence

# ---------------- Device ----------------
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# ---------------- CNN Model ----------------
cnn_model = models.resnet18(pretrained=True)
cnn_model = torch.nn.Sequential(*list(cnn_model.children())[:-1])  # remove FC
cnn_model.eval().to(device)

# ---------------- LSTM Feature Extractor ----------------
class LSTMFeatureExtractor(torch.nn.Module):
    def __init__(self, input_size=512, hidden_size=128, num_layers=1):
        super().__init__()
        self.lstm = torch.nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

    def forward(self, x):
        _, (h_n, _) = self.lstm(x)
        return h_n.squeeze(0)

# Load LSTM weights
lstm_model = LSTMFeatureExtractor(input_size=512, hidden_size=128).to(device)
checkpoint = torch.load(r"C:\Users\sinch\ai\lstm_model.pth", map_location=device)
filtered_dict = {k: v for k, v in checkpoint.items() if not k.startswith("fc.")}
lstm_model.load_state_dict(filtered_dict, strict=False)
lstm_model.eval()
print("✅ LSTM model loaded successfully")

# ---------------- Reference Features ----------------
reference_features = np.load("reference_sequences.npy")  # shape: [num_steps, hidden_size]

# ---------------- Transform ----------------
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224,224)),
    transforms.ToTensor(),
])

url = r"rtsp://admin:admin%40123@192.168.0.171/cam/realmonitor?channel=1&subtype=0"
cap = cv2.VideoCapture(url, cv2.CAP_FFMPEG)
frame_buffer = []
frame_count = 0
expected_step = 0      # which step should come next
last_reported_step = None  # avoid repeating messages

while True:
    ret, frame = cap.read()
    if not ret:
        break

    frame_count += 1
    if frame_count % FRAME_SKIP != 0:
        cv2.imshow("Live", frame)
        if cv2.waitKey(1) & 0xFF == 27:
            break
        continue

    # Preprocess frame
    img = transform(frame).unsqueeze(0).to(device)

    # CNN feature extraction
    with torch.no_grad():
        cnn_feat = cnn_model(img).squeeze().cpu().numpy()
    frame_buffer.append(cnn_feat)

    # Keep only last SEQ_LEN frames
    if len(frame_buffer) > SEQ_LEN:
        frame_buffer.pop(0)

    if len(frame_buffer) == SEQ_LEN:
        seq_input = torch.tensor(np.array(frame_buffer), dtype=torch.float32).unsqueeze(0).to(device)
        with torch.no_grad():
            live_feat = lstm_model(seq_input).cpu().numpy().reshape(-1)

        sims = cosine_similarity(live_feat.reshape(1,-1), reference_features)
        best_step = sims.argmax()
        max_sim = sims.max()

        status = None

        if max_sim >= THRESHOLD and best_step != last_reported_step:
            if best_step == expected_step:
                # Step detected in order
                status = f"✅ Step {best_step+1} detected"
            elif best_step > expected_step:
                # Step ahead, assume intermediate steps passed
                status = f"✅ Step {best_step+1} detected (intermediate assumed)"
            else:
                # Step behind, ignore repeated / holding frames
                status = f"✅ Step {best_step+1} detected (holding/repeated)"

            # Update expected step for next detection
            expected_step = (best_step + 1) % NUM_STEPS
            last_reported_step = best_step

        if status:
            color = (0,255,0) if "✅" in status else (0,0,255)
            cv2.putText(frame, status, (50,50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
            print(f"[DEBUG] {status}, Similarity: {max_sim:.3f}")

    cv2.imshow("Live", frame)
    if cv2.waitKey(1) & 0xFF == 27:
        break

cap.release()
cv2.destroyAllWindows()


In [1]:
# ---------------- Live Loop ----------------
import cv2
import numpy as np
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from sklearn.metrics.pairwise import cosine_similarity

# ================= CONFIG ==============
FRAME_SKIP = 5          # sample every 5 frames
SEQ_LEN = 12            # number of frames per sequence
THRESHOLD = 0.75       # cosine similarity threshold
NUM_STEPS = 4           # total steps in the sequence

# ---------------- Device ----------------
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# ---------------- CNN Model ----------------
cnn_model = models.resnet18(pretrained=True)
cnn_model = torch.nn.Sequential(*list(cnn_model.children())[:-1])  # remove FC
cnn_model.eval().to(device)

# ---------------- LSTM Feature Extractor ----------------
class LSTMFeatureExtractor(torch.nn.Module):
    def __init__(self, input_size=512, hidden_size=128, num_layers=1):
        super().__init__()
        self.lstm = torch.nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

    def forward(self, x):
        _, (h_n, _) = self.lstm(x)
        return h_n.squeeze(0)

# Load LSTM weights
lstm_model = LSTMFeatureExtractor(input_size=512, hidden_size=128).to(device)
checkpoint = torch.load(r"C:\Users\sinch\ai\lstm_model.pth", map_location=device)
filtered_dict = {k: v for k, v in checkpoint.items() if not k.startswith("fc.")}
lstm_model.load_state_dict(filtered_dict, strict=False)
lstm_model.eval()
print("✅ LSTM model loaded successfully")

# ---------------- Reference Features ----------------
reference_features = np.load("reference_sequences.npy")  # shape: [num_steps, hidden_size]

# ---------------- Transform ----------------
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224,224)),
    transforms.ToTensor(),
])

url = r"rtsp://admin:admin%40123@192.168.0.171/cam/realmonitor?channel=1&subtype=0"

cap = cv2.VideoCapture(url, cv2.CAP_FFMPEG)
frame_buffer = []
frame_count = 0
expected_step = 0      # which step should come next
last_reported_step = None  # avoid repeating messages
skipped_reported = False   # to show skipped warning only once

while True:
    ret, frame = cap.read()
    if not ret:
        break

    frame_count += 1
    if frame_count % FRAME_SKIP != 0:
        cv2.imshow("Live", frame)
        if cv2.waitKey(1) & 0xFF == 27:
            break
        continue

    # Preprocess frame
    img = transform(frame).unsqueeze(0).to(device)

    # CNN feature extraction
    with torch.no_grad():
        cnn_feat = cnn_model(img).squeeze().cpu().numpy()
    frame_buffer.append(cnn_feat)

    if len(frame_buffer) > SEQ_LEN:
        frame_buffer.pop(0)

    if len(frame_buffer) == SEQ_LEN:
        seq_input = torch.tensor(np.array(frame_buffer), dtype=torch.float32).unsqueeze(0).to(device)
        with torch.no_grad():
            live_feat = lstm_model(seq_input).cpu().numpy().reshape(-1)

        sims = cosine_similarity(live_feat.reshape(1,-1), reference_features)
        best_step = sims.argmax()
        max_sim = sims.max()

        status = None

        if max_sim >= THRESHOLD:
            # Step detected in order
            if best_step == expected_step:
                status = f"✅ Step {best_step+1} detected"
                expected_step = (expected_step + 1) % NUM_STEPS
                skipped_reported = False
            # Step ahead
            elif best_step > expected_step:
                if not skipped_reported:
                    status = f"⚠️ Missed Step(s), Jumped to Step {best_step+1}"
                    skipped_reported = True
                else:
                    status = f"✅ Step {best_step+1} detected (holding/intermediate)"
                expected_step = (best_step + 1) % NUM_STEPS
            # Step behind (holding)
            else:
                status = f"✅ Step {best_step+1} detected (holding/repeated)"

            last_reported_step = best_step

        if status:
            color = (0,255,0) if "✅" in status else (0,0,255)
            cv2.putText(frame, status, (50,50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
            print(f"[DEBUG] {status}, Similarity: {max_sim:.3f}")

    cv2.imshow("Live", frame)
    if cv2.waitKey(1) & 0xFF == 27:
        break

cap.release()
cv2.destroyAllWindows()




✅ LSTM model loaded successfully
[DEBUG] ⚠️ Missed Step(s), Jumped to Step 5, Similarity: 0.756
[DEBUG] ✅ Step 5 detected (holding/intermediate), Similarity: 0.757
[DEBUG] ✅ Step 5 detected (holding/intermediate), Similarity: 0.758
[DEBUG] ✅ Step 5 detected (holding/intermediate), Similarity: 0.761
[DEBUG] ✅ Step 5 detected (holding/intermediate), Similarity: 0.767
[DEBUG] ✅ Step 5 detected (holding/intermediate), Similarity: 0.770
[DEBUG] ✅ Step 5 detected (holding/intermediate), Similarity: 0.770
[DEBUG] ✅ Step 5 detected (holding/intermediate), Similarity: 0.773
[DEBUG] ✅ Step 5 detected (holding/intermediate), Similarity: 0.775
[DEBUG] ✅ Step 5 detected (holding/intermediate), Similarity: 0.780
[DEBUG] ✅ Step 5 detected (holding/intermediate), Similarity: 0.780
[DEBUG] ✅ Step 5 detected (holding/intermediate), Similarity: 0.782
[DEBUG] ✅ Step 5 detected (holding/intermediate), Similarity: 0.786
[DEBUG] ✅ Step 5 detected (holding/intermediate), Similarity: 0.792
[DEBUG] ✅ Step 5 det

In [11]:
# seq
import cv2
import numpy as np
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from sklearn.metrics.pairwise import cosine_similarity
import platform

# ================= CONFIG ==============
FRAME_SKIP = 5          # sample every 5 frames
SEQ_LEN = 12            # number of frames per sequence
THRESHOLD = 0.75        # cosine similarity threshold
NUM_STEPS = 4           # total steps in the sequence

# ---------------- Device ----------------
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# ---------------- CNN Model ----------------
cnn_model = models.resnet18(pretrained=True)
cnn_model = torch.nn.Sequential(*list(cnn_model.children())[:-1])  # remove FC
cnn_model.eval().to(device)

# ---------------- LSTM Feature Extractor ----------------
class LSTMFeatureExtractor(torch.nn.Module):
    def __init__(self, input_size=512, hidden_size=128, num_layers=1):
        super().__init__()
        self.lstm = torch.nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

    def forward(self, x):
        _, (h_n, _) = self.lstm(x)
        return h_n.squeeze(0)

# Load LSTM weights
lstm_model = LSTMFeatureExtractor(input_size=512, hidden_size=128).to(device)
checkpoint = torch.load(r"C:\Users\sinch\ai\lstm_model.pth", map_location=device)
filtered_dict = {k: v for k, v in checkpoint.items() if not k.startswith("fc.")}
lstm_model.load_state_dict(filtered_dict, strict=False)
lstm_model.eval()
print("✅ LSTM model loaded successfully")

# ---------------- Reference Features ----------------
reference_features = np.load("reference_sequences.npy")  # shape: [num_steps, hidden_size]

# ---------------- Transform ----------------
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224,224)),
    transforms.ToTensor(),
])

# ---------------- Optional Beep ----------------
def beep():
    if platform.system() == "Windows":
        import winsound
        winsound.Beep(1000, 200)  # freq=1000Hz, duration=200ms
    else:
        print("\a")  # fallback console beep

# ---------------- Live Loop ----------------
cap = cv2.VideoCapture(0)
frame_buffer = []
frame_count = 0
current_step = 0  # index of current step (0..NUM_STEPS-1)
detected_steps = []
missed_flag = False  # to avoid spamming missed step messages

while True:
    ret, frame = cap.read()
    if not ret:
        break

    frame_count += 1
    if frame_count % FRAME_SKIP != 0:
        cv2.imshow("Live", frame)
        if cv2.waitKey(1) & 0xFF == 27:
            break
        continue

    # Preprocess frame
    img = transform(frame).unsqueeze(0).to(device)

    # CNN feature extraction
    with torch.no_grad():
        cnn_feat = cnn_model(img).squeeze().cpu().numpy()
    frame_buffer.append(cnn_feat)

    # Run LSTM when buffer is full
    if len(frame_buffer) >= SEQ_LEN:
        seq_input = torch.tensor(np.array(frame_buffer[-SEQ_LEN:]), dtype=torch.float32).unsqueeze(0).to(device)
        with torch.no_grad():
            live_feat = lstm_model(seq_input).cpu().numpy().reshape(-1)

        # Cosine similarity with reference steps
        sims = cosine_similarity(live_feat.reshape(1,-1), reference_features)
        best_step = sims.argmax()
        max_sim = sims.max()

        # Determine status
        if max_sim >= THRESHOLD:
            if best_step == current_step:
                status = f"Step {best_step+1} ✅"
                if best_step not in detected_steps:
                    detected_steps.append(best_step)
                current_step += 1 if current_step < NUM_STEPS-1 else 0
                missed_flag = False  # reset missed state
            else:
                if not missed_flag:  # report only once
                    status = f"❌ Missed Step {current_step+1}"
                    beep()
                    current_step += 1 if current_step < NUM_STEPS-1 else 0
                    missed_flag = True
                else:
                    status = f"Waiting for Step {current_step+1}..."
        else:
            status = f"❌ Step unclear / sequence skipped"

        color = (0,255,0) if "✅" in status else (0,0,255)
        cv2.putText(frame, status, (50,50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)

        print(f"[DEBUG] Detected step: {best_step+1}, Max similarity: {max_sim:.3f}, Status: {status}")

    cv2.imshow("Live", frame)
    if cv2.waitKey(1) & 0xFF == 27:  # ESC to exit
        break

cap.release()
cv2.destroyAllWindows()




✅ LSTM model loaded successfully
[DEBUG] Detected step: 5, Max similarity: 0.787, Status: ❌ Missed Step 1
[DEBUG] Detected step: 5, Max similarity: 0.790, Status: Waiting for Step 2...
[DEBUG] Detected step: 5, Max similarity: 0.798, Status: Waiting for Step 2...
[DEBUG] Detected step: 5, Max similarity: 0.802, Status: Waiting for Step 2...
[DEBUG] Detected step: 5, Max similarity: 0.806, Status: Waiting for Step 2...
[DEBUG] Detected step: 5, Max similarity: 0.804, Status: Waiting for Step 2...
[DEBUG] Detected step: 5, Max similarity: 0.799, Status: Waiting for Step 2...
[DEBUG] Detected step: 5, Max similarity: 0.788, Status: Waiting for Step 2...
[DEBUG] Detected step: 5, Max similarity: 0.797, Status: Waiting for Step 2...
[DEBUG] Detected step: 5, Max similarity: 0.797, Status: Waiting for Step 2...
[DEBUG] Detected step: 5, Max similarity: 0.799, Status: Waiting for Step 2...
[DEBUG] Detected step: 5, Max similarity: 0.801, Status: Waiting for Step 2...
[DEBUG] Detected step: 5,

In [15]:
# ---------------- Live Loop ----------------
cap = cv2.VideoCapture(0)
frame_buffer = []
frame_count = 0
current_step = 0  # index of current step (0..NUM_STEPS-1)
detected_steps = []
missed_flag = False  # to avoid repeating missed step messages

while True:
    ret, frame = cap.read()
    if not ret:
        break

    frame_count += 1
    if frame_count % FRAME_SKIP != 0:
        cv2.imshow("Live", frame)
        if cv2.waitKey(1) & 0xFF == 27:
            break
        continue

    # Preprocess frame
    img = transform(frame).unsqueeze(0).to(device)

    # CNN feature extraction
    with torch.no_grad():
        cnn_feat = cnn_model(img).squeeze().cpu().numpy()
    frame_buffer.append(cnn_feat)

    # Keep only last SEQ_LEN frames
    if len(frame_buffer) > SEQ_LEN:
        frame_buffer.pop(0)

    if len(frame_buffer) == SEQ_LEN:
        seq_input = torch.tensor(np.array(frame_buffer), dtype=torch.float32).unsqueeze(0).to(device)
        with torch.no_grad():
            live_feat = lstm_model(seq_input).cpu().numpy().reshape(-1)

        # Cosine similarity with reference steps
        sims = cosine_similarity(live_feat.reshape(1,-1), reference_features)
        best_step = sims.argmax()
        max_sim = sims.max()

        status = None

        if max_sim >= THRESHOLD:
            if best_step == current_step:
                # Step detected in order
                status = f"✅ Step {best_step+1} detected"
                detected_steps.append(best_step)
                current_step += 1 if current_step < NUM_STEPS-1 else 0
                missed_flag = False
            elif best_step > current_step:
                # Step skipped
                if not missed_flag:
                    status = f"❌ Missed Step {current_step+1}, Detected Step {best_step+1}"
                    beep()
                    detected_steps.append(best_step)
                    current_step = best_step + 1 if best_step < NUM_STEPS-1 else 0
                    missed_flag = True
            else:
                # Step behind / holding, ignore
                status = f"✅ Step {best_step+1} detected"

            # Only print new status
            if status:
                color = (0,255,0) if "✅" in status else (0,0,255)
                cv2.putText(frame, status, (50,50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
                print(f"[DEBUG] {status}, Similarity: {max_sim:.3f}")

    cv2.imshow("Live", frame)
    if cv2.waitKey(1) & 0xFF == 27:  # ESC
        break

cap.release()
cv2.destroyAllWindows()


[DEBUG] ❌ Missed Step 1, Detected Step 2, Similarity: 0.790
[DEBUG] ✅ Step 2 detected, Similarity: 0.794
[DEBUG] ✅ Step 2 detected, Similarity: 0.800
[DEBUG] ✅ Step 1 detected, Similarity: 0.816
[DEBUG] ✅ Step 2 detected, Similarity: 0.837
[DEBUG] ✅ Step 2 detected, Similarity: 0.844
[DEBUG] ✅ Step 2 detected, Similarity: 0.849
[DEBUG] ✅ Step 3 detected, Similarity: 0.832
[DEBUG] ✅ Step 3 detected, Similarity: 0.830
[DEBUG] ❌ Missed Step 4, Detected Step 5, Similarity: 0.844


In [23]:
# ---------------- Live Loop ----------------
cap = cv2.VideoCapture(0)
frame_buffer = []
frame_count = 0
current_step = 0  # index of current step (0..NUM_STEPS-1)
detected_steps = []
missed_flag = False  # to avoid repeating missed step messages

while True:
    ret, frame = cap.read()
    if not ret:
        break

    frame_count += 1
    if frame_count % FRAME_SKIP != 0:
        cv2.imshow("Live", frame)
        if cv2.waitKey(1) & 0xFF == 27:
            break
        continue

    # Preprocess frame
    img = transform(frame).unsqueeze(0).to(device)

    # CNN feature extraction
    with torch.no_grad():
        cnn_feat = cnn_model(img).squeeze().cpu().numpy()
    frame_buffer.append(cnn_feat)

    # Keep only last SEQ_LEN frames
    if len(frame_buffer) > SEQ_LEN:
        frame_buffer.pop(0)

    if len(frame_buffer) == SEQ_LEN:
        seq_input = torch.tensor(np.array(frame_buffer), dtype=torch.float32).unsqueeze(0).to(device)
        with torch.no_grad():
            live_feat = lstm_model(seq_input).cpu().numpy().reshape(-1)

        # Cosine similarity with reference steps
        sims = cosine_similarity(live_feat.reshape(1,-1), reference_features)
        best_step = sims.argmax()
        max_sim = sims.max()

        status = None

        if max_sim >= THRESHOLD:
            if best_step == current_step:
                # Step detected in order
                status = f"✅ Step {best_step+1} detected"
                detected_steps.append(best_step)
                current_step += 1 if current_step < NUM_STEPS-1 else 0
                missed_flag = False
            elif best_step > current_step:
                # Step skipped
                if not missed_flag:
                    status = f"❌ Missed Step {current_step+1}, Detected Step {best_step+1}"
                    beep()
                    detected_steps.append(best_step)
                    current_step = best_step + 1 if best_step < NUM_STEPS-1 else 0
                    missed_flag = True
            else:
                # Step behind / holding, ignore
                status = f"✅ Step {best_step+1} detected"

            # Only print new status
            if status:
                color = (0,255,0) if "✅" in status else (0,0,255)
                cv2.putText(frame, status, (50,50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
                print(f"[DEBUG] {status}, Similarity: {max_sim:.3f}")

    cv2.imshow("Live", frame)
    if cv2.waitKey(1) & 0xFF == 27:  # ESC
        break

cap.release()
cv2.destroyAllWindows()


[DEBUG] ❌ Missed Step 1, Detected Step 5, Similarity: 0.830
[DEBUG] ✅ Step 1 detected, Similarity: 0.829
[DEBUG] ❌ Missed Step 2, Detected Step 3, Similarity: 0.847
[DEBUG] ✅ Step 3 detected, Similarity: 0.846
[DEBUG] ✅ Step 2 detected, Similarity: 0.850
[DEBUG] ✅ Step 1 detected, Similarity: 0.866
[DEBUG] ✅ Step 2 detected, Similarity: 0.882
[DEBUG] ✅ Step 2 detected, Similarity: 0.865
[DEBUG] ✅ Step 2 detected, Similarity: 0.845
[DEBUG] ✅ Step 2 detected, Similarity: 0.844
[DEBUG] ✅ Step 3 detected, Similarity: 0.844
[DEBUG] ✅ Step 3 detected, Similarity: 0.860
[DEBUG] ✅ Step 3 detected, Similarity: 0.863
[DEBUG] ✅ Step 3 detected, Similarity: 0.861
[DEBUG] ✅ Step 3 detected, Similarity: 0.849
[DEBUG] ✅ Step 3 detected, Similarity: 0.850
[DEBUG] ✅ Step 3 detected, Similarity: 0.852
[DEBUG] ✅ Step 3 detected, Similarity: 0.860
[DEBUG] ✅ Step 3 detected, Similarity: 0.854
[DEBUG] ✅ Step 3 detected, Similarity: 0.853
[DEBUG] ✅ Step 4 detected, Similarity: 0.849
[DEBUG] ✅ Step 4 detected

In [78]:
from playsound import playsound

def beep():
    playsound("mixkit-facility-alarm-sound-999.wav")  # Replace with path if in different folder


In [56]:
import winsound

def beep():
    winsound.PlaySound("SystemExclamation", winsound.SND_ALIAS)


In [70]:
import os

file = "mixkit-facility-alarm-sound-999.wav"
print(os.path.exists(file))  # Should print True


True


In [88]:
import numpy as np
import sounddevice as sd

def beep():
    fs = 44100  # sample rate
    duration = 1 # seconds
    f = 1000  # Hz
    t = np.linspace(0, duration, int(fs*duration), False)
    tone = 1 * np.sin(2 * np.pi * f * t)
    sd.play(tone, fs)
    sd.wait()

# Test
for _ in range(2):
    beep()


In [101]:
import cv2
import numpy as np
import torch
from sklearn.metrics.pairwise import cosine_similarity
import threading
import sounddevice as sd

# ---------------- CONFIG ----------------
FRAME_SKIP = 5          # sample every 5 frames
SEQ_LEN = 12            # number of frames per sequence
THRESHOLD = 0.75        # cosine similarity threshold
NUM_STEPS = 4           # total steps in the sequence

# ---------------- Beep function ----------------
def beep():
    fs = 44100  # sample rate
    duration = 0.5  # seconds
    f = 1000  # Hz
    t = np.linspace(0, duration, int(fs*duration), False)
    tone = 1 * np.sin(2 * np.pi * f * t)
    sd.play(tone, fs)
    sd.wait()

# ---------------- Live Loop ----------------
cap = cv2.VideoCapture(0)
frame_buffer = []
frame_count = 0
current_step = 0  # index of current step (0..NUM_STEPS-1)
detected_steps = []
missed_flag = False  # to avoid repeating missed step messages

while True:
    ret, frame = cap.read()
    if not ret:
        break

    frame_count += 1
    if frame_count % FRAME_SKIP != 0:
        cv2.imshow("Live", frame)
        if cv2.waitKey(1) & 0xFF == 27:
            break
        continue

    # Preprocess frame
    img = transform(frame).unsqueeze(0).to(device)

    # CNN feature extraction
    with torch.no_grad():
        cnn_feat = cnn_model(img).squeeze().cpu().numpy()
    frame_buffer.append(cnn_feat)

    # Keep only last SEQ_LEN frames
    if len(frame_buffer) > SEQ_LEN:
        frame_buffer.pop(0)

    if len(frame_buffer) == SEQ_LEN:
        seq_input = torch.tensor(np.array(frame_buffer), dtype=torch.float32).unsqueeze(0).to(device)
        with torch.no_grad():
            live_feat = lstm_model(seq_input).cpu().numpy().reshape(-1)

        # Cosine similarity with reference steps
        sims = cosine_similarity(live_feat.reshape(1,-1), reference_features)
        best_step = sims.argmax()
        max_sim = sims.max()

        status = None

        if max_sim >= THRESHOLD:
            if best_step == current_step:
                # Step detected in order
                status = f"✅ Step {best_step+1} detected"
                detected_steps.append(best_step)
                current_step += 1 if current_step < NUM_STEPS-1 else 0
                missed_flag = False
            elif best_step > current_step:
                # Step skipped
                if not missed_flag:
                    status = f"❌ Missed Step {current_step+1}, Detected Step {best_step+1}"
                    # Play beep in a separate thread
                    threading.Thread(target=beep).start()
                    detected_steps.append(best_step)
                    current_step = best_step + 1 if best_step < NUM_STEPS-1 else 0
                    missed_flag = True
            else:
                # Step behind / holding, ignore
                status = f"✅ Step {best_step+1} detected"

            # Display status
            if status:
                color = (0,255,0) if "✅" in status else (0,0,255)
                cv2.putText(frame, status, (50,50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
                print(f"[DEBUG] {status}, Similarity: {max_sim:.3f}")

    cv2.imshow("Live", frame)
    if cv2.waitKey(1) & 0xFF == 27:  # ESC
        break

cap.release()
cv2.destroyAllWindows()


[DEBUG] ❌ Missed Step 1, Detected Step 3, Similarity: 0.821
[DEBUG] ✅ Step 3 detected, Similarity: 0.828
[DEBUG] ✅ Step 3 detected, Similarity: 0.827
[DEBUG] ✅ Step 3 detected, Similarity: 0.847
[DEBUG] ✅ Step 3 detected, Similarity: 0.864
[DEBUG] ✅ Step 3 detected, Similarity: 0.866
[DEBUG] ✅ Step 3 detected, Similarity: 0.865
[DEBUG] ✅ Step 3 detected, Similarity: 0.852
[DEBUG] ✅ Step 3 detected, Similarity: 0.860
[DEBUG] ✅ Step 3 detected, Similarity: 0.846
[DEBUG] ✅ Step 3 detected, Similarity: 0.830
[DEBUG] ✅ Step 3 detected, Similarity: 0.825
[DEBUG] ✅ Step 1 detected, Similarity: 0.795
[DEBUG] ✅ Step 1 detected, Similarity: 0.786
[DEBUG] ✅ Step 1 detected, Similarity: 0.799
[DEBUG] ✅ Step 1 detected, Similarity: 0.818
[DEBUG] ✅ Step 1 detected, Similarity: 0.838
[DEBUG] ✅ Step 1 detected, Similarity: 0.827
[DEBUG] ✅ Step 1 detected, Similarity: 0.827
[DEBUG] ✅ Step 1 detected, Similarity: 0.825
[DEBUG] ✅ Step 1 detected, Similarity: 0.824
[DEBUG] ✅ Step 1 detected, Similarity: 0