In [1]:
import tensorflow as tf
import cv2
import numpy as np
import os
from official.projects.movinet.modeling import movinet
from official.projects.movinet.modeling import movinet_model

# ========== 1. Model configuration and loading ==========
def load_movinet_model(model_id='a0', checkpoint_dir=None):
    """Build a causal MoViNet classifier and optionally restore a checkpoint.

    The backbone is created with ``causal=True`` and
    ``use_external_states=True``, and the classifier is created with
    ``output_states=True`` and 600 output classes. Positional encoding is
    switched on only for the ``a3``, ``a4`` and ``a5`` model ids.

    Args:
        model_id: MoViNet variant identifier passed to ``movinet.Movinet``.
        checkpoint_dir: Directory handed to ``tf.train.latest_checkpoint``.
            When falsy, no weights are restored.

    Returns:
        The built ``MovinetClassifier`` instance.
    """
    backbone_options = dict(
        model_id=model_id,
        causal=True,
        conv_type='2plus1d',
        se_type='2plus3d',
        activation='hard_swish',
        gating_activation='hard_sigmoid',
        use_positional_encoding=model_id in {'a3', 'a4', 'a5'},
        use_external_states=True,
    )
    net = movinet.Movinet(**backbone_options)

    classifier = movinet_model.MovinetClassifier(
        net, num_classes=600, output_states=True
    )

    # Build the variables from an example input shape so that a checkpoint
    # can be matched against already-existing objects.
    example_clip = tf.ones([1, 8, 172, 172, 3])
    classifier.build(example_clip.shape)

    # Guard clause: nothing to restore when no checkpoint directory is given.
    if not checkpoint_dir:
        return classifier

    latest_path = tf.train.latest_checkpoint(checkpoint_dir)
    restore_status = tf.train.Checkpoint(model=classifier).restore(latest_path)
    restore_status.assert_existing_objects_matched()
    print("✅ Checkpoint loaded from:", latest_path)

    return classifier

# ========== 2. Video clip preprocessing ==========
def extract_frames_from_video(video_path, clip_len=8, size=(172, 172)):
    """Read the first ``clip_len`` frames of a video as a normalised clip.

    Each frame is resized with ``cv2.resize(frame, size)``, converted from
    BGR to RGB and scaled by ``1 / 255``.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        clip_len: Number of frames to collect from the start of the video.
        size: Target size passed as the second argument of ``cv2.resize``.

    Returns:
        A ``tf.float32`` tensor with a leading batch axis of length 1
        followed by the stacked frames, or ``None`` (after printing a
        warning) when fewer than ``clip_len`` frames could be read.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while len(frames) < clip_len:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, size)
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    finally:
        # Release the capture handle even if resizing/conversion raises.
        cap.release()

    if len(frames) < clip_len:
        print("⚠️ Not enough frames in the video.")
        return None

    # Normalise once on the stacked array instead of once per frame.
    clip = np.stack(frames, axis=0) / 255.0  # (T, H, W, C)
    # Add the batch axis with a view rather than wrapping in a Python list.
    return tf.convert_to_tensor(clip[np.newaxis, ...], dtype=tf.float32)  # (1, T, H, W, C)

# ========== 3. Run inference (streaming mode) ==========
def predict_clip_streaming(model, inputs):
    """Feed a clip to the model one frame at a time and return a class index.

    The clip is split along axis 1 into ``inputs.shape[1]`` pieces. The state
    dict returned by each model call is merged into the next call's inputs
    under the same keys, alongside the ``'image'`` entry.

    Args:
        model: Callable model exposing ``init_states`` and returning an
            ``(output, states)`` pair per call.
        inputs: Clip tensor; axis 1 is the axis that gets split.

    Returns:
        The argmax (over the last axis) of the final step's output, taken
        for the first element along axis 0.
    """
    num_steps = inputs.shape[1]
    per_frame_inputs = tf.split(inputs, num_steps, axis=1)
    stream_state = model.init_states(tf.shape(inputs))

    step_outputs = []
    for single_frame in per_frame_inputs:
        model_inputs = dict(stream_state)
        model_inputs['image'] = single_frame
        step_output, stream_state = model(model_inputs)
        step_outputs.append(step_output)

    # Only the output of the last streamed frame decides the prediction.
    last_output = step_outputs[-1]
    class_ids = tf.argmax(last_output, axis=-1)
    return class_ids.numpy()[0]

# ========== 4. Load labels ==========
def load_kinetics_labels(
    url="https://raw.githubusercontent.com/deepmind/kinetics-i3d/master/data/label_map_600.txt",
):
    """Download a Kinetics label list and return the class names in file order.

    Every non-empty line yields one label. A line of the form
    ``<id>: "<name>"`` contributes the text after the first colon; a line
    without a colon is taken as the bare class name. Surrounding whitespace
    and double quotes are stripped in both cases.

    Args:
        url: Location of the plain-text label file.

    Returns:
        A list of label strings, indexed by line position in the file.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    import requests

    # NOTE(review): the classifier in this file is built with num_classes=600,
    # so the default points at the 600-class list. The previous default
    # (label_map.txt) appears to be the 400-class list — confirm.
    response = requests.get(url, timeout=30)
    # Fail loudly instead of parsing an HTML error page as labels.
    response.raise_for_status()

    labels = []
    for raw_line in response.text.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        # Indexing split(":")[1] raised IndexError on lines without a colon;
        # partition lets both "id: name" and bare-name lines through.
        _, colon, after_colon = line.partition(":")
        name = after_colon if colon else line
        labels.append(name.strip().strip('"'))
    return labels

# ========== 5. Run ==========
if __name__ == "__main__":
    # Script entry point: load the model and labels, classify the first
    # frames of one video, and report whether the predicted label mentions
    # a fall.
    video_path = r'E:\glass_git\ML-DL\vision\data\cam1.avi'  # 🔁 set the input video path here
    # tf.train.latest_checkpoint() needs the folder that holds the checkpoint
    # files, not the path of an individual ``.data-*`` shard.
    checkpoint_dir = r'E:\glass_git\ML-DL\vision\model\movinet_a5_stream'  # 🔁 set the checkpoint folder here

    # NOTE(review): the folder name suggests MoViNet-A5 weights while
    # model_id is "a0" — confirm the checkpoint matches the model variant.
    model = load_movinet_model(model_id="a0", checkpoint_dir=checkpoint_dir)
    labels = load_kinetics_labels()

    clip = extract_frames_from_video(video_path, clip_len=8)
    if clip is not None:
        pred_idx = predict_clip_streaming(model, clip)
        # Guard against a label list shorter than the classifier's output.
        if 0 <= pred_idx < len(labels):
            pred_label = labels[pred_idx]
        else:
            pred_label = f"<unknown class {pred_idx}>"
        print(f"🧠 Prediction: {pred_label}")
        if "fall" in pred_label.lower():
            print("🚨 FALL DETECTED")
        else:
            print("✅ No fall detected.")


ModuleNotFoundError: No module named 'tensorflow'