In [8]:
import os
import cv2
import numpy as np
import datetime
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Dense, Dropout, Flatten, Lambda, Concatenate, GlobalAveragePooling2D, GlobalMaxPooling2D
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard 
import random


In [12]:

def extract_label(folder_name):
    """Extracts label from folder name (third part after splitting by '_')"""
    return int(folder_name.split("_")[2])

def create_txt_list(data_dir, output_txt):
    """Creates dataset description file"""
    with open(output_txt, "w") as f:
        for folder in os.listdir(data_dir):
            label = extract_label(folder)
            f.write(f"{folder} 1 {label}\n")
    print(f"Created dataset file with {len(os.listdir(data_dir))} samples")

def generator_train_batch(train_file, batch_size, num_classes, img_path):
    """Generator that yields batches of video data"""
    with open(train_file, "r") as f:
        lines = f.readlines()

    while True:
        random.shuffle(lines)
        for i in range(0, len(lines), batch_size):
            batch_lines = lines[i:i+batch_size]
            X_batch = []
            y_batch = []

            for line in batch_lines:
                try:
                    parts = line.strip().split()
                    if len(parts) < 3:
                        continue

                    # Normalize path separators and construct full path
                    relative_path = parts[0].replace("/", os.sep).replace("\\", os.sep)
                    folder_path = os.path.join(img_path, relative_path)
                    
                    # Verify path exists
                    if not os.path.exists(folder_path):
                        print(f"Path not found, skipping: {folder_path}")
                        continue

                    # Load frames
                    frame_files = sorted([f for f in os.listdir(folder_path) 
                                        if f.lower().endswith(('.png', '.jpg', '.jpeg'))])
                    
                    if len(frame_files) < 10:
                        print(f"Not enough frames in: {folder_path}")
                        continue

                    frames = []
                    for frame_file in frame_files[:10]:  # Take first 10 frames
                        img_path_full = os.path.join(folder_path, frame_file)
                        img = cv2.imread(img_path_full)
                        
                        if img is None:
                            print(f"Failed to read image: {img_path_full}")
                            continue
                            
                        img = cv2.resize(img, (112, 112))
                        img = img / 255.0  # Normalize
                        frames.append(img)

                    if len(frames) < 10:
                        continue  # Skip if any frames failed loading

                    # Average frames (for pure 2D CNN approach)
                    avg_frame = np.mean(frames, axis=0)
                    X_batch.append(avg_frame)
                    y_batch.append(int(parts[2]))

                except Exception as e:
                    print(f"Error processing line '{line.strip()}': {str(e)}")
                    continue

            if len(X_batch) == 0:
                continue  # Skip empty batches

            yield np.array(X_batch), to_categorical(y_batch, num_classes=num_classes)

def build_2dcnn_model(num_classes):
    """Builds 2D CNN model with global pooling and concatenation"""
    input_shape = (112, 112, 3)  # Single frame input
    
    inputs = Input(input_shape)
    
    # CNN backbone with new structure
    x = Conv2D(64, (3, 3), activation='relu', padding='same')(inputs)
    x = MaxPooling2D((2, 2))(x)
    
    x = Conv2D(128, (3, 3), activation='relu', padding='same')(x)
    x = MaxPooling2D((2, 2))(x)
    
    x = Conv2D(256, (3, 3), activation='relu', padding='same')(x)
    x = MaxPooling2D((2, 2))(x)
    
    # Global pooling and concatenation
    x_avg = GlobalAveragePooling2D()(x)
    x_max = GlobalMaxPooling2D()(x)
    features = Concatenate()([x_avg, x_max])
    
    # Classification layers
    x = Dense(512, activation='relu')(features)
    x = Dropout(0.5)(x)
    outputs = Dense(num_classes, activation='softmax')(x)
    
    model = Model(inputs, outputs)
    return model

class TimeHistory(tf.keras.callbacks.Callback):
    """Callback to track training time"""
    def on_epoch_begin(self, epoch, logs=None):
        self.start_time = datetime.datetime.now()
    def on_epoch_end(self, epoch, logs=None):
        duration = (datetime.datetime.now() - self.start_time).total_seconds()
        print(f"Epoch {epoch+1} duration: {duration:.2f}s")

def main():
    # Configuration
    DATA_DIR = r"D:\DATA\data-videos\data"
    TRAIN_TXT = "train_list.txt"
    NUM_CLASSES = 4
    BATCH_SIZE = 8
    EPOCHS = 25

    # Create dataset description file
    create_txt_list(DATA_DIR, TRAIN_TXT)

    # Build model
    model = build_2dcnn_model(NUM_CLASSES)
    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
    model.summary()

    # Callbacks
    callbacks = [
        ModelCheckpoint("2dcnn_weights.h5", save_best_only=True),
        TensorBoard(log_dir="logs"),
        TimeHistory()
    ]

    # Train model
    history = model.fit(
        generator_train_batch(TRAIN_TXT, BATCH_SIZE, NUM_CLASSES, DATA_DIR),
        steps_per_epoch=122,  # Adjust based on dataset size
        epochs=EPOCHS,
        callbacks=callbacks
    )

    # Save final model
    model.save("2dcnn_video_classifier.h5")
    print("Training complete. Model saved.") 


In [13]:

if __name__ == "__main__":
    main()

Created dataset file with 6570 samples
Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_3 (InputLayer)           [(None, 112, 112, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d_4 (Conv2D)              (None, 112, 112, 64  1792        ['input_3[0][0]']                
                                )                                                                 
                                                                                                  
 max_pooling2d_3 (MaxPooling2D)  (None, 56, 56, 64)  0           ['conv2d_4[0][0]']               
                                                     

In [14]:
# Test class extraction
data_dir = r"D:\DATA\data-videos\data"

# Collect all labels
all_labels = []
invalid_folders = []

for folder in os.listdir(data_dir):
    try:
        label = extract_label(folder)
        all_labels.append(label)
    except (ValueError, IndexError) as e:
        invalid_folders.append(folder)
        continue

# Analyze results
unique_labels = sorted(list(set(all_labels)))
print(f"Number of classes extracted: {len(unique_labels)}")
print(f"Unique labels: {unique_labels}")

if len(invalid_folders) > 0:
    print(f"\nWarning: {len(invalid_folders)} invalid folders:")
    print("Sample invalid folders:", invalid_folders[:5])

print("\nSample valid folder-to-label mapping:")
for folder in os.listdir(data_dir)[:5]:  # Show first 5 valid mappings
    try:
        print(f"{folder} -> {extract_label(folder)}")
    except:
        continue

Number of classes extracted: 4
Unique labels: [0, 1, 2, 3]

Sample valid folder-to-label mapping:
B_0_0_seq0 -> 0
B_0_0_seq1 -> 0
B_0_0_seq10 -> 0
B_0_0_seq11 -> 0
B_0_0_seq12 -> 0
