In [2]:
import pandas as pd
import numpy as np

from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential, load_model
from sklearn.model_selection import StratifiedKFold
from tensorflow.keras.layers import LSTM, Dense, Masking

from sklearn.model_selection import KFold

In [1]:
# Mount Google Drive at /content/drive (requires the Google Colab runtime).
# NOTE(review): the data-loading cells visible in this notebook read CSVs by relative path,
# so this mount may no longer be needed — confirm before removing.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# Names of the keypoints tracked on every fish. When passed to `prepare_individuals`,
# the position of each name decides which x/y column pair is read for it.
body_parts = [
    'mouth',
    'eye',
    'skull',
    'upper tail bone',
    'lower tail bone',
    'upper tail',
    'lower tail',
    'pectoral fin',
    'anal fin start',
    'anal fin mid',
    'dorsal fin_base',
    'dorsal fin_tip',
    'stomach',
    'middle',
]

In [4]:
import numpy as np
import pandas as pd

def prepare_individuals(data_numeric, target_length=141, body_parts=None, num_individuals=8):
    """Build per-individual speed/direction time series from keypoint coordinates.

    Coordinate columns are looked up by name: ``x``/``y`` for the very first
    keypoint and ``x.<n>``/``y.<n>`` for all others, where
    ``n = (individual - 1) * len(body_parts) + idx`` (``individual`` is 1-based,
    ``idx`` is the keypoint's position in ``body_parts``). Keypoints whose
    columns are absent are skipped.

    Parameters
    ----------
    data_numeric : pandas.DataFrame
        Table with one row per frame holding the coordinate columns; NaN marks
        a missing value.
    target_length : int, default 141
        Length of the per-frame displacement arrays. Each returned frame has
        ``target_length + 1`` rows because a leading zero is prepended to
        every feature.
    body_parts : sequence of str, optional
        Keypoint names; only their count and order are used. Defaults to the
        14 keypoints listed in the body.
    num_individuals : int, default 8
        Number of individuals to look for.

    Returns
    -------
    dict[str, pandas.DataFrame]
        Maps ``'individual<k>'`` to a frame with two columns per keypoint
        found: speed (Euclidean length of the displacement) followed by
        direction in degrees (``arctan2(dy, dx)``). Individuals for which no
        keypoint columns were found are omitted.

    Raises
    ------
    IndexError
        If a column holds a valid value (other than its first valid one) at a
        row position greater than or equal to ``target_length``.
    """
    if body_parts is None:
        body_parts = [
            'mouth', 'eye', 'skull', 'upper tail bone', 'lower tail bone',
            'upper tail', 'lower tail', 'pectoral fin', 'anal fin start',
            'anal fin mid', 'dorsal fin_base', 'dorsal fin_tip', 'stomach', 'middle'
        ]

    def process_column(column, target_length):
        """Return displacements between consecutive valid values, placed at the later value's row."""
        # Work on row positions rather than index labels, so frames whose index
        # is not the default 0..n-1 range are handled correctly.
        values = column.to_numpy()
        valid_positions = np.flatnonzero(~column.isna().to_numpy())

        deltas = np.zeros(target_length)
        if valid_positions.size > 1:
            # The difference between two successive valid values is stored at
            # the position of the second one; every other position stays 0.
            deltas[valid_positions[1:]] = np.diff(values[valid_positions])
        return deltas

    individual_features = {}
    parts_per_individual = len(body_parts)

    for individual in range(1, num_individuals + 1):
        features_list = []

        for idx in range(parts_per_individual):
            offset = (individual - 1) * parts_per_individual + idx
            # Offset 0 (first keypoint of the first individual) uses the bare names.
            suffix = f'.{offset}' if offset else ''
            x_col_name = f'x{suffix}'
            y_col_name = f'y{suffix}'

            if x_col_name not in data_numeric.columns or y_col_name not in data_numeric.columns:
                continue

            delta_x = process_column(data_numeric[x_col_name], target_length)
            delta_y = process_column(data_numeric[y_col_name], target_length)

            if delta_x.size > 0 and delta_y.size > 0:
                # A zero is prepended to each feature, so the series has target_length + 1 entries.
                speed = np.insert(np.sqrt(delta_x**2 + delta_y**2), 0, 0)
                direction = np.insert(np.arctan2(delta_y, delta_x), 0, 0)
                direction_degrees = np.degrees(direction)

                features_list.append(speed)
                features_list.append(direction_degrees)

        if features_list:
            # One row per time step, one column per feature.
            individual_features[f'individual{individual}'] = pd.DataFrame(features_list).transpose()

    return individual_features

Train the model on data from 7 individual fish belonging to 3 different species, each annotated with 14 keypoints across 141 frames. The video used is 'V1_Leleiwi_26June19_17.mp4' from the FishTrac dataset.

In [5]:
# Earlier Google Drive location of the same annotations:
#   '/content/drive/MyDrive/CollectedData_jaime.csv'
# Read the training annotations, skipping the first three rows of the CSV.
data_numeric = pd.read_csv('TrainDataFishVideo.csv', skiprows=3)
print(data_numeric.shape)

data = prepare_individuals(
    data_numeric,
    target_length=141,
    body_parts=body_parts,
    num_individuals=8,
)

# Discard the eighth individual if it was extracted.
data.pop('individual8', None)

# Report how many feature columns each remaining individual has.
for individual_name, feature_frame in data.items():
    print(f"{individual_name}: {len(feature_frame.columns)} columns")

(141, 227)
individual1: 28 columns
individual2: 28 columns
individual3: 28 columns
individual4: 28 columns
individual5: 28 columns
individual6: 28 columns
individual7: 28 columns


In [6]:
# Assign the class labels to the individuals (one entry per individual).
# NOTE(review): the same list is assigned again at the top of the training cell.
jaime_labels = [0, 0, 2, 2, 1, 2, 1]

# Oversample class 1 and 0
Since we have only 7 labelled individual fish for training, spread across three different species, it is important that the training data is balanced.

In [16]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.utils import to_categorical
import numpy as np
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import RandomOverSampler
from sklearn.utils import shuffle

# Class label of each individual, in the same order as the entries of `data`.
jaime_labels = [0, 0, 2, 2, 1, 2, 1]

# Fail early if labels and individuals do not line up one-to-one.
if len(jaime_labels) != len(data):
    raise ValueError(
        f"Expected one label per individual, got {len(jaime_labels)} labels "
        f"for {len(data)} individuals"
    )

# Stack the per-individual DataFrames into one 3D array (samples, time steps, features).
all_data = np.stack([df.values for df in data.values()], axis=0)

# Keep integer class ids for splitting and oversampling and one-hot encode exactly
# once afterwards; encoding first would make the later to_categorical call encode twice.
label_ids = np.asarray(jaime_labels)
all_labels = to_categorical(label_ids, num_classes=3)

# Split data and labels into training and validation sets (80% - 20%).
train_data, validation_data, train_label_ids, validation_label_ids = train_test_split(
    all_data, label_ids, test_size=0.2, random_state=42)
train_labels = to_categorical(train_label_ids, num_classes=3)
validation_labels = to_categorical(validation_label_ids, num_classes=3)

# The oversampler works on 2D input: flatten (time steps, features) into a single axis.
nsamples, nx, ny = train_data.shape
train_data_2d = train_data.reshape((nsamples, nx * ny))

# Randomly duplicate minority-class samples of the training split only.
ros = RandomOverSampler(random_state=0)
X_resampled_2d, y_resampled = ros.fit_resample(train_data_2d, train_label_ids)

# Restore the 3D layout expected by the LSTM.
X_resampled = X_resampled_2d.reshape((-1, nx, ny))

# Shuffle so the duplicated samples are not grouped at the end.
X_resampled, y_resampled = shuffle(X_resampled, y_resampled, random_state=0)

# One-hot targets for categorical cross-entropy.
y_resampled_encoded = to_categorical(y_resampled, num_classes=3)
validation_labels_encoded = validation_labels


# Sequence classifier. Masking has to be the first layer so that time steps whose
# features are all zero (e.g. zero padding) are skipped by the LSTM.
n_timesteps, n_features = train_data.shape[1], train_data.shape[2]

model1 = Sequential([
    Masking(mask_value=0., input_shape=(n_timesteps, n_features)),
    LSTM(50),  # 50 LSTM units
    Dense(3, activation='softmax')  # output layer for 3 classes
])
model1.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

history = model1.fit(X_resampled, y_resampled_encoded, epochs=10, validation_data=(validation_data, validation_labels))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


Test the model on another video from the FishTrac dataset, '02_Oct_18_Vid-3.mp4', which contains 10 individuals and 57 frames.

In [22]:
# katia_data_numeric = pd.read_csv('/content/drive/MyDrive/CollectedData_katia.csv', skiprows=3)
# NOTE(review): this reads the same file name as the training cell, yet the recorded output
# shape below, (56, 311), differs from the training shape (141, 227) — confirm that the path
# points at the annotations of the test video.
test_data_numeric = pd.read_csv('TrainDataFishVideo.csv', skiprows=3)

print(test_data_numeric.shape)

# Extract speed/direction features for the 10 individuals of the test video.
test_data = prepare_individuals(test_data_numeric, target_length=56, body_parts=body_parts, num_individuals=10)

# Report how many feature columns each individual has.
for key in test_data.keys():
    print(f"{key}: {len(test_data[key].columns)} columns")

(56, 311)
individual1: 28 columns
individual2: 28 columns
individual3: 28 columns
individual4: 28 columns
individual5: 28 columns
individual6: 28 columns
individual7: 28 columns
individual8: 28 columns
individual9: 28 columns
individual10: 28 columns


In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Pad (or truncate) every test sequence to the number of time steps the model was built
# with, read from the model itself so it cannot drift from the training configuration.
model_timesteps = model1.input_shape[1]

# Stack the per-individual DataFrames into one 3D array (samples, time steps, features).
test_sequences = np.stack([df.values for df in test_data.values()], axis=0)

# Zero-pad at the end of each sequence.
test_data_padded = pad_sequences(
    test_sequences,
    maxlen=model_timesteps,
    dtype='float32',
    padding='post',
    truncating='post',
    value=0.0,
)

# Make predictions with padded test data
predictions = model1.predict(test_data_padded)
print(predictions)
# Convert the class probabilities to class indices
predicted_labels = np.argmax(predictions, axis=1)
predicted_labels

In [27]:
from sklearn.metrics import accuracy_score

# True class index of each test individual; assumed to be in the same order as
# `predicted_labels` — TODO confirm.
# NOTE(review): these labels include class 3, but the model's output layer has 3 units
# (classes 0-2), so individuals labelled 3 can never be predicted correctly — confirm the
# label set.
true_labels = np.array([0, 1, 0, 2, 1, 3, 3,1,1,3])


# Compare the predicted class indices with the true labels
accuracy = accuracy_score(true_labels, predicted_labels)

# Print out the accuracy
print(f"Model accuracy: {accuracy * 100:.2f}%")


Model accuracy: 40.00%


In [None]:
# NOTE(review): this list has 9 entries, whereas the `true_labels` array used for the
# accuracy computation has 10; running this cell overwrites that array — confirm intent.
true_labels = [0, 1, 0, 2, 1, 3, 3,1,1]