# LSTM Model for Human Detection and Tracking

In [1]:
import os
import cv2
import numpy as np
import json
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout, TimeDistributed, Conv2D, MaxPooling2D, Flatten
from keras.applications import VGG16
from keras.applications.vgg16 import preprocess_input
from sklearn.model_selection import train_test_split

In [2]:
# %% Paths Setup
base_dir = os.path.abspath('dataset/personpath22')
video_path = os.path.join(base_dir, 'raw_data')
annotation_dir = os.path.join(base_dir, 'annotation')
processed_data_path = os.path.join(base_dir, 'hybrid_processed_data.npz')

# Annotation Files
amodal_file = os.path.join(annotation_dir, 'anno_amodal_2022.json')
visible_file = os.path.join(annotation_dir, 'anno_visible_2022.json')
splits_file = os.path.join(annotation_dir, 'splits.json')

In [3]:
# %% Load Video Files
video_files = [f for f in os.listdir(video_path) if f.endswith('.mp4')]
print('Available video files:', video_files)

# Load splits.json to define train and test sets
with open(splits_file, 'r') as f:
    splits = json.load(f)
train_videos = splits.get('train', [])
test_videos = splits.get('test', [])

print('Train videos:', train_videos)
print('Test videos:', test_videos)

# Load Annotations
def load_annotations():
    """Load annotations from amodal and visible files."""
    print("Loading annotations...")
    with open(amodal_file, 'r') as f:
        amodal_data = json.load(f)
    with open(visible_file, 'r') as f:
        visible_data = json.load(f)
    print("Annotations loaded.")
    return amodal_data, visible_data

amodal_data, visible_data = load_annotations()

Available video files: ['uid_vid_00000.mp4', 'uid_vid_00001.mp4', 'uid_vid_00002.mp4', 'uid_vid_00003.mp4', 'uid_vid_00004.mp4', 'uid_vid_00005.mp4', 'uid_vid_00006.mp4', 'uid_vid_00007.mp4', 'uid_vid_00008.mp4', 'uid_vid_00009.mp4', 'uid_vid_00010.mp4', 'uid_vid_00011.mp4', 'uid_vid_00012.mp4', 'uid_vid_00013.mp4', 'uid_vid_00014.mp4', 'uid_vid_00015.mp4', 'uid_vid_00016.mp4', 'uid_vid_00017.mp4', 'uid_vid_00018.mp4', 'uid_vid_00019.mp4', 'uid_vid_00020.mp4', 'uid_vid_00021.mp4', 'uid_vid_00022.mp4', 'uid_vid_00023.mp4', 'uid_vid_00024.mp4', 'uid_vid_00025.mp4', 'uid_vid_00026.mp4', 'uid_vid_00027.mp4', 'uid_vid_00028.mp4', 'uid_vid_00029.mp4', 'uid_vid_00030.mp4', 'uid_vid_00031.mp4', 'uid_vid_00032.mp4', 'uid_vid_00033.mp4', 'uid_vid_00034.mp4', 'uid_vid_00035.mp4', 'uid_vid_00036.mp4', 'uid_vid_00037.mp4', 'uid_vid_00038.mp4', 'uid_vid_00039.mp4', 'uid_vid_00040.mp4', 'uid_vid_00041.mp4', 'uid_vid_00042.mp4', 'uid_vid_00043.mp4', 'uid_vid_00044.mp4', 'uid_vid_00045.mp4', 'uid_vid_0

In [4]:
# Extract annotation data for a specific video
# %% Extract Annotation Data for Specific Videos
def get_annotations(video_name):
    """Retrieve annotation data for a specific video."""
    amodal_annos, visible_annos = [], []

    individual_amodal_file = os.path.join(annotation_dir, 'anno_amodal_2022', f'{video_name}.json')
    individual_visible_file = os.path.join(annotation_dir, 'anno_visible_2022', f'{video_name}.json')

    if os.path.exists(individual_amodal_file):
        with open(individual_amodal_file, 'r') as f:
            amodal_annos = json.load(f).get('entities', [])
    if os.path.exists(individual_visible_file):
        with open(individual_visible_file, 'r') as f:
            visible_annos = json.load(f).get('entities', [])

    return amodal_annos, visible_annos



In [5]:
# %% Initialize Pretrained VGG16 Model for Feature Extraction
cnn_model = VGG16(weights='imagenet', include_top=False, input_shape=(32, 32, 3))

def extract_features_from_frame(frame):
    """Extract CNN features from a single frame."""
    frame = preprocess_input(frame)  # Preprocess input for VGG16
    frame = np.expand_dims(frame, axis=0)  # Add batch dimension
    features = cnn_model.predict(frame)  # Extract features
    return features.reshape(-1)  # Flatten the features

In [6]:
# %% Preprocess Videos with CNN Features
def preprocess_data_with_cnn(video_file, amodal_annos, visible_annos):
    """Preprocess video frames and annotations."""
    print(f"Processing video: {video_file}...")
    cap = cv2.VideoCapture(video_file)
    frames, labels = [], []
    frame_labels = {}

    # Map frame indices to 'person' labels from annotations
    for entity in amodal_annos + visible_annos:
        frame_idx = entity['blob']['frame_idx']
        label = entity['labels'].get('reflection', 0)  # Example: Use 'reflection' as label
        frame_labels[frame_idx] = label

    frame_idx = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Resize frame to 32x32 and extract CNN features
        frame = cv2.resize(frame, (32, 32))
        features = extract_features_from_frame(frame)
        frames.append(features)

        # Assign label for the current frame
        label = frame_labels.get(frame_idx, 0)
        labels.append(label)

        frame_idx += 1

    cap.release()
    return np.array(frames), np.array(labels)

In [8]:
# %% Create Time-Series Data for LSTM
def create_dataset_with_cnn(frames, labels, time_step=10):
    """Create time-series data from CNN features."""
    X, y = [], []
    for i in range(len(frames) - time_step):
        X.append(frames[i:i + time_step])  # Shape: (time_step, features)
        y.append(labels[i + time_step])
    return np.array(X), np.array(y)


In [9]:
# %% Load Dataset Based on Split
def load_dataset(video_list):
    """Load all videos and their annotations."""
    X, y = [], []

    for video_name in video_list:
        video_file = os.path.join(video_path, video_name)
        amodal_annos, visible_annos = get_annotations(video_name)

        # Preprocess video and annotations
        frames, labels = preprocess_data_with_cnn(video_file, amodal_annos, visible_annos)

        # Create time-series data for LSTM
        X_video, y_video = create_dataset_with_cnn(frames, labels)
        X.extend(X_video)
        y.extend(y_video)

    return np.array(X), np.array(y)

# %% Load or Preprocess Data
def load_or_process_data():
    """Load processed data if available, otherwise process and save datasets."""
    if os.path.exists(processed_data_path):
        print("Loading processed data...")
        processed_data = np.load(processed_data_path)
        X_train, y_train = processed_data['X_train'], processed_data['y_train']
        X_test, y_test = processed_data['X_test'], processed_data['y_test']
    else:
        print("Processing training and testing datasets...")
        X_train, y_train = load_dataset(train_videos)
        X_test, y_test = load_dataset(test_videos)

        # Save processed data to disk
        np.savez(processed_data_path, X_train=X_train, y_train=y_train, 
                 X_test=X_test, y_test=y_test)
        print(f"Processed data saved to {processed_data_path}")

    return X_train, y_train, X_test, y_test

# Load or process the data
X_train, y_train, X_test, y_test = load_or_process_data()

# Display the shapes of the datasets
print(f'Train Data Shape: X={X_train.shape}, y={y_train.shape}')
print(f'Test Data Shape: X={X_test.shape}, y={y_test.shape}')


Loading processed data...
Train Data Shape: X=(89967, 10, 3072), y=(89967,)
Test Data Shape: X=(61736, 10, 3072), y=(61736,)


In [10]:
# %% Build CNN + LSTM Model
model = Sequential()
model.add(TimeDistributed(Conv2D(32, (3, 3), activation='relu'), input_shape=(10, 32, 32, 3)))
model.add(TimeDistributed(MaxPooling2D((2, 2))))
model.add(TimeDistributed(Flatten()))
model.add(LSTM(50, return_sequences=False))
model.add(Dropout(0.2))
model.add(Dense(1, activation='sigmoid'))

model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()


  super().__init__(**kwargs)


In [11]:
# %% Train the Model
history = model.fit(X_train, y_train, batch_size=32, epochs=10, validation_split=0.2)

# Plot Training and Validation Performance
plt.figure(figsize=(12, 6))

# Accuracy Plot
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

# Loss Plot
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.show()

Epoch 1/10


ValueError: Exception encountered when calling Sequential.call().

[1mInvalid input shape for input Tensor("sequential_1/Cast:0", shape=(None, 10, 3072), dtype=float32). Expected shape (None, 10, 32, 32, 3), but input has incompatible shape (None, 10, 3072)[0m

Arguments received by Sequential.call():
  • inputs=tf.Tensor(shape=(None, 10, 3072), dtype=uint8)
  • training=True
  • mask=None

In [None]:
# %% Save the Model
model_json = model.to_json()
with open("hybrid.json", "w") as json_file:
    json_file.write(model_json)
model.save("hybrid.h5")

print("Model saved to 'hybrid.h5'")
