#Combining CNN and LSTM (RNN) to Detect Falls From Video Clips

In [None]:
from google.colab import drive
# drive.flush_and_unmount()  # Unmount the drive

In [None]:
# Mount Google Drive at /content/drive so later cells can reach the project folder.
drive.mount('/content/drive')

In [None]:
import os, numpy as np, cv2, random, shutil
# Work from the project folder on Drive. The path is absolute (and uses the same
# "/content/drive/MyDrive" spelling as the later cells) so re-running this cell after
# the working directory has already changed does not fail with FileNotFoundError.
os.chdir("/content/drive/MyDrive/Fall_Detection_Playground_Senior_Prjct_II")

In [None]:
!ls

##Checking Class Imbalances and Video Analysis

In [None]:
!ls videos_cropped/fall | wc -l
!ls videos_cropped/no_fall | wc -l

In [None]:
# Count the clips per class from disk instead of hard-coding 30 / 40 / 70, so the
# reported ratios stay correct when videos are added or removed.
n_fall = len(os.listdir("videos_cropped/fall"))
n_no_fall = len(os.listdir("videos_cropped/no_fall"))
n_total = n_fall + n_no_fall

fall_ratio = n_fall / n_total if n_total else 0.0
no_fall_ratio = n_no_fall / n_total if n_total else 0.0
print("fall class ratio:", round(fall_ratio, 2))
print("no_fall class ratio:", round(no_fall_ratio, 2))
if fall_ratio > 0:
    print("overall_imbalance_ratio: 1:" + str(round(no_fall_ratio/fall_ratio, 2)))
else:
    # Avoid a ZeroDivisionError when the fall folder is empty.
    print("overall_imbalance_ratio: undefined (no fall videos found)")

In [None]:
# Class folders of the raw clips, relative to the project directory.
fall_folder = "videos_cropped/fall/"
no_fall_folder = "videos_cropped/no_fall/"

# Prefix every directory entry with its folder so the lists hold usable paths.
fall_videos = [fall_folder + entry for entry in os.listdir(fall_folder)]
no_fall_videos = [no_fall_folder + entry for entry in os.listdir(no_fall_folder)]

print("Fall videos:", fall_videos)
print("No-Fall videos:", no_fall_videos)

In [None]:
!pip install opencv-python

In [None]:
def with_opencv(filename):
    """Return ``(duration_in_seconds, fps)`` for a video, as reported by OpenCV.

    Prints an error and returns ``None`` when the file cannot be opened.
    When the reported frame rate is not positive the duration is 0.
    """
    capture = cv2.VideoCapture(filename)
    if not capture.isOpened():
        print(f"Error opening video file: {filename}")
        return None

    # Read both container properties, then free the capture handle.
    frames_per_second = capture.get(cv2.CAP_PROP_FPS)
    n_frames = capture.get(cv2.CAP_PROP_FRAME_COUNT)
    capture.release()

    if frames_per_second > 0:
        seconds = n_frames / frames_per_second
    else:
        seconds = 0
    return seconds, frames_per_second


In [None]:
def _collect_video_stats(videos):
  """Return ``(durations, fps_values)`` for the videos that exist and can be opened."""
  durations = []
  fps_values = set()
  for video in videos:
    if not os.path.exists(video):
      print(f"File Not Found: {video}")
      continue
    vid_info = with_opencv(video)
    if vid_info is None:
      # with_opencv has already printed why the file could not be opened.
      continue
    durations.append(vid_info[0])
    fps_values.add(vid_info[1])
  return durations, fps_values


def _print_video_stats(label, durations, fps_values):
  """Print shortest / longest / average duration and fps consistency for one class."""
  if not durations:
    print(f"No readable {label} videos found.")
    return
  print(f"Shortest {label} video length:", round(min(durations), 2))
  print(f"Longest {label} video length:", round(max(durations), 2))
  # Average over the clips that were actually measured, not over the requested list.
  print(f"Average {label} video length:", round(sum(durations)/len(durations), 2))
  consistent = len(fps_values) == 1
  # Show the single frame rate, or every distinct one when the clips disagree.
  shown = next(iter(fps_values)) if consistent else sorted(fps_values)
  print("Fps Consistency :", consistent, ",", shown)


def video_analysis(fall_videos, no_fall_videos):
  """Print duration and frame-rate statistics for the two classes of clips.

  Args:
    fall_videos: paths of the ``fall`` clips.
    no_fall_videos: paths of the ``no_fall`` clips.

  Missing or unreadable files are reported and left out of the statistics;
  a class without any readable clip gets a one-line notice instead of numbers.
  """
  fall_durations, fall_fps = _collect_video_stats(fall_videos)
  no_fall_durations, no_fall_fps = _collect_video_stats(no_fall_videos)

  _print_video_stats("fall", fall_durations, fall_fps)
  print("\n")
  _print_video_stats("no_fall", no_fall_durations, no_fall_fps)


In [None]:
# Duration / frame-rate summary of the raw clips listed from videos_cropped/ above.
video_analysis(fall_videos, no_fall_videos)

##Train-Test-Validation Split

In [None]:
# Seed for reproducibility
random.seed(1)

# Function to split videos into the three sets
def split_videos(video_list, train_size, val_size, test_size):
    """Randomly split ``video_list`` into train / validation / test lists.

    Args:
        video_list: sequence of items (video file names) to split.
        train_size, val_size, test_size: number of items wanted in each split.

    Returns:
        ``(train, val, test)`` lists taken consecutively from a shuffled copy.
        Items beyond ``train_size + val_size + test_size`` are in none of them;
        if the input is too short the later splits are simply shorter.

    The shuffle uses the global ``random`` state and works on a copy, so the
    caller's list keeps its original order.
    """
    shuffled = list(video_list)
    random.shuffle(shuffled)

    val_end = train_size + val_size
    test_end = val_end + test_size
    return shuffled[:train_size], shuffled[train_size:val_end], shuffled[val_end:test_end]

# Get the video filenames. os.listdir returns entries in arbitrary order, so sort
# them first: the seeded shuffle is only repeatable when it starts from a fixed order.
fall_videos = sorted(os.listdir(fall_folder))
no_fall_videos = sorted(os.listdir(no_fall_folder))

# Split the videos (for fall and no_fall separately).
# The no_fall sizes add up to 35, so any no_fall clips beyond that land in no split.
fall_train, fall_val, fall_test = split_videos(fall_videos, train_size=15, val_size=5, test_size=10)
no_fall_train, no_fall_val, no_fall_test = split_videos(no_fall_videos, train_size=20, val_size=5, test_size=10)

# Check the splits
print("Fall train:", len(fall_train))
print("Fall val:", len(fall_val))
print("Fall test:", len(fall_test))

print("No-fall train:", len(no_fall_train))
print("No-fall val:", len(no_fall_val))
print("No-fall test:", len(no_fall_test))


In [None]:
# Root of the train / val / test copy of the dataset.
base_folder = "/content/drive/MyDrive/Fall_Detection_Playground_Senior_Prjct_II/final_splits/"

# One destination folder per (split, class) pair.
train_fall_folder, val_fall_folder, test_fall_folder = [
    os.path.join(base_folder, f"{split}/fall/") for split in ("train", "val", "test")
]
train_no_fall_folder, val_no_fall_folder, test_no_fall_folder = [
    os.path.join(base_folder, f"{split}/no_fall/") for split in ("train", "val", "test")
]

# Create whatever is missing; folders that already exist are left untouched.
for split_dir in (train_fall_folder, val_fall_folder, test_fall_folder,
                  train_no_fall_folder, val_no_fall_folder, test_no_fall_folder):
    os.makedirs(split_dir, exist_ok=True)

print("New folder structure created!")

In [None]:
# Finally, move the videos into their appropriate folders
def move_files(video_list, src_folder, dst_folder):
    """Copy every file named in ``video_list`` from ``src_folder`` to ``dst_folder``.

    Despite the name, files are copied with ``shutil.copy``; the originals stay in place.
    """
    for name in video_list:
        source = os.path.join(src_folder, name)
        target = os.path.join(dst_folder, name)
        shutil.copy(source, target)

# Copy each split into its destination folder: fall clips first, then no_fall clips.
for names, source, destination in (
    (fall_train, fall_folder, train_fall_folder),
    (fall_val, fall_folder, val_fall_folder),
    (fall_test, fall_folder, test_fall_folder),
    (no_fall_train, no_fall_folder, train_no_fall_folder),
    (no_fall_val, no_fall_folder, val_no_fall_folder),
    (no_fall_test, no_fall_folder, test_no_fall_folder),
):
    move_files(names, source, destination)

print("Videos have been moved to the respective folders!")

In [None]:
# Verifying the split: count the entries that ended up in each destination folder.
for caption, folder in (
    ("Train Fall Videos:", train_fall_folder),
    ("Val Fall Videos:", val_fall_folder),
    ("Test Fall Videos:", test_fall_folder),
    ("Train No-Fall Videos:", train_no_fall_folder),
    ("Val No-Fall Videos:", val_no_fall_folder),
    ("Test No-Fall Videos:", test_no_fall_folder),
):
    print(caption, len(os.listdir(folder)))

##Augmenting Videos & Balancing the Two Classes

##### We only have 15 fall and 20 no_fall videos in the training set, all of them filmed in very similar environments with similar lighting, brightness, etc. We therefore create additional clips for both classes with augmentations, and we also make sure that both classes end up with a similar number of clips.

In [None]:
!pip install albumentations

In [None]:
def rotate_video(video_frames, angle):
    """Rotate every frame by the same ``angle`` about the frame centre.

    The rotation matrix is built once from the first frame's size and applied to
    all frames; the output frames keep that width and height.
    """
    height, width, _channels = video_frames[0].shape
    centre = (width / 2, height / 2)
    rotation = cv2.getRotationMatrix2D(centre, angle, 1)

    rotated = []
    for frame in video_frames:
        rotated.append(cv2.warpAffine(frame, rotation, (width, height)))
    return rotated

def flip_video(video_frames):
    """Mirror every frame horizontally (``cv2.flip`` with flip code 1)."""
    mirrored = []
    for frame in video_frames:
        mirrored.append(cv2.flip(frame, 1))
    return mirrored

def adjust_brightness(video_frames):
    """Scale the pixel values of every frame by one random gain from [0.2, 2.0].

    The gain is drawn once per call, so the whole clip changes by the same amount.
    """
    gain = random.uniform(0.2, 2.0)
    scaled = []
    for frame in video_frames:
        scaled.append(cv2.convertScaleAbs(frame, alpha=gain, beta=0))
    return scaled

def adjust_contrast(video_frames):
    """Scale the pixel values of every frame by one random gain from [0.5, 3.0].

    Implemented the same way as ``adjust_brightness`` (a pure ``alpha`` scaling with
    ``beta=0``), only with a different gain range.
    """
    gain = random.uniform(0.5, 3.0)
    scaled = []
    for frame in video_frames:
        scaled.append(cv2.convertScaleAbs(frame, alpha=gain, beta=0))
    return scaled

def adjust_saturation(video_frames):
    """Scale the saturation of every frame by one random factor from [0.2, 2.0].

    Frames are treated as BGR images: each is converted to HSV, its S channel is
    scaled, and it is converted back. The factor is drawn once per call.
    """
    saturation_factor = random.uniform(0.2, 2.0)  # Wider range for saturation
    adjusted_frames = []
    for frame in video_frames:
        hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        # Scale in float and clip BEFORE casting back to uint8. Writing the float
        # product straight into the uint8 channel casts first, so values above 255
        # wrap around and a later clip has nothing left to fix.
        scaled = hsv_frame[..., 1].astype(np.float32) * saturation_factor
        hsv_frame[..., 1] = np.clip(scaled, 0, 255).astype(np.uint8)
        adjusted_frames.append(cv2.cvtColor(hsv_frame, cv2.COLOR_HSV2BGR))
    return adjusted_frames

def change_speed(video_frames, speed_factor=1.0):
    """Resample a clip to ``int(len(video_frames) * speed_factor)`` frames.

    Frames are picked at evenly spaced positions over the whole clip. Played back at
    a fixed frame rate, a factor below 1 therefore shortens the clip (motion looks
    faster) and a factor above 1 lengthens it by repeating frames (motion looks slower).

    The previous version only acted for factors below 0.5 or above 1.5, so every
    factor in the 0.5-1.5 range left the clip unchanged; now any factor other than 1
    is applied.

    Args:
        video_frames: list of frames.
        speed_factor: positive length multiplier; 1.0 returns the input unchanged.

    Returns:
        The resampled list of frames (at least one frame for a non-empty input).

    Raises:
        ValueError: if ``speed_factor`` is not positive.
    """
    if speed_factor <= 0:
        raise ValueError(f"speed_factor must be positive, got {speed_factor}")

    total_frames = len(video_frames)
    if total_frames == 0 or speed_factor == 1.0:
        return video_frames

    # Never resample down to an empty clip.
    new_frame_count = max(1, int(total_frames * speed_factor))
    indices = np.linspace(0, total_frames - 1, new_frame_count, dtype=int)
    return [video_frames[i] for i in indices]

def adjust_hue(video_frames):
    """Shift the hue of every frame by one random offset from [-10, 10].

    Frames are treated as BGR images: each is converted to HSV, its H channel is
    shifted, and it is converted back. The offset is drawn once per call.
    """
    adjusted_frames = []
    hue_shift = random.randint(-10, 10)  # Random hue shift
    for frame in video_frames:
        hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        # Do the arithmetic in a signed type: adding a negative offset to the uint8
        # channel either wraps around or is rejected, depending on the NumPy version.
        # Hue is circular (8-bit OpenCV range 0-179), so wrap with a modulo instead
        # of clipping, which would pile shifted values up at the ends of the range.
        shifted = (hsv_frame[..., 0].astype(np.int16) + hue_shift) % 180
        hsv_frame[..., 0] = shifted.astype(np.uint8)
        adjusted_frames.append(cv2.cvtColor(hsv_frame, cv2.COLOR_HSV2BGR))
    return adjusted_frames

def crop_video(video_frames, crop_ratio=0.8):
    """Cut the same randomly placed window out of every frame.

    The window measures ``crop_ratio`` times the first frame's width and height
    (default 80%). Its position is drawn once, so all frames share one crop.
    """
    full_height, full_width, _channels = video_frames[0].shape
    window_width = int(full_width * crop_ratio)
    window_height = int(full_height * crop_ratio)

    # Random top-left corner that keeps the window inside the frame.
    left = random.randint(0, full_width - window_width)
    top = random.randint(0, full_height - window_height)
    bottom, right = top + window_height, left + window_width

    cropped = []
    for frame in video_frames:
        cropped.append(frame[top:bottom, left:right])
    return cropped

In [None]:
def interpolate_frames(video_frames, target_length):
    """Truncate or pad a clip so it has exactly ``target_length`` frames.

    A clip that is already long enough is cut to its first ``target_length`` frames.
    A shorter clip is extended with blends of its last two frames (weights moving
    from the second-to-last towards the last frame). A single-frame clip has no pair
    to blend, so that frame is repeated.

    Raises:
        ValueError: if padding is needed but ``video_frames`` is empty.
    """
    total_frames = len(video_frames)
    if total_frames >= target_length:
        return video_frames[:target_length]
    if total_frames == 0:
        raise ValueError("interpolate_frames: cannot pad an empty frame list")

    padding_needed = target_length - total_frames
    padded_frames = list(video_frames)

    if total_frames == 1:
        # Previously this case failed with IndexError on video_frames[-2].
        padded_frames.extend([video_frames[0]] * padding_needed)
        return padded_frames

    frame1 = video_frames[-2]  # Second last frame
    frame2 = video_frames[-1]  # Last frame
    for i in range(padding_needed):
        t = (i + 1) / (padding_needed + 1)  # Interpolation factor in (0, 1)
        padded_frames.append(cv2.addWeighted(frame1, 1 - t, frame2, t, 0))

    return padded_frames


def apply_fixed_temporal_length(video_frames, target_length=120, original_fps=30, target_fps=12):
    """Return the clip with exactly ``target_length`` frames.

    Longer clips are sub-sampled at evenly spaced positions covering the whole clip;
    shorter clips are padded by ``interpolate_frames``; clips of the right length are
    returned as they are. ``original_fps`` and ``target_fps`` are accepted but are
    not used by the current implementation.
    """
    n_frames = len(video_frames)
    if n_frames == target_length:
        return video_frames
    if n_frames < target_length:
        return interpolate_frames(video_frames, target_length)

    # Too long: keep target_length frames spread evenly from first to last.
    picks = np.linspace(0, n_frames - 1, target_length).astype(int)
    return [video_frames[k] for k in picks]

In [None]:
def apply_augmentation_to_video(video_frames, target_length=120):
    """Apply 2 to 5 distinct, randomly chosen augmentations to a whole clip.

    The augmentations are applied one after another in the sampled order, and the
    result is then brought to ``target_length`` frames.
    """
    # Name -> callable. Random parameters are drawn only when an entry is applied.
    appliers = {
        'rotate': lambda frames: rotate_video(frames, random.randint(-30, 30)),
        'flip': flip_video,
        'brightness': adjust_brightness,
        'contrast': adjust_contrast,
        'saturation': adjust_saturation,
        'hue': adjust_hue,
        # Random crop ratio between 70% and 100%
        'crop': lambda frames: crop_video(frames, crop_ratio=random.uniform(0.7, 1.0)),
        # Speed factor between 50% and 150%
        'speed': lambda frames: change_speed(frames, random.uniform(0.5, 1.5)),
    }

    how_many = random.randint(2, 5)
    chosen = random.sample(list(appliers), how_many)

    for name in chosen:
        video_frames = appliers[name](video_frames)

    # Truncate or pad to the fixed temporal length.
    return apply_fixed_temporal_length(video_frames, target_length)

In [None]:
def augment_video(input_video_path, output_video_path, target_fps=12, target_length=120):
    """Write a randomly augmented copy of a video.

    The input is read completely, passed through ``apply_augmentation_to_video``
    (which also brings it to ``target_length`` frames), resized back to the input
    resolution and written as an mp4v file at ``target_fps``.

    Args:
        input_video_path: video to read.
        output_video_path: file to write.
        target_fps: frame rate of the written file.
        target_length: number of frames in the written file.

    If the input cannot be opened or yields no frames, a message is printed and
    nothing is written.
    """
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print(f"Error opening video file: {input_video_path}")
        return

    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)
    cap.release()

    if not frames:
        print(f"No frames could be read from {input_video_path}; nothing written")
        return

    # Forward target_length: it used to be dropped here, so a non-default length was
    # only enforced by a second, separate length-normalisation pass.
    augmented_frames = apply_augmentation_to_video(frames, target_length)

    # Some augmentations (cropping) change the frame size; restore the input size.
    augmented_frames = [cv2.resize(frame, (frame_width, frame_height)) for frame in augmented_frames]

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, target_fps, (frame_width, frame_height))
    try:
        for frame in augmented_frames:
            out.write(frame)
    finally:
        # Release the writer even if a frame fails to write.
        out.release()
    print(f"Augmented video saved to {output_video_path}")

In [None]:
# Training split created earlier. Note that fall_folder / no_fall_folder are
# re-pointed from the raw-clip folders to the training folders from here on.
train_folder = "/content/drive/MyDrive/Fall_Detection_Playground_Senior_Prjct_II/final_splits/train/"
fall_folder = os.path.join(train_folder, 'fall')
no_fall_folder = os.path.join(train_folder, 'no_fall')

# Collect the .mp4 files of each class as full paths.
fall_videos = []
for entry in os.listdir(fall_folder):
    if entry.endswith('.mp4'):
        fall_videos.append(os.path.join(fall_folder, entry))

no_fall_videos = []
for entry in os.listdir(no_fall_folder):
    if entry.endswith('.mp4'):
        no_fall_videos.append(os.path.join(no_fall_folder, entry))

# Both classes together, fall clips first.
all_videos = fall_videos + no_fall_videos

# Show what was found, one path per line.
print("Fall Videos:")
for clip_path in fall_videos:
    print(clip_path)

print("\nNo Fall Videos:")
for clip_path in no_fall_videos:
    print(clip_path)


In [None]:
# Same duration / frame-rate summary, now for the training-split clips listed in the previous cell.
video_analysis(fall_videos, no_fall_videos)

In [None]:
# Number of augmented videos we want to create
num_augmented_videos = 30  # For fall videos

# Only augment original clips: if this cell is re-run after the folder has been
# re-listed, clips that are themselves augmentations are skipped.
fall_sources = [p for p in fall_videos if "-augmented-" not in os.path.basename(p)]

# Spread the augmentations evenly over the sources (integer division, so the total
# can be lower than requested); guard against an empty list.
fall_augs_per_video = num_augmented_videos // len(fall_sources) if fall_sources else 0

augmented_fall_count = 0
for video_path in fall_sources:
    # splitext keeps file names that contain extra dots intact.
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    for _ in range(fall_augs_per_video):
        augmented_fall_count += 1
        output_video_path = os.path.join(fall_folder, f"{video_name}-augmented-{augmented_fall_count}.mp4")
        augment_video(video_path, output_video_path)

if augmented_fall_count:
    print("Fall videos successfully augmented!")
else:
    print("No fall videos were augmented.")


In [None]:
# Number of augmented videos we want to create
num_augmented_videos = 25  # For no_fall videos

# Only augment original clips: if this cell is re-run after the folder has been
# re-listed, clips that are themselves augmentations are skipped.
no_fall_sources = [p for p in no_fall_videos if "-augmented-" not in os.path.basename(p)]

# Spread the augmentations evenly over the sources (integer division, so the total
# can be lower than requested); guard against an empty list.
no_fall_augs_per_video = num_augmented_videos // len(no_fall_sources) if no_fall_sources else 0

augmented_no_fall_count = 0
for video_path in no_fall_sources:
    # splitext keeps file names that contain extra dots intact.
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    for _ in range(no_fall_augs_per_video):
        augmented_no_fall_count += 1
        output_video_path = os.path.join(no_fall_folder, f"{video_name}-augmented-{augmented_no_fall_count}.mp4")
        augment_video(video_path, output_video_path)

if augmented_no_fall_count:
    print("No-fall videos successfully augmented!")
else:
    print("No no_fall videos were augmented.")

In [None]:
# Re-list both class folders so the counts include the freshly written augmentations.
fall_names = [entry for entry in os.listdir(fall_folder) if entry.endswith('.mp4')]
no_fall_names = [entry for entry in os.listdir(no_fall_folder) if entry.endswith('.mp4')]
fall_videos = [os.path.join(fall_folder, entry) for entry in fall_names]
no_fall_videos = [os.path.join(no_fall_folder, entry) for entry in no_fall_names]

print(len(fall_videos), len(no_fall_videos))

In [None]:
# Repetitive work [to fix later]: processing the whole folder once more to ensure every video has exactly 120 frames (10 seconds at 12 fps)


def _resample_to_fps(frames, original_fps, target_fps):
    """Drop frames so a clip recorded at ``original_fps`` keeps real-time speed at ``target_fps``."""
    if not original_fps or original_fps <= target_fps:
        return frames
    step = original_fps / target_fps  # e.g. 30 / 12 = 2.5 source frames per kept frame
    kept = int(np.ceil(len(frames) / step))
    indices = np.floor(np.arange(kept) * step).astype(int)
    indices = np.minimum(indices, len(frames) - 1)  # guard against float rounding
    return [frames[i] for i in indices]


def _fit_to_length(frames, target_frames):
    """Pad with copies of the last frame, or truncate, to exactly ``target_frames`` frames."""
    if len(frames) < target_frames:
        return frames + [frames[-1]] * (target_frames - len(frames))
    return frames[:target_frames]


def process_videos_to_fixed_length(video_paths, target_frames=120, target_fps=12):
    """Rewrite each video in place as ``target_frames`` frames at ``target_fps``.

    Clips with a higher frame rate than ``target_fps`` are sub-sampled by the exact
    rate ratio. The previous integer stride (``int(30 / 12) == 2``) kept 15 frames
    per second of footage and only acted when the rate was exactly 30. Clips are then
    padded with their last frame or truncated, and the file at the same path is
    overwritten (mp4v).

    Videos that cannot be opened or contain no frames are reported and left untouched.
    """
    for video_path in video_paths:
        cap = cv2.VideoCapture(video_path)

        original_fps = cap.get(cv2.CAP_PROP_FPS)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Read the whole clip into memory before the file is overwritten below.
        frames = []
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
        cap.release()

        if not frames:
            print(f"Skipping unreadable or empty video: {video_path}")
            continue

        frames = _resample_to_fps(frames, original_fps, target_fps)
        frames = _fit_to_length(frames, target_frames)

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(video_path, fourcc, target_fps, (frame_width, frame_height))
        try:
            for frame in frames:
                out.write(frame)
        finally:
            out.release()
        print(f"Processed video: {video_path}")


In [None]:
# Training-split folders, one per class.
train_folder = "/content/drive/MyDrive/Fall_Detection_Playground_Senior_Prjct_II/final_splits/train/"
fall_folder = os.path.join(train_folder, 'fall')
no_fall_folder = os.path.join(train_folder, 'no_fall')

# Every .mp4 currently in each class folder, as a full path.
fall_names = [entry for entry in os.listdir(fall_folder) if entry.endswith('.mp4')]
no_fall_names = [entry for entry in os.listdir(no_fall_folder) if entry.endswith('.mp4')]
fall_videos = [os.path.join(fall_folder, entry) for entry in fall_names]
no_fall_videos = [os.path.join(no_fall_folder, entry) for entry in no_fall_names]

# One list for the fixed-length pass, fall clips first.
all_videos = fall_videos + no_fall_videos

In [None]:
# Rewrite every training clip in place with the function's defaults
# (120 frames, written at 12 fps).
print("Processing all videos to fixed temporal length...")
process_videos_to_fixed_length(all_videos)

In [None]:
# Clip counts per class in the training split (lists built before the fixed-length pass).
print(len(fall_videos), len(no_fall_videos))

In [None]:
# Re-run the duration / frame-rate summary on the training clips after they were rewritten in place.
video_analysis(fall_videos, no_fall_videos)

###Data Preprocessing

In [None]:
def extract_frames(video_path, frame_size=(160, 160), max_frames=120):
    """Read up to ``max_frames`` frames from a video as an array scaled to [0, 1].

    Every frame is resized to ``frame_size`` and divided by 255. If the video has
    fewer than ``max_frames`` frames, the last frame is repeated to fill the gap.

    Returns:
        Array of ``max_frames`` frames stacked along the first axis.

    Raises:
        ValueError: if no frame can be read (missing, unreadable or empty video).
            Previously this surfaced as an IndexError on ``frames[-1]``.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []

    while cap.isOpened() and len(frames) < max_frames:
        ret, frame = cap.read()
        if not ret:
            break  # End of video
        # Resize and normalize the frame
        frames.append(cv2.resize(frame, frame_size) / 255.0)

    cap.release()

    if not frames and max_frames > 0:
        raise ValueError(f"No frames could be read from video: {video_path}")

    # If video has fewer than max_frames, repeat the last frame
    if len(frames) < max_frames:
        frames.extend([frames[-1]] * (max_frames - len(frames)))

    return np.array(frames[:max_frames])

In [None]:
def preprocess_video_data(video_paths, batch_size, output_folder, frame_size=(160, 160), max_frames=120):
    """Convert videos to frame arrays and save them to disk in fixed-size batches.

    For batch ``i`` two compressed files are written to ``output_folder``:
    ``batch_i_data.npz`` (float32 frames from ``extract_frames``) and
    ``batch_i_labels.npz`` (int32 labels). A video is labelled 0 when its path
    contains ``'no_fall'`` and 1 otherwise.
    """
    num_videos = len(video_paths)
    num_batches = (num_videos + batch_size - 1) // batch_size  # ceiling division

    for batch_idx in range(num_batches):
        print(f"Processing batch {batch_idx + 1} of {num_batches}...")

        start = batch_idx * batch_size
        chunk = video_paths[start:start + batch_size]

        # Frames for every video of the chunk, then the matching labels.
        clips = [extract_frames(clip_path, frame_size, max_frames) for clip_path in chunk]
        labels = [0 if 'no_fall' in clip_path else 1 for clip_path in chunk]

        batch_data = np.array(clips, dtype=np.float32)
        batch_labels = np.array(labels, dtype=np.int32)

        # Data and labels go into separate compressed archives.
        np.savez_compressed(os.path.join(output_folder, f'batch_{batch_idx}_data.npz'), batch_data)
        np.savez_compressed(os.path.join(output_folder, f'batch_{batch_idx}_labels.npz'), batch_labels)

        print(f"Batch {batch_idx + 1} saved to disk.")

    print("Processing completed!")

In [None]:
# Video folders of the three splits.
train_folder = "/content/drive/MyDrive/Fall_Detection_Playground_Senior_Prjct_II/final_splits/train/"
val_folder = "/content/drive/MyDrive/Fall_Detection_Playground_Senior_Prjct_II/final_splits/val/"
test_folder = "/content/drive/MyDrive/Fall_Detection_Playground_Senior_Prjct_II/final_splits/test/"

# Where the preprocessed batches of each split are written.
train_output_folder = "/content/drive/MyDrive/Fall_Detection_Playground_Senior_Prjct_II/processed_batches/train/"
val_output_folder = "/content/drive/MyDrive/Fall_Detection_Playground_Senior_Prjct_II/processed_batches/val/"
test_output_folder = "/content/drive/MyDrive/Fall_Detection_Playground_Senior_Prjct_II/processed_batches/test/"

for output_dir in (train_output_folder, val_output_folder, test_output_folder):
    os.makedirs(output_dir, exist_ok=True)


def _mp4_paths(split_folder, class_name):
    """Full paths of the .mp4 files in ``split_folder/class_name``."""
    listing = os.listdir(os.path.join(split_folder, class_name))
    return [os.path.join(split_folder, class_name, entry) for entry in listing if entry.endswith('.mp4')]


# List every class folder of every split.
train_fall_videos = _mp4_paths(train_folder, 'fall')
train_no_fall_videos = _mp4_paths(train_folder, 'no_fall')

val_fall_videos = _mp4_paths(val_folder, 'fall')
val_no_fall_videos = _mp4_paths(val_folder, 'no_fall')

test_fall_videos = _mp4_paths(test_folder, 'fall')
test_no_fall_videos = _mp4_paths(test_folder, 'no_fall')

import random


def _shuffled_mix(fall_paths, no_fall_paths):
    """Shuffle both class lists in place, then return their shuffled concatenation."""
    random.shuffle(fall_paths)
    random.shuffle(no_fall_paths)
    mixed = fall_paths + no_fall_paths
    random.shuffle(mixed)
    return mixed


# Mix the classes so each batch is likely to contain both.
train_videos = _shuffled_mix(train_fall_videos, train_no_fall_videos)
val_videos = _shuffled_mix(val_fall_videos, val_no_fall_videos)
test_videos = _shuffled_mix(test_fall_videos, test_no_fall_videos)

# Print counts of videos
print("Number of videos:")
print(f"Train: {len(train_videos)}, Validation: {len(val_videos)}, Test: {len(test_videos)}")

In [None]:
# Five videos per saved batch.
batch_size = 5

# Preprocess train, validation and test in that order.
for clip_paths, destination in (
    (train_videos, train_output_folder),
    (val_videos, val_output_folder),
    (test_videos, test_output_folder),
):
    preprocess_video_data(clip_paths, batch_size, destination)

###Working With Batch Data & Labels

In [None]:
def load_single_batch(data_file_path, labels_file_path):
    """
    Load a single batch from the npz data and labels files.
    :param data_file_path: Path to the batch data .npz file
    :param labels_file_path: Path to the batch labels .npz file
    :return: (batch_data, batch_labels), each read from the archive's 'arr_0' entry
    """
    # np.load on an .npz keeps the file open until the archive object is closed;
    # the context managers release the handles once the arrays are in memory.
    with np.load(data_file_path) as data_archive:
        batch_data = data_archive['arr_0']
    with np.load(labels_file_path) as labels_archive:
        batch_labels = labels_archive['arr_0']
    return batch_data, batch_labels

In [None]:
def load_batches_one_at_a_time(batch_folder):
    """
    Load batches from the given folder one batch at a time.

    Data files are visited in numeric batch order (batch_2 before batch_10; a plain
    string sort puts batch_10 first), and each one is paired with the labels file
    that carries the same name prefix rather than with whatever sits at the same
    position of a second sorted list.

    :param batch_folder: Path to the folder containing the batch files
    :return: A generator yielding a single batch of data and labels at a time
    :raises AssertionError: if the number of data and label files differs
    :raises FileNotFoundError: if a data file has no labels file with the same prefix
    """
    data_suffix = '_data.npz'
    labels_suffix = '_labels.npz'

    entries = os.listdir(batch_folder)
    data_files = [file for file in entries if file.endswith(data_suffix)]
    labels_files = {file for file in entries if file.endswith(labels_suffix)}

    assert len(data_files) == len(labels_files), "Mismatch in number of data and label files"

    def batch_order(file_name):
        # "batch_12_data.npz" -> 12; names without a numeric index sort last, by name.
        index_text = file_name[:-len(data_suffix)].rsplit('_', 1)[-1]
        if index_text.isdigit():
            return (0, int(index_text), file_name)
        return (1, 0, file_name)

    for data_file in sorted(data_files, key=batch_order):
        labels_file = data_file[:-len(data_suffix)] + labels_suffix
        if labels_file not in labels_files:
            raise FileNotFoundError(f"No labels file {labels_file} for {data_file} in {batch_folder}")

        # Yield a single batch of data and labels
        yield load_single_batch(
            os.path.join(batch_folder, data_file),
            os.path.join(batch_folder, labels_file),
        )

In [None]:
!pip install tensorflow
# print(tf.__version__)

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
import tensorflow.keras.metrics
from tensorflow.keras.optimizers import Adam

In [None]:
# Learning rate decay function
def lr_decay(epoch, lr):
    """Return ``lr`` multiplied by 0.9 on every 5th epoch index (5, 10, ...), else ``lr`` unchanged.

    Epoch index 0 never triggers a decay.
    """
    decay_factor = 0.9
    drop_every = 5
    on_boundary = epoch > 0 and epoch % drop_every == 0
    return lr * decay_factor if on_boundary else lr

In [None]:
# Create model function (same as before)
def create_model(input_shape=(120, 160, 160, 3)):
    """Build and compile the CNN + LSTM binary classifier.

    Two Conv2D / MaxPooling2D stages run on every frame (TimeDistributed), the
    flattened per-frame features feed a 64-unit LSTM, and a small dense head ends in
    a single sigmoid unit. Compiled with Adam (learning rate 1e-4), binary
    cross-entropy, and accuracy / precision / recall / AUC metrics.
    """
    model = models.Sequential([
        # Per-frame feature extraction with 2D convolutions
        layers.TimeDistributed(layers.Conv2D(16, (3, 3), activation='relu', padding='same'), input_shape=input_shape),
        layers.TimeDistributed(layers.MaxPooling2D((2, 2))),
        layers.TimeDistributed(layers.Conv2D(32, (3, 3), activation='relu', padding='same')),
        layers.TimeDistributed(layers.MaxPooling2D((2, 2))),
        # One feature vector per frame
        layers.TimeDistributed(layers.Flatten()),
        # Temporal modeling over the frame sequence
        layers.LSTM(64, return_sequences=False, dropout=0.3, recurrent_dropout=0.3),
        # Classification head
        layers.Dense(32, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01)),
        layers.Dropout(0.3),
        layers.Dense(1, activation='sigmoid'),
    ])

    tracked_metrics = ['accuracy', tf.keras.metrics.Precision(), tf.keras.metrics.Recall(), tf.keras.metrics.AUC()]
    model.compile(
        optimizer=Adam(learning_rate=0.0001),  # initial learning rate
        loss='binary_crossentropy',
        metrics=tracked_metrics,
    )

    return model

# Build the model for clips of 120 frames, each 160x160 with 3 channels
# (the frame count and size that extract_frames produces by default).
model = create_model(input_shape=(120, 160, 160, 3))

# Print the layer-by-layer summary of the architecture
model.summary()


In [None]:
# From here on train_folder / val_folder / test_folder point at the preprocessed
# .npz batches (processed_batches/), no longer at the video folders (final_splits/).
train_folder = "/content/drive/MyDrive/Fall_Detection_Playground_Senior_Prjct_II/processed_batches/train/"
val_folder = "/content/drive/MyDrive/Fall_Detection_Playground_Senior_Prjct_II/processed_batches/val/"
test_folder = "/content/drive/MyDrive/Fall_Detection_Playground_Senior_Prjct_II/processed_batches/test/"

In [None]:
# Manual training loop: one pass over the saved batches per epoch.
epochs = 10

# Kept for use with model.fit(); this train_on_batch loop does not go through Keras
# callbacks, so the decay is applied by hand at the end of every epoch below.
lr_scheduler = tf.keras.callbacks.LearningRateScheduler(lr_decay)

# Training loop (without validation during training)
for epoch in range(epochs):
    print(f"Epoch {epoch + 1}/{epochs}")

    for i, (batch_data, batch_labels) in enumerate(load_batches_one_at_a_time(train_folder)):
        batch_data = tf.convert_to_tensor(batch_data)
        batch_labels = tf.convert_to_tensor(batch_labels)

        # return_dict=True yields {metric name: value} directly. Zipping
        # model.metrics_names with the returned list mislabels the values on Keras
        # versions where metrics_names is not one name per returned value.
        metrics_dict = model.train_on_batch(batch_data, batch_labels, return_dict=True)

        print(f"Batch {i + 1}: Training metrics: {metrics_dict}")

    # Apply the schedule. `learning_rate` is the attribute available on both older
    # and current Keras optimizers (`lr` is not available on current ones).
    current_lr = float(model.optimizer.learning_rate.numpy())
    model.optimizer.learning_rate.assign(lr_decay(epoch, current_lr))

print("Training completed!")


###Validation

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, roc_auc_score

# Validation after training.
# Predictions are collected over ALL validation batches and scored once: averaging
# per-batch precision / recall (with a smaller last batch) is not the metric of the
# whole set, and a per-batch confusion matrix of a handful of clips can even lose a
# row/column when a batch happens to contain a single class.
val_true_parts, val_prob_parts = [], []
for val_data, val_labels in load_batches_one_at_a_time(val_folder):
    val_prob_parts.append(model.predict(tf.convert_to_tensor(val_data), verbose=0).ravel())
    val_true_parts.append(np.asarray(val_labels).ravel())

if val_true_parts:
    val_true = np.concatenate(val_true_parts)
    val_prob = np.concatenate(val_prob_parts)
    val_pred = (val_prob > 0.5).astype(int)

    # labels=[0, 1] keeps the matrix 2x2 even if one class is absent.
    cm = confusion_matrix(val_true, val_pred, labels=[0, 1])
    print(cm)

    clipped = np.clip(val_prob, 1e-7, 1 - 1e-7)  # keep log() finite
    val_metrics_dict = {
        # Binary cross-entropy of the predictions (data term only, no L2 penalty).
        'loss': float(-np.mean(val_true * np.log(clipped) + (1 - val_true) * np.log(1 - clipped))),
        'accuracy': accuracy_score(val_true, val_pred),
        'precision': precision_score(val_true, val_pred, zero_division=0),
        'recall': recall_score(val_true, val_pred, zero_division=0),
        # ROC AUC is undefined when only one class is present.
        'auc': roc_auc_score(val_true, val_prob) if len(np.unique(val_true)) == 2 else float('nan'),
    }
    print(f"Validation metrics after training: {val_metrics_dict}")
else:
    print(f"No validation batches found in {val_folder}")


### Testing

In [None]:
# Test the model on the test set.
# Predictions are collected over ALL test batches and scored once: averaging
# per-batch precision / recall (with a smaller last batch) is not the metric of the
# whole set, and a per-batch confusion matrix can lose a row/column when a batch
# happens to contain a single class.
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, roc_auc_score

test_true_parts, test_prob_parts = [], []
for test_data, test_labels in load_batches_one_at_a_time(test_folder):
    test_prob_parts.append(model.predict(tf.convert_to_tensor(test_data), verbose=0).ravel())
    test_true_parts.append(np.asarray(test_labels).ravel())

if test_true_parts:
    test_true = np.concatenate(test_true_parts)
    test_prob = np.concatenate(test_prob_parts)
    test_pred = (test_prob > 0.5).astype(int)

    # labels=[0, 1] keeps the matrix 2x2 even if one class is absent.
    cm = confusion_matrix(test_true, test_pred, labels=[0, 1])
    print(cm)

    clipped = np.clip(test_prob, 1e-7, 1 - 1e-7)  # keep log() finite
    test_metrics_dict = {
        # Binary cross-entropy of the predictions (data term only, no L2 penalty).
        'loss': float(-np.mean(test_true * np.log(clipped) + (1 - test_true) * np.log(1 - clipped))),
        'accuracy': accuracy_score(test_true, test_pred),
        'precision': precision_score(test_true, test_pred, zero_division=0),
        'recall': recall_score(test_true, test_pred, zero_division=0),
        # ROC AUC is undefined when only one class is present.
        'auc': roc_auc_score(test_true, test_prob) if len(np.unique(test_true)) == 2 else float('nan'),
    }
    print(f"Test metrics: {test_metrics_dict}")
else:
    print(f"No test batches found in {test_folder}")


###Testing on External Video(s)

In [None]:
# Step 1: Process the video to 12 fps, 120 frames, and resize to 160x160
def process_video(video_path, fps=12, target_length=120, target_size=(160, 160)):
    """Turn an external video into one model-ready clip.

    The result matches what ``extract_frames`` feeds the model during training:
    frames resized to ``target_size``, scaled to [0, 1], and exactly
    ``target_length`` of them (the last frame is repeated if the video is short).
    Frames are picked at the exact ``original_fps / fps`` ratio; videos at or below
    ``fps`` (or with an unknown rate) are read frame by frame.

    Returns:
        float32 array of ``target_length`` frames.

    Raises:
        ValueError: if no frame can be read from ``video_path``.
    """
    cap = cv2.VideoCapture(video_path)

    original_fps = cap.get(cv2.CAP_PROP_FPS)
    # Source frames per kept frame. A fractional step avoids both the 15-fps result
    # of an integer stride (30 // 12 == 2) and a zero stride for slow videos.
    step = original_fps / fps if original_fps > fps else 1.0
    selected_frames = []

    frame_count = 0
    while len(selected_frames) < target_length:
        ret, frame = cap.read()
        if not ret:
            break

        if frame_count >= int(len(selected_frames) * step):
            # Same scaling as in training: raw 0-255 pixels would be out of the
            # range the model has seen.
            selected_frames.append(cv2.resize(frame, target_size) / 255.0)

        frame_count += 1

    cap.release()

    if not selected_frames and target_length > 0:
        raise ValueError(f"No frames could be read from video: {video_path}")

    if len(selected_frames) < target_length:
        # The model expects a fixed number of time steps; pad like extract_frames does.
        selected_frames.extend([selected_frames[-1]] * (target_length - len(selected_frames)))

    return np.array(selected_frames, dtype=np.float32)

# Score one or more external videos.
path = '/content/drive/MyDrive/Fall_Detection_Playground_Senior_Prjct_II/external_test_vids/'
video_path = ['fall.mp4']  # add more file names from the folder above to score several clips

for videos in video_path:
  v_path = os.path.join(path, videos)
  processed_video = process_video(v_path)
  # Add the batch axis the model's input expects.
  processed_video_tensor = tf.convert_to_tensor(processed_video[np.newaxis, ...], dtype=tf.float32)

  predictions = model.predict(processed_video_tensor)

  # Report every video inside the loop; otherwise only the last clip's prediction
  # survives for the next cell. Training labels are 1 for fall and 0 for no_fall.
  predicted_class = "fall" if predictions[0][0] > 0.5 else "no_fall"
  print(f"{videos}: predicted {predicted_class} (fall probability {predictions[0][0]:.4f})")


In [None]:
# Re-scores the clip left in `processed_video` by the loop in the previous cell,
# i.e. the last video processed there.
# Add the batch axis in front of the clip before handing it to the model.
processed_video_tensor = tf.convert_to_tensor(processed_video[np.newaxis, ...], dtype=tf.float32)

# Predict using the model
predictions = model.predict(processed_video_tensor)

# predictions[0][0] is the model's single sigmoid output for this clip; training
# used label 1 for fall and 0 for no_fall, so values above 0.5 read as "fall".
print(predictions[0][0])