In [1]:
import tensorflow as tf
print("TensorFlow version:", tf.__version__)

gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print(f"GPUs detected: {gpus}")
    print("TensorFlow is set to use the GPU.")
else:
    print("No GPU detected. TensorFlow is using the CPU.")


TensorFlow version: 2.10.0
GPUs detected: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
TensorFlow is set to use the GPU.


In [2]:
import os
import cv2
import numpy as np
import json

def generate_numpy_arrays(images_folder, annotations_folder, output_size=(224, 224)):
    images = []
    keypoints = []

    for root, dirs, files in os.walk(images_folder):
        for file in files:
            if file.lower().endswith(('.jpg', '.png', '.jpeg')):
                image_path = os.path.join(root, file)
                relative_path = os.path.relpath(root, images_folder)
                annotation_subfolder = os.path.join(annotations_folder, relative_path)
                annotation_file = os.path.splitext(file)[0] + '.json'
                annotation_path = os.path.join(annotation_subfolder, annotation_file)

                if not os.path.exists(annotation_path):
                    continue

                # Load image
                image = cv2.imread(image_path)
                if image is None:
                    print(f"Failed to read image {image_path}")
                    continue

                original_height, original_width = image.shape[:2]

                # Load keypoints from the annotation
                with open(annotation_path, 'r') as f:
                    annotation = json.load(f)
                kp = annotation.get('keypoints')
                if kp is None:
                    continue

                # Keypoints are already in pixel coordinates
                keypoints_adjusted = [[p['x'], p['y']] for p in kp]
                
                # Print the raw keypoints before normalization

                # Normalize keypoints to [0, 1]
                keypoints_normalized = []
                for x, y in keypoints_adjusted:
                    normalized_x = x / original_width
                    normalized_y = y / original_height
                    keypoints_normalized.append(normalized_x)
                    keypoints_normalized.append(normalized_y)

                # Print the normalized keypoints

                # Resize the image
                image_resized = cv2.resize(image, output_size)
                image_normalized = image_resized / 255.0  # Normalize image to [0, 1]

                images.append(image_normalized)
                keypoints.append(keypoints_normalized)

    # Convert lists to NumPy arrays
    images_array = np.array(images)
    keypoints_array = np.array(keypoints)

    # Save arrays to .npy files
    np.save('images.npy', images_array)
    np.save('keypoints.npy', keypoints_array)

    print(f"Processed and saved {len(images_array)} images and annotations.")


generate_numpy_arrays(
    images_folder=r'C:\Users\Danie\OneDrive\Desktop\squat images',
    annotations_folder=r'C:\Users\Danie\OneDrive\Desktop\squat annotations'
)


Processed and saved 5898 images and annotations.


In [3]:
# import random
# import matplotlib.pyplot as plt
# import numpy as np
# import cv2

# def visualize_random_augmented_images(images_path='images.npy', keypoints_path='keypoints.npy', num_images=20):
#     # Load augmented images and keypoints
#     images = np.load(images_path)
#     keypoints = np.load(keypoints_path)
    
#     # Randomly select indices for the number of images to display
#     random_indices = random.sample(range(len(images)), min(num_images, len(images)))
    
#     for idx in random_indices:
#         image = images[idx]
#         image = (image * 255).astype(np.uint8)  # Convert back to original scale for visualization
        
#         # Get keypoints and convert to dictionary format
#         kp = keypoints[idx].reshape(-1, 2) * [image.shape[1], image.shape[0]]  # Scale keypoints back to pixel values
#         kp_dicts = [{'x': float(x), 'y': float(y)} for x, y in kp]

#         # Overlay connections (adapted to display keypoints and connections)
#         annotated_image = overlay_keypoints_with_connections(image, kp_dicts)
        
#         # Display the image with keypoints and connections
#         plt.figure(figsize=(8, 8))
#         plt.imshow(annotated_image)
#         plt.title(f"Random Augmented Image {idx}")
#         plt.axis('off')
#         plt.show()

# # Visualize 20 random augmented images
# visualize_random_augmented_images()


In [4]:
## NEURAL NETWORK

In [5]:
## MODEL A - TRANSFER LEARNING WITH RESNET50

In [6]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Dense, Flatten, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.model_selection import train_test_split


In [7]:
images = np.load('images.npy') 
keypoints = np.load('keypoints.npy')

idx = np.random.randint(len(keypoints))
print("Sample keypoints (normalized):", keypoints[idx])

print("Keypoints max value:", np.max(keypoints))
print("Keypoints min value:", np.min(keypoints))

num_keypoints = keypoints.shape[1] // 2

# Find indices where keypoints are outside [0, 1]
out_of_bounds = np.where((keypoints < 0) | (keypoints > 1))

num_outliers = len(out_of_bounds[0])
total_keypoints = keypoints.size

print(f"Number of outlier keypoints: {num_outliers} out of {total_keypoints}")


print('Number of images:', len(images))
print('Number of keypoints:', len(keypoints))

images = images / 255.0


x_train, x_val, y_train, y_val = train_test_split(images, keypoints, test_size=0.15, random_state=42)

print(f"Training set: {x_train.shape[0]}, Validation set: {x_val.shape[0]}")

n_keypoints = keypoints.shape[1] // 2

input_shape = (224, 224, 3)
base_model = ResNet50(input_shape=input_shape, include_top=False, weights='imagenet')

# Corrected the variable name here
base_model.trainable = False

x = base_model.output
x = Flatten()(x)
x = Dense(512, activation='relu')(x)
x = Dropout(0.5)(x)
x = Dense(256, activation='relu')(x)  # Corrected line
x = Dropout(0.25)(x)
output = Dense(n_keypoints * 2, activation='sigmoid')(x)

model_resnet = Model(inputs=base_model.input, outputs=output)

model_resnet.compile(optimizer='adam', loss='mse', metrics=['mae'])

model_resnet.summary()





Sample keypoints (normalized): [0.48378137 0.14072341 0.49174419 0.11161    0.50158119 0.10772097
 0.51091993 0.10384429 0.47836557 0.11587301 0.47804078 0.11548433
 0.47741383 0.11483686 0.58118469 0.08909941 0.52650893 0.10932118
 0.53161311 0.15574065 0.51332408 0.16140646 0.81805676 0.2014389
 0.59247178 0.2606276  0.86818999 0.40721539 0.61746502 0.42460883
 0.66437978 0.557356   0.54078287 0.53571457 0.64009047 0.60777932
 0.51585567 0.57407635 0.58772087 0.58948272 0.50439167 0.56191701
 0.58740759 0.57071292 0.51965368 0.54799449 0.8346824  0.57010597
 0.67592794 0.57562333 0.75480759 0.86693436 0.60864091 0.84732884
 0.84542161 1.10519147 0.67541593 1.01435935 0.90225029 1.13267589
 0.72538859 1.04018128 0.69573927 1.18946123 0.57606608 1.14105034]
Keypoints max value: 3.274808406829834
Keypoints min value: -1.7213892936706543
Number of outlier keypoints: 8889 out of 389268
Number of images: 5898
Number of keypoints: 5898
Training set: 5013, Validation set: 885
Model: "model"


In [8]:
# Define callbacks
checkpoint_resnet = ModelCheckpoint('model_resnet.h5', monitor='val_loss', save_best_only=True, verbose=1)
early_stop_resnet = EarlyStopping(monitor='val_loss', patience=5, verbose=1)

# Train the model
history_resnet = model_resnet.fit(
    x_train, y_train,
    validation_data=(x_val, y_val),
    epochs=5,
    batch_size=8,
    callbacks=[checkpoint_resnet, early_stop_resnet]
)


InternalError: Failed copying input tensor from /job:localhost/replica:0/task:0/device:GPU:0 to /job:localhost/replica:0/task:0/device:CPU:0 in order to run TensorDataset: Dst tensor is not initialized. [Op:TensorDataset]

In [None]:
## MODEL B - CNN, NO TRANSFER LEARNING

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.model_selection import train_test_split


In [None]:
# Load your preprocessed images and keypoints
# Assuming images.npy and keypoints.npy have been saved from previous steps

images = np.load('images.npy')
keypoints = np.load('keypoints.npy')

# Normalize images
images = images / 255.0

img_height, img_width = images.shape[1], images.shape[2]

# Normalize keypoints
keypoints[:, ::2] /= img_width   # Normalize x-coordinates
keypoints[:, 1::2] /= img_height  # Normalize y-coordinates

# Split the dataset into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(images, keypoints, test_size=0.2, random_state=42)

print(f"Training samples: {X_train.shape[0]}, Validation samples: {X_val.shape[0]}")




In [None]:
# Define the number of keypoints
num_keypoints = keypoints.shape[1] // 2  # Each keypoint has x and y

# Build the CNN model
model_cnn = Sequential([
    Conv2D(32, (3, 3), activation='relu', input_shape=(224, 224, 3)),
    BatchNormalization(),
    MaxPooling2D(2, 2),
    
    Conv2D(64, (3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D(2, 2),
    
    Conv2D(128, (3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D(2, 2),
    
    Flatten(),
    Dense(256, activation='relu'),
    Dropout(0.5),
    Dense(128, activation='relu'),
    Dropout(0.25),
    Dense(num_keypoints * 2, activation='sigmoid')  # Output normalized between 0 and 1
])

# Compile the model
model_cnn.compile(optimizer='adam', loss='mse', metrics=['mae'])

# Display the model summary
model_cnn.summary()


In [None]:
# Define callbacks
checkpoint_cnn = ModelCheckpoint('model_cnn.h5', monitor='val_loss', save_best_only=True, verbose=1)
early_stop_cnn = EarlyStopping(monitor='val_loss', patience=5, verbose=1)

# Train the model
history_cnn = model_cnn.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=30,
    batch_size=8,
    callbacks=[checkpoint_cnn, early_stop_cnn]
)


In [None]:
## EVALUATE NEURAL NETWORKS

In [None]:
# Load the best models
model_resnet.load_weights('model_resnet.h5')
model_cnn.load_weights('model_cnn.h5')

# Evaluate on validation set
loss_resnet, mae_resnet = model_resnet.evaluate(X_val, y_val, batch_size=1)
loss_cnn, mae_cnn = model_cnn.evaluate(X_val, y_val, batch_size=1)

print(f"ResNet50 Model - Validation Loss: {loss_resnet}, MAE: {mae_resnet}")
print(f"Simple CNN Model - Validation Loss: {loss_cnn}, MAE: {mae_cnn}")


In [None]:
import matplotlib.pyplot as plt

def visualize_predictions(model, X_data, y_data, index=0):
    image = X_data[index]
    true_keypoints = y_data[index]
    predicted_keypoints = model.predict(np.expand_dims(image, axis=0))[0]
    
    h, w, _ = image.shape
    true_keypoints = true_keypoints.reshape(-1, 2) * [w, h]
    predicted_keypoints = predicted_keypoints.reshape(-1, 2) * [w, h]
    
    plt.figure(figsize=(6, 6))
    plt.imshow(image)
    # Plot true keypoints
    plt.scatter(true_keypoints[:, 0], true_keypoints[:, 1], c='g', label='True Keypoints')
    # Plot predicted keypoints
    plt.scatter(predicted_keypoints[:, 0], predicted_keypoints[:, 1], c='r', label='Predicted Keypoints')
    plt.legend()
    plt.title('Keypoint Prediction')
    plt.axis('off')
    plt.show()

# Visualize predictions for both models
print("ResNet50 Model Predictions:")
visualize_predictions(model_resnet, X_val, y_val, index=0)

print("Simple CNN Model Predictions:")
visualize_predictions(model_cnn, X_val, y_val, index=0)


In [None]:
## TEST NN WITH FRESH DATA

In [None]:
from keras.models import load_model
def preprocess_frame(frame, target_size=(224, 224)):
    """
    Preprocess a single frame for model prediction.

    Args:
        frame (numpy.ndarray): Original frame image.
        target_size (tuple): Desired image size (width, height).

    Returns:
        numpy.ndarray: Preprocessed frame.
    """
    # Resize the frame
    frame_resized = cv2.resize(frame, target_size)
    # Convert BGR to RGB
    frame_rgb = cv2.cvtColor(frame_resized, cv2.COLOR_BGR2RGB)
    # Normalize pixel values
    frame_normalized = frame_rgb / 255.0
    # Expand dimensions to match model's input shape
    frame_expanded = np.expand_dims(frame_normalized, axis=0)
    return frame_expanded

def denormalize_keypoints(predictions, img_width, img_height):
    """
    Denormalize keypoints from [0, 1] to pixel coordinates.

    Args:
        predictions (numpy.ndarray): Predicted keypoints.
        img_width (int): Width of the image.
        img_height (int): Height of the image.

    Returns:
        list of tuples: Denormalized (x, y) coordinates.
    """
    denorm_keypoints = []
    for i in range(0, len(predictions), 4):
        x = predictions[i] * img_width
        y = predictions[i+1] * img_height
        # Optionally, you can handle z and visibility if needed
        denorm_keypoints.append((x, y))
    return denorm_keypoints

def visualize_keypoints(image, keypoints, save_path=None):
    """
    Visualize keypoints on an image.

    Args:
        image (numpy.ndarray): Original image.
        keypoints (list of tuples): List of (x, y) coordinates.
        save_path (str, optional): Path to save the visualized image. Defaults to None.
    """
    plt.figure(figsize=(8, 8))
    plt.imshow(image)
    x_coords, y_coords = zip(*keypoints)
    plt.scatter(x_coords, y_coords, c='r', s=40, marker='o')
    plt.axis('off')
    if save_path:
        plt.savefig(save_path, bbox_inches='tight')
    plt.show()


# Paths
video_path = r""  # Replace with your video path
frames_folder = r"C:\Users\Danie\OneDrive\Desktop\squat images\prediction"   # Replace with your desired frames folder

# Extract frames
extract_frames_from_video(video_path, frames_folder, frame_rate=5)

model_path = 'model_cnn.h5'  # Ensure this path is correct

# Load the model
model_b = load_model(model_path)
print("Model B loaded successfully.")

# Get list of frame file paths
frame_files = sorted([os.path.join(frames_folder, f) for f in os.listdir(frames_folder) if f.endswith(('.jpg', '.png', '.jpeg'))])

# Directory to save visualized frames
visualized_frames_folder = r"C:\path\to\save\visualized_frames"  # Replace with your desired folder
if not os.path.exists(visualized_frames_folder):
    os.makedirs(visualized_frames_folder)

for frame_file in frame_files:
    # Read the original frame
    original_frame = cv2.imread(frame_file)
    if original_frame is None:
        print(f"Failed to read {frame_file}. Skipping.")
        continue
    
    # Preprocess the frame
    preprocessed_frame = preprocess_frame(original_frame, target_size=(224, 224))
    
    # Predict keypoints
    predictions = model_b.predict(preprocessed_frame)[0]  # Assuming model outputs a flat array
    
    # Denormalize keypoints
    img_height, img_width = original_frame.shape[:2]
    predicted_keypoints = denormalize_keypoints(predictions, img_width, img_height)
    
    # Visualize keypoints on the original frame
    visualize_keypoints(original_frame, predicted_keypoints)
    
    # Optionally, save the visualized frame
    frame_filename = os.path.basename(frame_file)
    save_path = os.path.join(visualized_frames_folder, f"visualized_{frame_filename}")
    visualize_keypoints(original_frame, predicted_keypoints, save_path=save_path)
    
    # Optional: Display progress
    print(f"Processed and visualized {frame_filename}.")

def compile_frames_to_video(frames_folder, output_video_path, fps=30):
    """
    Compile extracted frames into a video.

    Args:
        frames_folder (str): Directory containing visualized frames.
        output_video_path (str): Path to save the output video.
        fps (int, optional): Frames per second. Defaults to 30.
    """
    frame_files = sorted([os.path.join(frames_folder, f) for f in os.listdir(frames_folder) if f.startswith('visualized_') and f.endswith(('.jpg', '.png', '.jpeg'))])
    
    if not frame_files:
        print("No frames found to compile.")
        return
    
    # Read the first frame to get the frame size
    first_frame = cv2.imread(frame_files[0])
    height, width, layers = first_frame.shape
    
    # Define the codec and create VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # You can use other codecs as needed
    video = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    
    for frame_file in frame_files:
        frame = cv2.imread(frame_file)
        if frame is not None:
            video.write(frame)
        else:
            print(f"Failed to read {frame_file}. Skipping.")
    
    video.release()
    print(f"Video saved to {output_video_path}.")

# Paths
output_video_path = r"C:\path\to\save\output_video.mp4"  # Replace with your desired output path

# Compile video
compile_frames_to_video(visualized_frames_folder, output_video_path, fps=30)






