In [None]:
# Install TensorFlow Hub, which the notebook uses to load the MoveNet model.
!pip install tensorflow-hub
# NOTE(review): tensorflow-addons is not imported in any visible cell — confirm it is still needed.
!pip install tensorflow-addons

Collecting tensorflow-addons
  Downloading tensorflow_addons-0.23.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (611 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m611.8/611.8 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
Collecting typeguard<3.0.0,>=2.7 (from tensorflow-addons)
  Downloading typeguard-2.13.3-py3-none-any.whl (17 kB)
Installing collected packages: typeguard, tensorflow-addons
Successfully installed tensorflow-addons-0.23.0 typeguard-2.13.3


In [None]:
# -*- coding: utf-8 -*-
"""Human Activity Recognition with MoveNet"""

import os
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, LSTM, TimeDistributed

# Mount Google Drive so the dataset folder below is reachable from the Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

# Set paths and parameters
# data_dir is listed with os.listdir by the data generators, which treat each
# sub-directory as one activity class.
data_dir = '/content/drive/MyDrive/HAR_DATASET/HRA_DATASET_K'
# Size images are loaded at before they are padded/resized to the MoveNet input size.
img_height, img_width = 64, 64  # Adjust these values according to your data
batch_size = 32
epochs = 100
# NOTE(review): this counts every entry in data_dir, including stray files that
# the generators skip — verify the folder holds only class directories.
num_classes = len(os.listdir(data_dir))  # Number of activity classes



Mounted at /content/drive


In [None]:

# Upgrade tensorflow-hub to the newest release available to pip.
# NOTE(review): the version is not pinned, so re-runs may pick up a different release.
!pip install --upgrade tensorflow-hub



In [None]:
# Select the MoveNet variant. A name containing "tflite" takes the TFLite
# interpreter branch (the model file is downloaded with wget); any other name
# loads the SavedModel from TF Hub. Both branches define `input_size` and a
# `movenet(input_image)` function with the same contract.
model_name = "movenet_lightning"

if "tflite" in model_name:
  if "movenet_lightning_f16" in model_name:
    !wget -q -O model.tflite https://tfhub.dev/google/lite-model/movenet/singlepose/lightning/tflite/float16/4?lite-format=tflite
    input_size = 192
  elif "movenet_thunder_f16" in model_name:
    !wget -q -O model.tflite https://tfhub.dev/google/lite-model/movenet/singlepose/thunder/tflite/float16/4?lite-format=tflite
    input_size = 256
  elif "movenet_lightning_int8" in model_name:
    !wget -q -O model.tflite https://tfhub.dev/google/lite-model/movenet/singlepose/lightning/tflite/int8/4?lite-format=tflite
    input_size = 192
  elif "movenet_thunder_int8" in model_name:
    !wget -q -O model.tflite https://tfhub.dev/google/lite-model/movenet/singlepose/thunder/tflite/int8/4?lite-format=tflite
    input_size = 256
  else:
    raise ValueError("Unsupported model name: %s" % model_name)

  # Initialize the TFLite interpreter
  interpreter = tf.lite.Interpreter(model_path="model.tflite")
  interpreter.allocate_tensors()

  def movenet(input_image):
    """Runs detection on an input image.

    Args:
      input_image: A [1, height, width, 3] tensor represents the input image
        pixels. Note that the height/width should already be resized and match the
        expected input resolution of the model before passing into this function.

    Returns:
      A [1, 1, 17, 3] float numpy array representing the predicted keypoint
      coordinates and scores.
    """
    # TF Lite format expects tensor type of uint8.
    input_image = tf.cast(input_image, dtype=tf.uint8)
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    # Copy the image into the interpreter's first input tensor.
    interpreter.set_tensor(input_details[0]['index'], input_image.numpy())
    # Invoke inference.
    interpreter.invoke()
    # Get the model prediction.
    keypoints_with_scores = interpreter.get_tensor(output_details[0]['index'])
    return keypoints_with_scores

else:
  # Lightning uses 192x192 input, Thunder 256x256.
  if "movenet_lightning" in model_name:
    module = hub.load("https://tfhub.dev/google/movenet/singlepose/lightning/4")
    input_size = 192
  elif "movenet_thunder" in model_name:
    module = hub.load("https://tfhub.dev/google/movenet/singlepose/thunder/4")
    input_size = 256
  else:
    raise ValueError("Unsupported model name: %s" % model_name)

  def movenet(input_image):
    """Runs detection on an input image.

    Args:
      input_image: A [1, height, width, 3] tensor represents the input image
        pixels. Note that the height/width should already be resized and match the
        expected input resolution of the model before passing into this function.

    Returns:
      A [1, 1, 17, 3] float numpy array representing the predicted keypoint
      coordinates and scores.
    """
    # The signature is looked up from the module-level `module` on every call.
    model = module.signatures['serving_default']

    # SavedModel format expects tensor type of int32.
    input_image = tf.cast(input_image, dtype=tf.int32)
    # Run model inference.
    outputs = model(input_image)
    # Output is a [1, 1, 17, 3] tensor.
    keypoints_with_scores = outputs['output_0'].numpy()
    return keypoints_with_scores

In [None]:
# Load MoveNet model
model_name = "movenet_lightning"  # or "movenet_thunder"
# Derive the variant from model_name so the downloaded weights always match
# input_size. The previous URL contained no "{}" placeholder, so .format() was a
# no-op and "movenet_thunder" would still have loaded the lightning weights
# while input_size switched to 256.
movenet_variant = "lightning" if "lightning" in model_name else "thunder"
module = hub.load("https://tfhub.dev/google/movenet/singlepose/{}/4".format(movenet_variant))
input_size = 192 if movenet_variant == "lightning" else 256

# MoveNet function
def movenet(input_image):
    """Run MoveNet pose detection on one pre-sized image.

    Args:
        input_image: A [1, height, width, 3] tensor of image pixels. It must
            already be resized to the resolution the loaded model expects.
    Returns:
        A [1, 1, 17, 3] float numpy array holding the predicted keypoint
        coordinates and scores.
    """
    serving_fn = module.signatures['serving_default']
    # The SavedModel signature takes int32 pixels.
    int_pixels = tf.cast(input_image, dtype=tf.int32)
    # 'output_0' is the [1, 1, 17, 3] keypoint tensor; hand it back as numpy.
    return serving_fn(int_pixels)['output_0'].numpy()

# Data generator function
def data_generator(data_dir, batch_size):
    """Endlessly yield batches of MoveNet outputs with their class-folder names.

    Each pass walks every sub-directory of `data_dir`, runs `movenet` on every
    image found, and yields `(X, y)` as soon as `batch_size` samples are
    collected. `X` is the stacked raw `movenet` outputs and `y` the matching
    folder names (strings). Samples left over at the end of a pass are dropped
    because the buffers are re-created when the next pass starts.
    """
    while True:
        keypoint_buf, label_buf = [], []

        for label in os.listdir(data_dir):
            folder = os.path.join(data_dir, label)
            if not os.path.isdir(folder):
                continue  # only directories are treated as classes

            for file_name in os.listdir(folder):
                picture = image.load_img(
                    os.path.join(folder, file_name),
                    target_size=(img_height, img_width),
                )
                pixels = image.img_to_array(picture)

                # Add the batch axis, then pad/resize to the MoveNet input size.
                model_input = tf.image.resize_with_pad(
                    tf.expand_dims(pixels, axis=0), input_size, input_size
                )

                keypoint_buf.append(movenet(model_input))
                label_buf.append(label)

                if len(keypoint_buf) != batch_size:
                    continue

                # Batch is full: emit it and start collecting the next one.
                yield (np.array(keypoint_buf), np.array(label_buf))
                keypoint_buf, label_buf = [], []



In [None]:
# -*- coding: utf-8 -*-
"""Human Activity Recognition with MoveNet"""
'''
import os
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense'''

# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Set paths and parameters
data_dir = '/content/drive/MyDrive/HAR_DATASET/HRA_DATASET_K'
img_height, img_width = 64, 64  # Adjust these values according to your data
batch_size = 32
epochs = 1
num_classes = len(os.listdir(data_dir))  # Number of activity classes

# Load MoveNet model
model_name = "movenet_lightning"  # or "movenet_thunder"
# Build the URL from model_name: the old string had no "{}" placeholder, so
# .format() did nothing and the lightning weights were loaded even when
# input_size was switched to the thunder resolution.
movenet_variant = "lightning" if "lightning" in model_name else "thunder"
module = hub.load("https://tfhub.dev/google/movenet/singlepose/{}/4".format(movenet_variant))
input_size = 192 if movenet_variant == "lightning" else 256

# MoveNet function
def movenet(input_image):
    """Predict pose keypoints for a single image with the TF Hub MoveNet module.

    Args:
        input_image: A [1, height, width, 3] image tensor, already resized to
            the input resolution of the loaded model.
    Returns:
        A [1, 1, 17, 3] float numpy array of keypoint coordinates and scores.
    """
    infer = module.signatures['serving_default']
    # The SavedModel signature takes int32 pixels.
    result = infer(tf.cast(input_image, dtype=tf.int32))
    # 'output_0' carries the [1, 1, 17, 3] keypoints-with-scores tensor.
    return result['output_0'].numpy()

# Data generator function
# Data generator function


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def data_generator(data_dir, batch_size):
    """Endlessly yield (keypoints, one-hot label) batches for training.

    Args:
        data_dir: Folder whose sub-directories each hold the images of one class.
        batch_size: Number of samples per yielded batch.

    Yields:
        `(X_batch, y_batch)` where `X_batch` stacks `keypoints_with_scores[0, 0]`
        for each image (one (17, 3) array per sample according to the `movenet`
        docstring) and `y_batch` is the one-hot label matrix of width
        `num_classes`.

    Samples left over at the end of a pass over the data are dropped, and
    batches follow folder order (no shuffling).
    """
    # Only directories are classes. Sorting makes the label -> index mapping
    # reproducible, since os.listdir returns entries in arbitrary order.
    class_names = sorted(
        name for name in os.listdir(data_dir)
        if os.path.isdir(os.path.join(data_dir, name))
    )
    class_to_int = {class_name: i for i, class_name in enumerate(class_names)}

    while True:
        X_batch = []
        y_batch = []

        for class_dir in class_names:
            class_path = os.path.join(data_dir, class_dir)
            for img_file in os.listdir(class_path):
                img_path = os.path.join(class_path, img_file)
                img = image.load_img(img_path, target_size=(img_height, img_width))
                img_array = image.img_to_array(img)

                # Preprocess image for MoveNet
                img_tensor = tf.expand_dims(img_array, axis=0)
                img_tensor = tf.image.resize_with_pad(img_tensor, input_size, input_size)

                # Run MoveNet
                keypoints_with_scores = movenet(img_tensor)

                X_batch.append(keypoints_with_scores[0, 0])
                y_batch.append(class_to_int[class_dir])

                if len(X_batch) == batch_size:
                    # One-hot encode exactly once. The earlier version encoded
                    # the already one-hot matrix a second time and sliced
                    # [:, 0], which reduced every label to "is class 0 or not".
                    y_onehot = tf.keras.utils.to_categorical(
                        np.array(y_batch), num_classes=num_classes
                    )
                    yield (np.array(X_batch), y_onehot)
                    X_batch = []
                    y_batch = []



In [None]:
from tensorflow.keras import layers


In [None]:
# Create CNN model
# Create CNN model
'''cnn_model = Sequential([
    Conv2D(32, (2, 2), activation='relu', padding='same', input_shape=(17, 3, 1)),
    Conv2D(64, (2, 2), activation='relu', padding='same'),
    MaxPooling2D((2, 2)),
    Flatten(),
    Dense(128, activation='relu'),
    Dense(num_classes, activation='softmax')
])

# Compile and train CNN model mean_squared_error categorical_crossentropy
optimizer = tf.keras.optimizers.Adam()
cnn_model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

data_gen = data_generator(data_dir, batch_size)
cnn_model.fit(data_gen, epochs=epochs)'''

"cnn_model = Sequential([\n    Conv2D(32, (2, 2), activation='relu', padding='same', input_shape=(17, 3, 1)),\n    Conv2D(64, (2, 2), activation='relu', padding='same'),\n    MaxPooling2D((2, 2)),\n    Flatten(),\n    Dense(128, activation='relu'),\n    Dense(num_classes, activation='softmax')\n])\n\n# Compile and train CNN model mean_squared_error categorical_crossentropy\noptimizer = tf.keras.optimizers.Adam()\ncnn_model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])\n\ndata_gen = data_generator(data_dir, batch_size)\ncnn_model.fit(data_gen, epochs=epochs)"

In [None]:
import tensorflow as tf

# Create CNN model
# A small 2-D CNN over inputs declared as (17, 3, 1), ending in a softmax over
# num_classes outputs.
cnn_model = Sequential([
    Conv2D(32, (2, 2), activation='relu', padding='same', input_shape=(17, 3, 1)),
    Conv2D(64, (2, 2), activation='relu', padding='same'),
    MaxPooling2D((2, 2)),
    Flatten(),
    Dense(128, activation='relu'),
    Dense(num_classes, activation='softmax')
])

# Compile and train CNN model
# categorical_crossentropy requires one-hot targets of width num_classes.

optimizer = tf.keras.optimizers.Adam()
cnn_model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
# Uses whichever data_generator definition was executed most recently in the kernel.
data_gen = data_generator(data_dir, batch_size)

# Create a custom callback to stop training after a certain number of steps
class StopAfterNStepsCallback(tf.keras.callbacks.Callback):
    """Stop `fit` once `max_steps` training batches have been processed.

    Args:
        max_steps: Number of batches after which `model.stop_training` is set.
    """

    def __init__(self, max_steps):
        super().__init__()  # let the Keras base class set up its own state
        self.max_steps = max_steps
        self.current_step = 0

    def on_train_begin(self, logs=None):
        # Restart the count for every fit() call; otherwise a reused instance
        # would stop the next run after its very first batch.
        self.current_step = 0

    def on_batch_end(self, batch, logs=None):
        self.current_step += 1
        if self.current_step >= self.max_steps:
            self.model.stop_training = True

# Set the maximum number of steps you want to run
max_steps =101

# Create the custom callback and pass it to the fit function
stop_callback = StopAfterNStepsCallback(max_steps)
# Pass verbose=0 to fit for silent mode.

# NOTE(review): no steps_per_epoch is passed and the generator loops forever, so
# the first epoch never ends by itself — the callback is what stops training
# after max_steps batches, regardless of epochs=100.
cnn_model.fit(data_gen, epochs=100, callbacks=[stop_callback], verbose=0)

Epoch 1/100
     55/Unknown - 135s 2s/step - loss: 0.9568 - accuracy: 0.7443





<keras.src.callbacks.History at 0x7b6b35f22260>

In [None]:
# Save the trained CNN under 'cnn_image_model' (this cell saves; it does not load anything).
cnn_model.save('cnn_image_model')

# NOTE(review): no `loaded_model` is created here — reload the saved model first
# if it is needed for prediction or further training.


In [None]:
import tensorflow as tf

# Save the model in the SavedModel format
tf.saved_model.save(cnn_model, 'cnn_image_model')

# Convert the SavedModel directory into a single TFLite flatbuffer.
# Uses the TF2 converter: the tf.compat.v1 converter is a TF1-era API and only
# produced a flood of deprecation warnings here.
converter = tf.lite.TFLiteConverter.from_saved_model('cnn_image_model')
tflite_model = converter.convert()

# Save the converted model to a file
with open('converted_cnn_model.tflite', 'wb') as f:
    f.write(tflite_model)


Instructions for updating:
Use `tf.saved_model.load` instead.
Instructions for updating:
This API was designed for TensorFlow v1. See https://www.tensorflow.org/guide/migrate for instructions on how to migrate your code to TensorFlow v2.
Instructions for updating:
This API was designed for TensorFlow v1. See https://www.tensorflow.org/guide/migrate for instructions on how to migrate your code to TensorFlow v2.
Use '@tf.function' or '@defun' to decorate the function.
Use '@tf.function' or '@defun' to decorate the function.
Use '@tf.function' or '@defun' to decorate the function.
Use '@tf.function' or '@defun' to decorate the function.
Use '@tf.function' or '@defun' to decorate the function.
Use '@tf.function' or '@defun' to decorate the function.
Use '@tf.function' or '@defun' to decorate the function.
Use '@tf.function' or '@defun' to decorate the function.
Use '@tf.function' or '@defun' to decorate the function.
Use '@tf.function' or '@defun' to decorate the function.
Use '@tf.functio

In [None]:
def data_generator(data_dir, batch_size):
    """Endlessly yield grayscale-derived keypoint batches with a channel axis.

    Args:
        data_dir: Folder whose sub-directories each hold the images of one class.
        batch_size: Number of samples per yielded batch.

    Yields:
        `(X_batch, y_batch)`: `X_batch` stacks `keypoints_with_scores[0, 0]` per
        image and gets a trailing channel axis of size 1; `y_batch` is one-hot
        with `num_classes` columns.

    Samples left over at the end of a pass are dropped.
    """
    # class_to_int was referenced but never defined in this function (NameError);
    # build it here from the class directories, sorted for a stable mapping.
    class_names = sorted(
        name for name in os.listdir(data_dir)
        if os.path.isdir(os.path.join(data_dir, name))
    )
    class_to_int = {class_name: i for i, class_name in enumerate(class_names)}

    while True:
        X_batch = []
        y_batch = []

        for class_dir in class_names:
            class_path = os.path.join(data_dir, class_dir)
            for img_file in os.listdir(class_path):
                img_path = os.path.join(class_path, img_file)
                img = image.load_img(img_path, target_size=(img_height, img_width), color_mode='grayscale')  # Ensure grayscale images
                img_array = image.img_to_array(img)

                # Preprocess image for MoveNet
                img_tensor = tf.expand_dims(img_array, axis=0)
                # movenet() documents a [1, height, width, 3] input, so replicate
                # the single grey channel into three before resizing.
                img_tensor = tf.image.grayscale_to_rgb(img_tensor)
                img_tensor = tf.image.resize_with_pad(img_tensor, input_size, input_size)

                # Run MoveNet
                keypoints_with_scores = movenet(img_tensor)

                X_batch.append(keypoints_with_scores[0, 0])
                y_batch.append(class_to_int[class_dir])

                if len(X_batch) == batch_size:
                    # Add channel dimension to input data
                    X_out = np.expand_dims(np.array(X_batch), axis=-1)
                    # One-hot encode integer labels
                    y_out = tf.keras.utils.to_categorical(
                        np.array(y_batch), num_classes=num_classes
                    )
                    yield (X_out, y_out)
                    X_batch = []
                    y_batch = []


In [None]:
# Extract a batch of data from the data generator
# NOTE(review): data_gen is not re-created in this cell, so this pulls from the
# generator object built in an earlier cell, not from a data_generator that was
# redefined afterwards. It also consumes one batch from that generator.
sample_batch = next(data_gen)

# Print the shape of the input data
X_batch, y_batch = sample_batch
print("Shape of input data (X_batch):", X_batch.shape)
print("Shape of target data (y_batch):", y_batch.shape)


Shape of input data (X_batch): (32, 17, 3)
Shape of target data (y_batch): (32, 19)


In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense

# Define the input shape
input_shape = (17, 3)  # (time_steps, features)

# Create the RNN model: three stacked LSTMs over the 17 keypoint rows.
model = Sequential()
model.add(LSTM(64, input_shape=input_shape, return_sequences=True))
model.add(LSTM(32, return_sequences=True))
model.add(LSTM(16))  # last LSTM returns only its final state
# Size the output from num_classes rather than a literal 19, so it always
# matches the width of the one-hot labels produced by the data generator.
model.add(Dense(num_classes, activation='softmax'))

# Compile the model
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Print the model summary
model.summary()

Model: "sequential_23"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_22 (LSTM)              (None, 17, 64)            17408     
                                                                 
 lstm_23 (LSTM)              (None, 17, 32)            12416     
                                                                 
 lstm_24 (LSTM)              (None, 16)                3136      
                                                                 
 dense_24 (Dense)            (None, 19)                323       
                                                                 
Total params: 33283 (130.01 KB)
Trainable params: 33283 (130.01 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [None]:
def data_generator(data_dir, batch_size):
    """Endlessly yield (batch, time_steps, features) keypoints with one-hot labels.

    The earlier body was a placeholder that never filled the batch, so it
    looped forever without yielding and `fit` hung. This version actually reads
    the images.

    Args:
        data_dir: Folder whose sub-directories each hold the images of one class.
        batch_size: Number of samples per yielded batch.

    Yields:
        `(X_batch, y_batch)`: `X_batch` stacks `keypoints_with_scores[0, 0]` per
        image; `y_batch` is one-hot with `num_classes` columns.

    Samples left over at the end of a pass are dropped.
    """
    # Only directories are classes; sorted for a reproducible label mapping.
    class_names = sorted(
        name for name in os.listdir(data_dir)
        if os.path.isdir(os.path.join(data_dir, name))
    )
    class_to_int = {class_name: i for i, class_name in enumerate(class_names)}

    while True:
        X_batch = []
        y_batch = []

        for class_dir in class_names:
            class_path = os.path.join(data_dir, class_dir)
            for img_file in os.listdir(class_path):
                img = image.load_img(
                    os.path.join(class_path, img_file),
                    target_size=(img_height, img_width),
                )
                img_tensor = tf.expand_dims(image.img_to_array(img), axis=0)
                img_tensor = tf.image.resize_with_pad(img_tensor, input_size, input_size)

                keypoints_with_scores = movenet(img_tensor)

                X_batch.append(keypoints_with_scores[0, 0])
                y_batch.append(class_to_int[class_dir])

                if len(X_batch) == batch_size:
                    y_onehot = tf.keras.utils.to_categorical(
                        np.array(y_batch), num_classes=num_classes
                    )
                    yield (np.array(X_batch), y_onehot)
                    X_batch = []
                    y_batch = []
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense

# Define the input shape
input_shape = (17, 3)  # (time_steps, features)

# Create the RNN model
model = Sequential()
model.add(LSTM(64, input_shape=input_shape, return_sequences=True))
model.add(LSTM(32, return_sequences=True))
model.add(LSTM(16))
# Output width follows num_classes instead of a hard-coded 19 so it always
# matches the one-hot labels.
model.add(Dense(num_classes, activation='softmax'))

# Compile the model
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Print the model summary
model.summary()

# Set the number of epochs and batch size
epochs = 100
batch_size = 32

# Initialize the data generator
data_gen = data_generator(data_dir, batch_size)

# The generator never terminates, so Keras must be told where an epoch ends;
# without steps_per_epoch the first epoch runs forever. Use the number of full
# batches contained in one pass over the image folders.
num_images = sum(
    len(os.listdir(os.path.join(data_dir, entry)))
    for entry in os.listdir(data_dir)
    if os.path.isdir(os.path.join(data_dir, entry))
)
steps_per_epoch = max(1, num_images // batch_size)

# Fit the model
model.fit(data_gen, steps_per_epoch=steps_per_epoch, epochs=epochs, verbose=1)

Model: "sequential_27"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_32 (LSTM)              (None, 17, 64)            17408     
                                                                 
 lstm_33 (LSTM)              (None, 17, 32)            12416     
                                                                 
 lstm_34 (LSTM)              (None, 16)                3136      
                                                                 
 dense_28 (Dense)            (None, 19)                323       
                                                                 
Total params: 33283 (130.01 KB)
Trainable params: 33283 (130.01 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


KeyboardInterrupt: 

In [None]:
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, LSTM, TimeDistributed, Dense
from tensorflow.keras.models import Sequential

# Define the RNN model
# The generators in this notebook emit one (17, 3) keypoint array per sample
# (see the printed batch shape (32, 17, 3)). TimeDistributed(Conv2D) needs
# (time, height, width, channels), so each sample is reshaped into a sequence
# of length 1 with a single channel. The previous input_shape=(None, 17, 64)
# handed Conv2D a 3-D tensor and raised the ValueError shown below.
# NOTE(review): with a time axis of length 1 the LSTM sees a single step; feed
# real frame sequences from the generator if temporal modelling is intended.
rnn_model = Sequential([
    tf.keras.layers.Reshape((1, 17, 3, 1), input_shape=(17, 3)),
    TimeDistributed(Conv2D(32, (2, 2), activation='relu')),   # -> (1, 16, 2, 32)
    TimeDistributed(MaxPooling2D((1, 1))),  # Adjust pooling here
    TimeDistributed(Conv2D(64, (2, 2), activation='relu')),   # -> (1, 15, 1, 64)
    TimeDistributed(MaxPooling2D((1, 1))),  # Adjust pooling here
    TimeDistributed(Flatten()),
    LSTM(128, return_sequences=False),  # Make sure to set return_sequences to False if you're using a Dense layer after LSTM
    Dense(num_classes, activation='softmax')
])

# Compile the model
optimizer = tf.keras.optimizers.Adam()
rnn_model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

# Create a custom callback to stop training after a certain number of steps
class StopAfterNStepsCallback(tf.keras.callbacks.Callback):
    """Stop `fit` once `max_steps` training batches have been processed.

    Args:
        max_steps: Number of batches after which `model.stop_training` is set.
    """

    def __init__(self, max_steps):
        super().__init__()  # let the Keras base class set up its own state
        self.max_steps = max_steps
        self.current_step = 0

    def on_train_begin(self, logs=None):
        # Restart the count for every fit() call; otherwise a reused instance
        # would stop the next run after its very first batch.
        self.current_step = 0

    def on_batch_end(self, batch, logs=None):
        self.current_step += 1
        if self.current_step >= self.max_steps:
            self.model.stop_training = True

# Set the maximum number of steps you want to run
max_steps = 101

# Create the custom callback and pass it to the fit function
stop_callback = StopAfterNStepsCallback(max_steps)

# Train the model
# NOTE(review): data_gen is not created in this cell — it is whatever generator
# an earlier cell left in the kernel. No steps_per_epoch is given, so only the
# callback ends training.
rnn_model.fit(data_gen, epochs=epochs, callbacks=[stop_callback])


ValueError: Exception encountered when calling layer "time_distributed_89" (type TimeDistributed).

Input 0 of layer "conv2d_44" is incompatible with the layer: expected min_ndim=4, found ndim=3. Full shape received: (None, 17, 64)

Call arguments received by layer "time_distributed_89" (type TimeDistributed):
  • inputs=tf.Tensor(shape=(None, None, 17, 64), dtype=float32)
  • training=None
  • mask=None

In [None]:
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, LSTM, TimeDistributed, Dense
from tensorflow.keras.models import Sequential

# Define the RNN model
# The leading Flatten() collapsed each sample to 2-D, which TimeDistributed
# rejects (see the ValueError below). Instead, turn every (17, 3) keypoint
# array into a length-1 sequence of single-channel "images".
# padding='same' keeps the 17x3 grid intact so the 2x2 pooling still fits, and
# the per-step Flatten gives the LSTM the 3-D (batch, time, features) input it
# requires.
# NOTE(review): with a time axis of length 1 the LSTM sees a single step.
rnn_model = Sequential([
    tf.keras.layers.Reshape((1, 17, 3, 1), input_shape=(17, 3)),
    TimeDistributed(Conv2D(32, (2, 2), activation='relu', padding='same')),  # -> (1, 17, 3, 32)
    TimeDistributed(Conv2D(64, (2, 2), activation='relu', padding='same')),  # -> (1, 17, 3, 64)
    TimeDistributed(MaxPooling2D((2, 2))),                                   # -> (1, 8, 1, 64)
    TimeDistributed(Flatten()),
    LSTM(128, return_sequences=False),
    Dense(num_classes, activation='softmax')
])
optimizer = tf.keras.optimizers.Adam()
rnn_model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
data_gen = data_generator(data_dir, batch_size)

# Create a custom callback to stop training after a certain number of steps
class StopAfterNStepsCallback(tf.keras.callbacks.Callback):
    """Stop `fit` once `max_steps` training batches have been processed.

    Args:
        max_steps: Number of batches after which `model.stop_training` is set.
    """

    def __init__(self, max_steps):
        super().__init__()  # let the Keras base class set up its own state
        self.max_steps = max_steps
        self.current_step = 0

    def on_train_begin(self, logs=None):
        # Restart the count for every fit() call; otherwise a reused instance
        # would stop the next run after its very first batch.
        self.current_step = 0

    def on_batch_end(self, batch, logs=None):
        self.current_step += 1
        if self.current_step >= self.max_steps:
            self.model.stop_training = True

# Set the maximum number of steps you want to run
max_steps =101

# Create the custom callback and pass it to the fit function
stop_callback = StopAfterNStepsCallback(max_steps)

# NOTE(review): no steps_per_epoch is given, so only the callback ends training.
rnn_model.fit(data_gen, epochs=epochs, callbacks=[stop_callback])

ValueError: Exception encountered when calling layer 'sequential_4' (type Sequential).

`TimeDistributed` Layer should be passed an `input_shape ` with at least 3 dimensions, received: (32, 51)

Call arguments received by layer 'sequential_4' (type Sequential):
  • inputs=tf.Tensor(shape=(32, 17, 3), dtype=float32)
  • training=False
  • mask=None

In [None]:
def data_generator(data_dir, batch_size):
    """Endlessly yield fixed-size keypoint batches with one-hot labels.

    Args:
        data_dir: Folder whose sub-directories each hold the images of one class.
        batch_size: Number of samples per yielded batch.

    Yields:
        `(X_batch, y_batch)`: `X_batch` has shape `(batch_size, *input_shape)`
        (`input_shape` is the notebook-level variable); `y_batch` is one-hot
        with `num_classes` columns.

    Fixes over the previous version:
      * the `break` only left the inner loop, so the next class folder wrote
        past the end of the pre-allocated arrays;
      * labels were one-hot encoded before being filled in and then again
        before yielding;
      * keypoints were reshaped to (17, 3, 1), which cannot be stored in a slot
        of shape `input_shape` when that is (17, 3).
    Samples left over at the end of a pass are dropped.
    """
    # Only directories are classes; sorted for a reproducible label mapping.
    class_names = sorted(
        name for name in os.listdir(data_dir)
        if os.path.isdir(os.path.join(data_dir, name))
    )
    class_to_int = {class_name: i for i, class_name in enumerate(class_names)}

    while True:
        X_batch = np.zeros((batch_size, *input_shape))
        y_batch = np.zeros(batch_size, dtype=int)
        batch_index = 0

        for class_dir in class_names:
            class_path = os.path.join(data_dir, class_dir)
            for img_file in os.listdir(class_path):
                img_path = os.path.join(class_path, img_file)
                img = image.load_img(img_path, target_size=(img_height, img_width))
                img_array = image.img_to_array(img)

                # Preprocess image for MoveNet
                img_tensor = tf.expand_dims(img_array, axis=0)
                img_tensor = tf.image.resize_with_pad(img_tensor, input_size, input_size)

                # Run MoveNet
                keypoints_with_scores = movenet(img_tensor)

                # Store the sample in the next free slot, shaped like the slot.
                X_batch[batch_index] = keypoints_with_scores[0, 0].reshape(input_shape)
                y_batch[batch_index] = class_to_int[class_dir]
                batch_index += 1

                if batch_index == batch_size:
                    yield (
                        X_batch,
                        tf.keras.utils.to_categorical(y_batch, num_classes=num_classes),
                    )
                    # Fresh buffers, so a batch already handed out is never overwritten.
                    X_batch = np.zeros((batch_size, *input_shape))
                    y_batch = np.zeros(batch_size, dtype=int)
                    batch_index = 0



In [None]:
from tensorflow.keras.layers import Reshape
#32, 17, 3
  # Adjust this based on your actual input shape
# The old input_shape=(None, 32, 17, 3) treated the batch size (32) as an image
# height. The generator emits one (17, 3) keypoint array per sample, so reshape
# each into a length-1 sequence of single-channel 17x3 "images" instead.
# NOTE(review): with a time axis of length 1 the LSTM sees a single step.
rnn_model = Sequential([
    Reshape((1, 17, 3, 1), input_shape=(17, 3)),
    TimeDistributed(Conv2D(32, (2, 2), activation='relu')),  # -> (1, 16, 2, 32)
    TimeDistributed(MaxPooling2D((2, 2))),                   # -> (1, 8, 1, 32)
    TimeDistributed(Flatten()),
    LSTM(128, return_sequences=False),
    Dense(num_classes, activation='softmax')
])

'''
rnn_model = Sequential([
    Reshape((-1, 17, 3, 1), input_shape=input_shape),
    TimeDistributed(Conv2D(32, (2, 2), activation='relu', padding='same')),
    TimeDistributed(Conv2D(64, (2, 2), activation='relu')),
    TimeDistributed(MaxPooling2D((2, 2))),
    TimeDistributed(Flatten()),
    LSTM(128),
    Dense(num_classes, activation='softmax')
])# ... rest of your model layers
'''
'''])
rnn_model = Sequential([
    TimeDistributed(Conv2D(32, (2, 2), activation='relu', input_shape=(17, 3, 1))),
    TimeDistributed(Conv2D(64, (2, 2), activation='relu')),
    TimeDistributed(MaxPooling2D((2, 2))),
    TimeDistributed(Flatten()),
    LSTM(128),
    Dense(num_classes, activation='softmax')
])'''

"])\nrnn_model = Sequential([\n    TimeDistributed(Conv2D(32, (2, 2), activation='relu', input_shape=(17, 3, 1))),\n    TimeDistributed(Conv2D(64, (2, 2), activation='relu')),\n    TimeDistributed(MaxPooling2D((2, 2))),\n    TimeDistributed(Flatten()),\n    LSTM(128),\n    Dense(num_classes, activation='softmax')\n])"

In [None]:
# Compile the TimeDistributed model with a fresh Adam optimizer and one-hot
# cross-entropy loss.
optimizer = tf.keras.optimizers.Adam()
rnn_model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
# Uses whichever data_generator definition was executed most recently in the kernel.
data_gen = data_generator(data_dir, batch_size)

# Create a custom callback to stop training after a certain number of steps
class StopAfterNStepsCallback(tf.keras.callbacks.Callback):
    """Stop `fit` once `max_steps` training batches have been processed.

    Args:
        max_steps: Number of batches after which `model.stop_training` is set.
    """

    def __init__(self, max_steps):
        super().__init__()  # let the Keras base class set up its own state
        self.max_steps = max_steps
        self.current_step = 0

    def on_train_begin(self, logs=None):
        # Restart the count for every fit() call; otherwise a reused instance
        # would stop the next run after its very first batch.
        self.current_step = 0

    def on_batch_end(self, batch, logs=None):
        self.current_step += 1
        if self.current_step >= self.max_steps:
            self.model.stop_training = True

# Set the maximum number of steps you want to run
max_steps =101

# Create the custom callback and pass it to the fit function
stop_callback = StopAfterNStepsCallback(max_steps)

# NOTE(review): no steps_per_epoch is given, so only the callback ends training.
rnn_model.fit(data_gen, epochs=epochs, callbacks=[stop_callback])

KeyboardInterrupt: 

In [None]:

# Create LSTM model
# input_shape used to be passed to the inner Conv2D, where the TimeDistributed
# wrapper ignores it, and it contained the batch size. Declare the per-sample
# shape on a leading Reshape instead: each (17, 3) keypoint array becomes a
# length-1 sequence of single-channel "images".
# padding='same' keeps the 17x3 grid so the 2x2 pooling still has a valid
# window (two valid 2x2 convolutions would shrink the width to 1).
# NOTE(review): with a time axis of length 1 the LSTM sees a single step.
lstm_model = Sequential([
    tf.keras.layers.Reshape((1, 17, 3, 1), input_shape=(17, 3)),
    TimeDistributed(Conv2D(32, (2, 2), activation='relu', padding='same')),  # -> (1, 17, 3, 32)
    TimeDistributed(Conv2D(64, (2, 2), activation='relu', padding='same')),  # -> (1, 17, 3, 64)
    TimeDistributed(MaxPooling2D((2, 2))),                                   # -> (1, 8, 1, 64)
    TimeDistributed(Flatten()),
    LSTM(128),
    Dense(num_classes, activation='softmax')
])

optimizer = tf.keras.optimizers.Adam()
lstm_model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
data_gen = data_generator(data_dir, batch_size)

# Create a custom callback to stop training after a certain number of steps
class StopAfterNStepsCallback(tf.keras.callbacks.Callback):
    """Stop `fit` once `max_steps` training batches have been processed.

    Args:
        max_steps: Number of batches after which `model.stop_training` is set.
    """

    def __init__(self, max_steps):
        super().__init__()  # let the Keras base class set up its own state
        self.max_steps = max_steps
        self.current_step = 0

    def on_train_begin(self, logs=None):
        # Restart the count for every fit() call; otherwise a reused instance
        # would stop the next run after its very first batch.
        self.current_step = 0

    def on_batch_end(self, batch, logs=None):
        self.current_step += 1
        if self.current_step >= self.max_steps:
            self.model.stop_training = True

# Set the maximum number of steps you want to run
max_steps =101

# Create the custom callback and pass it to the fit function
stop_callback = StopAfterNStepsCallback(max_steps)
# NOTE(review): no steps_per_epoch is given, so only the callback ends training.
lstm_model.fit(data_gen, epochs=epochs, callbacks=[stop_callback])

KeyboardInterrupt: 

In [None]:

# Compile and train models
# Each model gets its own optimizer instance: an optimizer keeps per-variable
# state for the model it is first used with, so sharing one object between two
# models is not supported.
optimizer = tf.keras.optimizers.Adam()
lstm_optimizer = tf.keras.optimizers.Adam()
#cnn_model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
rnn_model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
lstm_model.compile(optimizer=lstm_optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

data_gen = data_generator(data_dir, batch_size)

# The generator never terminates, so without steps_per_epoch the first fit()
# would never finish and the second model would never be trained. Use the
# number of full batches contained in one pass over the image folders.
num_images = sum(
    len(os.listdir(os.path.join(data_dir, entry)))
    for entry in os.listdir(data_dir)
    if os.path.isdir(os.path.join(data_dir, entry))
)
steps_per_epoch = max(1, num_images // batch_size)

#cnn_model.fit(data_gen, epochs=epochs)
rnn_model.fit(data_gen, steps_per_epoch=steps_per_epoch, epochs=epochs)
lstm_model.fit(data_gen, steps_per_epoch=steps_per_epoch, epochs=epochs)


ValueError: in user code:

    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 1401, in train_function  *
        return step_function(self, iterator)
    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 1384, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 1373, in run_step  **
        outputs = model.train_step(data)
    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 1150, in train_step
        y_pred = self(x, training=True)
    File "/usr/local/lib/python3.10/dist-packages/keras/src/utils/traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/input_spec.py", line 253, in assert_input_compatibility
        raise ValueError(

    ValueError: Exception encountered when calling layer 'time_distributed' (type TimeDistributed).
    
    Input 0 of layer "conv2d" is incompatible with the layer: expected min_ndim=4, found ndim=2. Full shape received: (None, None)
    
    Call arguments received by layer 'time_distributed' (type TimeDistributed):
      • inputs=tf.Tensor(shape=(None, None, None), dtype=float32)
      • training=True
      • mask=None


In [None]:
# Data generator function
def data_generator(data_dir, batch_size):
    """Endlessly yield sequence-shaped keypoint batches with one-hot labels.

    The previous version appended one array per class folder to X while adding
    one label per image to y, so X and y had different lengths, X was ragged,
    and the labels were raw folder names. This version emits one sample per
    image, shaped as a length-1 sequence for TimeDistributed models.

    Args:
        data_dir: Folder whose sub-directories each hold the images of one class.
        batch_size: Number of samples per yielded batch.

    Yields:
        `(X_batch, y_batch)`: `X_batch` has shape `(batch_size, 1, 17, 3, 1)`
        (time, keypoints, values, channel); `y_batch` is one-hot with
        `num_classes` columns.

    NOTE(review): the images are treated as independent frames; if each folder
    is meant to be one ordered frame sequence, group them here instead.
    Samples left over at the end of a pass are dropped.
    """
    # Only directories are classes; sorted for a reproducible label mapping.
    class_names = sorted(
        name for name in os.listdir(data_dir)
        if os.path.isdir(os.path.join(data_dir, name))
    )
    class_to_int = {class_name: i for i, class_name in enumerate(class_names)}

    while True:
        X_batch = []
        y_batch = []

        for class_dir in class_names:
            class_path = os.path.join(data_dir, class_dir)
            for img_file in os.listdir(class_path):
                img_path = os.path.join(class_path, img_file)
                img = image.load_img(img_path, target_size=(img_height, img_width))
                img_array = image.img_to_array(img)

                # Preprocess image for MoveNet
                img_tensor = tf.expand_dims(img_array, axis=0)
                img_tensor = tf.image.resize_with_pad(img_tensor, input_size, input_size)

                # Run MoveNet
                keypoints_with_scores = movenet(img_tensor)

                # (17, 3) keypoints -> one time step with a single channel.
                X_batch.append(keypoints_with_scores[0, 0].reshape(1, 17, 3, 1))
                y_batch.append(class_to_int[class_dir])

                if len(X_batch) == batch_size:
                    y_onehot = tf.keras.utils.to_categorical(
                        np.array(y_batch), num_classes=num_classes
                    )
                    yield (np.array(X_batch), y_onehot)
                    X_batch = []
                    y_batch = []



In [None]:
# Create RNN model
# input_shape belongs on the TimeDistributed wrapper (the first layer of the
# model); passed to the inner Conv2D it is ignored and the model stays unbuilt.
# Expected input: (batch, time, 17, 3, 1).
# padding='same' is required because the keypoint grid is only 3 wide: two
# valid 3x3 convolutions cannot fit.
rnn_model = Sequential([
    TimeDistributed(Conv2D(32, (3, 3), activation='relu', padding='same'),
                    input_shape=(None, 17, 3, 1)),                           # -> (t, 17, 3, 32)
    TimeDistributed(Conv2D(64, (3, 3), activation='relu', padding='same')),  # -> (t, 17, 3, 64)
    TimeDistributed(MaxPooling2D((2, 2))),                                   # -> (t, 8, 1, 64)
    TimeDistributed(Flatten()),
    LSTM(128),
    Dense(num_classes, activation='softmax')
])

# Create LSTM model
# NOTE(review): this is currently the same architecture as rnn_model.
lstm_model = Sequential([
    TimeDistributed(Conv2D(32, (3, 3), activation='relu', padding='same'),
                    input_shape=(None, 17, 3, 1)),
    TimeDistributed(Conv2D(64, (3, 3), activation='relu', padding='same')),
    TimeDistributed(MaxPooling2D((2, 2))),
    TimeDistributed(Flatten()),
    LSTM(128),
    Dense(num_classes, activation='softmax')
])