In [None]:
import os
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
from sklearn.model_selection import train_test_split
import cv2
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns


In [None]:
# Fix the global NumPy and TensorFlow seeds to the same value so that repeated
# runs of the notebook draw the same random numbers.
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
# --- Data -------------------------------------------------------------------
# Folder that holds the video clips (placeholder: point it at the real dataset).
DATA_PATH = 'path/to/your/private/dataset'
# Class names; a clip's integer label is the index of its class in this list.
CLASSES = [
    'lenggang',
    'joged',
    'hitammanis',
    'kembangpayung',
    'langkahjepen',
]
NUM_CLASSES = len(CLASSES)

# --- Clip geometry ----------------------------------------------------------
# Number of frames sampled per video and the square side each frame is resized to.
NUM_FRAMES = 30
FRAME_SIZE = 224

# --- Optimisation -----------------------------------------------------------
BATCH_SIZE = 8
EPOCHS = 50
LEARNING_RATE = 1e-4

# --- Output -----------------------------------------------------------------
# File the trained model is written to.
MODEL_PATH = './final6_i3d_model.h5'

In [None]:
# TensorFlow Hub handle of the I3D module; build_model() passes it to
# I3DWrapper, which loads it with hub.load().
# NOTE(review): confirm this handle still resolves in the target environment.
I3D_MODEL_URL = "https://tfhub.dev/deepmind/i3d-kinetics-400/1"

In [None]:
def load_video_frames(video_path, num_frames=NUM_FRAMES, frame_size=FRAME_SIZE):
    """Sample a fixed-length, normalised RGB clip from a video file.

    Frames are taken at a regular stride of ``total_frames // num_frames``
    (at least 1), resized to ``frame_size x frame_size``, converted from
    OpenCV's BGR channel order to RGB and scaled to ``[0, 1]``. If the video
    yields fewer than ``num_frames`` frames, the clip is padded at the end
    with all-zero frames.

    Note: earlier versions of this function returned frames in BGR order.
    Models trained on that output should be retrained or evaluated with
    matching preprocessing.

    Args:
        video_path: Path of the video file to decode.
        num_frames: Number of frames in the returned clip.
        frame_size: Side length, in pixels, of each square output frame.

    Returns:
        ``np.ndarray`` of dtype ``float32`` and shape
        ``(num_frames, frame_size, frame_size, 3)``.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []

    try:
        if not cap.isOpened():
            # Best effort: keep returning a (zero) clip, but make the failure
            # visible instead of silently training on empty input.
            print(f"Warning: Could not open video {video_path}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # The reported frame count can be 0 for some files; never step by 0.
        frame_step = max(total_frames // num_frames, 1)
        frame_count = 0

        while len(frames) < num_frames and cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % frame_step == 0:
                frame = cv2.resize(frame, (frame_size, frame_size))
                # OpenCV decodes to BGR; reverse the channel axis to obtain
                # RGB, then scale to [0, 1]. Casting per frame keeps the
                # buffered clip in float32 rather than float64.
                frames.append((frame[..., ::-1] / 255.0).astype(np.float32))

            frame_count += 1
    finally:
        # Release the decoder even if resizing or decoding raised.
        cap.release()

    # Pad short clips with black frames so every clip has the same length.
    while len(frames) < num_frames:
        frames.append(np.zeros((frame_size, frame_size, 3), dtype=np.float32))

    return np.array(frames, dtype=np.float32)

In [None]:
def create_dataset(videos_per_class=200, test_size=0.2, random_state=42):
    """Collect the labelled video paths and split them into train/validation sets.

    For every class in ``CLASSES`` the files
    ``<DATA_PATH>/<class_name>_<i>.mp4`` with ``i`` in
    ``1..videos_per_class`` are looked up. Files that exist are kept and
    labelled with the index of their class; a warning is printed for each
    missing file. The collected samples are then split with a stratified
    ``train_test_split``.

    Args:
        videos_per_class: Highest file index probed for each class.
        test_size: Fraction of the samples assigned to the validation split.
        random_state: Seed passed to ``train_test_split``.

    Returns:
        ``((train_paths, train_labels), (val_paths, val_labels))`` where the
        paths are lists of strings and the labels are lists of class indices.

    Raises:
        ValueError: If no video file was found at all, or if
            ``train_test_split`` rejects the collected samples.
    """
    video_paths = []
    labels = []

    for class_idx, class_name in enumerate(CLASSES):
        for i in range(1, videos_per_class + 1):
            video_name = f"{class_name}_{i}.mp4"
            video_path = os.path.join(DATA_PATH, video_name)

            if os.path.exists(video_path):
                video_paths.append(video_path)
                labels.append(class_idx)
            else:
                print(f"Warning: Missing file {video_path}")

    if not video_paths:
        # Fail here with the actual cause instead of letting the split raise
        # an error that does not mention the dataset location.
        raise ValueError(
            f"No video files found in {DATA_PATH!r}; expected files named "
            f"'<class>_<index>.mp4' for classes {CLASSES}."
        )

    train_paths, val_paths, train_labels, val_labels = train_test_split(
        video_paths, labels, test_size=test_size, random_state=random_state,
        stratify=labels)

    return (train_paths, train_labels), (val_paths, val_labels)

# Build the train/validation split once and report the size of each part.
train_split, val_split = create_dataset()
train_paths, train_labels = train_split
val_paths, val_labels = val_split
print(f"Training samples: {len(train_paths)}")
print(f"Validation samples: {len(val_paths)}")

In [None]:
class VideoDataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that decodes video clips into batches on demand.

    Each batch is a pair ``(clips, one_hot_labels)``: ``clips`` stacks the
    arrays returned by ``load_video_frames`` for the batch's videos and
    ``one_hot_labels`` encodes the labels over ``NUM_CLASSES`` classes.
    """

    def __init__(self, video_paths, labels, batch_size=BATCH_SIZE, num_frames=NUM_FRAMES, shuffle=True, **kwargs):
        """Store the samples and, if requested, shuffle them once up front.

        Args:
            video_paths: Sequence of video file paths.
            labels: Sequence of integer class indices, aligned with
                ``video_paths``.
            batch_size: Number of videos per batch.
            num_frames: Number of frames sampled from each video.
            shuffle: Whether to reshuffle the samples at construction time and
                at the end of every epoch.
            **kwargs: Forwarded to the ``Sequence`` base-class constructor.

        Raises:
            ValueError: If ``video_paths`` and ``labels`` differ in length.
        """
        # Initialise the base class so that Keras versions whose Sequence
        # defines a constructor get their internal state set up.
        super().__init__(**kwargs)
        if len(video_paths) != len(labels):
            raise ValueError(
                f"video_paths and labels must have the same length, got "
                f"{len(video_paths)} and {len(labels)}."
            )
        self.video_paths = video_paths
        self.labels = labels
        self.batch_size = batch_size
        self.num_frames = num_frames
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch (the last one may be short)."""
        return int(np.ceil(len(self.video_paths) / self.batch_size))

    def __getitem__(self, index):
        """Decode and return batch number ``index`` as ``(clips, one_hot_labels)``."""
        start = index * self.batch_size
        stop = start + self.batch_size
        batch_paths = self.video_paths[start:stop]
        batch_labels = self.labels[start:stop]

        batch_frames = [load_video_frames(path, self.num_frames) for path in batch_paths]

        return np.array(batch_frames), tf.keras.utils.to_categorical(batch_labels, num_classes=NUM_CLASSES)

    def on_epoch_end(self):
        """Reshuffle paths and labels together when shuffling is enabled."""
        if self.shuffle:
            indices = np.arange(len(self.video_paths))
            np.random.shuffle(indices)
            # Rebuild both lists from the same permutation so each path keeps
            # its label; the caller's original lists are left untouched.
            self.video_paths = [self.video_paths[i] for i in indices]
            self.labels = [self.labels[i] for i in indices]

# Training batches are reshuffled every epoch; validation order stays fixed.
train_generator = VideoDataGenerator(video_paths=train_paths, labels=train_labels)
val_generator = VideoDataGenerator(
    video_paths=val_paths,
    labels=val_labels,
    shuffle=False,
)


In [None]:
class I3DWrapper(tf.keras.layers.Layer):
    """Keras layer that runs a TensorFlow Hub module's ``'default'`` signature.

    The module is loaded lazily in ``build`` from ``hub_url``. ``call`` feeds
    the (float32-cast) input to the signature and returns the ``'default'``
    entry of its output dictionary, declared by ``compute_output_shape`` to
    have 400 values per clip.

    NOTE(review): for the Kinetics-400 I3D module these 400 values are
    presumably the classifier outputs rather than intermediate features;
    confirm against the module documentation.
    """

    def __init__(self, hub_url, **kwargs):
        """Remember the hub handle; the module itself is loaded in ``build``.

        Args:
            hub_url: TensorFlow Hub handle of the module to load.
            **kwargs: Standard ``tf.keras.layers.Layer`` keyword arguments.
        """
        super().__init__(**kwargs)
        self.hub_url = hub_url
        self.i3d = None

    def build(self, input_shape):
        """Load the hub module and run it once on an all-zero clip."""
        # Load the I3D model
        self.i3d = hub.load(self.hub_url).signatures['default']
        # Initialize with a dummy input
        dummy_input = tf.zeros((1, NUM_FRAMES, FRAME_SIZE, FRAME_SIZE, 3))
        _ = self.i3d(dummy_input)

    def call(self, inputs):
        """Apply the hub signature to ``inputs`` and return its ``'default'`` output."""
        inputs = tf.cast(inputs, tf.float32)
        outputs = self.i3d(inputs)
        # The signature returns a dict; 'default' holds the 400-wide output.
        return outputs['default']

    def compute_output_shape(self, input_shape):
        """Return ``(batch, 400)`` for any input shape."""
        return (input_shape[0], 400)

    def get_config(self):
        """Return the layer config including ``hub_url``.

        Without this override, Keras versions that require custom layers with
        constructor arguments to define ``get_config`` cannot serialise a
        model containing this layer (for example when saving to HDF5).
        """
        config = super().get_config()
        config.update({'hub_url': self.hub_url})
        return config


In [None]:
def build_model():
    """Assemble and compile the video classifier.

    The network takes clips of shape
    ``(NUM_FRAMES, FRAME_SIZE, FRAME_SIZE, 3)``, passes them through
    ``I3DWrapper`` and then through a small dense head (256 ReLU units,
    dropout 0.5) ending in a ``NUM_CLASSES``-way softmax. It is compiled with
    Adam at ``LEARNING_RATE``, categorical cross-entropy and accuracy.

    Returns:
        The compiled ``tf.keras.Model``.
    """
    layers = tf.keras.layers

    clip_input = tf.keras.Input(shape=(NUM_FRAMES, FRAME_SIZE, FRAME_SIZE, 3))

    # Backbone: the wrapper layer yields a (None, 400) tensor.
    backbone_out = I3DWrapper(I3D_MODEL_URL)(clip_input)

    # Classification head on top of the backbone output.
    hidden = layers.Dense(256, activation='relu')(backbone_out)
    hidden = layers.Dropout(0.5)(hidden)
    class_probs = layers.Dense(NUM_CLASSES, activation='softmax')(hidden)

    classifier = tf.keras.Model(inputs=clip_input, outputs=class_probs)
    classifier.compile(
        optimizer=tf.keras.optimizers.Adam(LEARNING_RATE),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return classifier


In [None]:
# Build the compiled classifier and print its layer-by-layer summary.
model = build_model()
model.summary()

In [None]:
# Stop once validation accuracy has not improved for 10 epochs and roll the
# model back to its best weights.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy', patience=10, restore_best_weights=True)

# Cut the learning rate by 10x after 3 epochs without validation-loss improvement.
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss', factor=0.1, patience=3)

# Keep only the checkpoint with the best validation accuracy at MODEL_PATH.
best_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=MODEL_PATH, save_best_only=True, monitor='val_accuracy')

callbacks = [early_stopping, reduce_lr, best_checkpoint]


In [None]:

# Fit on the training generator, validating on the held-out generator after
# every epoch; `history` keeps the per-epoch metrics for the plots below.
print("Starting training...")
history = model.fit(
    x=train_generator,
    epochs=EPOCHS,
    verbose=1,
    callbacks=callbacks,
    validation_data=val_generator,
)


In [None]:
# Plot training history: one panel per metric, training vs. validation curve.
# Each row: (train key, validation key, train legend, validation legend, y label).
history_panels = (
    ('accuracy', 'val_accuracy', 'Training Accuracy', 'Validation Accuracy', 'Accuracy'),
    ('loss', 'val_loss', 'Training Loss', 'Validation Loss', 'Loss'),
)

plt.figure(figsize=(12, 4))
for panel_no, (train_key, val_key, train_legend, val_legend, y_label) in enumerate(history_panels, start=1):
    plt.subplot(1, 2, panel_no)
    plt.plot(history.history[train_key], label=train_legend)
    plt.plot(history.history[val_key], label=val_legend)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()
plt.show()

In [None]:
def evaluate_model(model, generator, data_name='Dataset'):
    """Print a classification report and plot a confusion matrix.

    Every batch of ``generator`` is predicted with ``model``. True and
    predicted class indices are obtained with ``argmax`` over the one-hot
    targets and the model outputs. Both the report and the confusion matrix
    are computed over all classes in ``CLASSES``, so they keep a fixed layout
    even when some class does not occur in the evaluated data.

    Args:
        model: Object with a Keras-style ``predict(x, verbose=0)`` method.
        generator: Indexable batch source with ``len()``; each item is an
            ``(inputs, one_hot_targets)`` pair.
        data_name: Label used in the progress bar, report header and plot title.

    Returns:
        None. Output is printed and shown as a matplotlib figure.
    """
    y_true = []
    y_pred = []

    for i in tqdm(range(len(generator)), desc=f"Evaluating {data_name}"):
        x, y = generator[i]
        batch_pred = model.predict(x, verbose=0)
        y_true.extend(np.argmax(y, axis=1))
        y_pred.extend(np.argmax(batch_pred, axis=1))

    if not y_true:
        # Nothing to score: avoid handing empty arrays to the metric functions.
        print(f"\n{data_name}: no samples to evaluate.")
        return

    # Pin the label set explicitly. Otherwise a class absent from both y_true
    # and y_pred makes `target_names` mismatch the detected labels (an error
    # in classification_report) and shrinks the confusion matrix so that it
    # no longer lines up with the CLASSES tick labels.
    class_indices = list(range(len(CLASSES)))

    print(f"\n{data_name} Classification Report:")
    print(classification_report(y_true, y_pred, labels=class_indices,
                                target_names=CLASSES, digits=4))

    cm = confusion_matrix(y_true, y_pred, labels=class_indices)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=CLASSES, yticklabels=CLASSES)
    plt.title(f'{data_name} Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()

In [None]:
# Score the trained model on the validation generator (report + confusion matrix).
print("Evaluating Validation Set...")
evaluate_model(model=model, generator=val_generator, data_name='Validation')

In [None]:
# Persist the model in its current state.
# NOTE(review): the ModelCheckpoint callback also writes to MODEL_PATH, so this
# call replaces that file with whatever weights the model holds now; confirm
# that overwriting the best checkpoint is intended.
model.save(filepath=MODEL_PATH)
print(f"Model saved to {MODEL_PATH}")