Import knihoven

In [2]:
!pip install -q mediapy remotezip
import collections
import cv2
import imageio
import numpy as np
import pathlib
import random
import tensorflow as tf
import tqdm
import remotezip as rz
from IPython.display import HTML


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.6 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m50.9 MB/s[0m eta [36m0:00:00[0m
[?25h

In [3]:
from google.colab import drive
import zipfile
import os

# Připojení Google Disku
drive.mount('/content/drive')

# Cesta k ZIP souboru na Google Disku
# Příklad: /content/drive/MyDrive/moje_slozka/muj_soubor.zip
zip_soubor = "/content/drive/MyDrive/FAVka/3.ročník/dataset_clips.zip"
cilova_slozka = "Dataset_clips"

# Vytvoření cílové složky (pokud neexistuje)
os.makedirs(cilova_slozka, exist_ok=True)

# Rozbalení ZIP souboru
with zipfile.ZipFile(zip_soubor, 'r') as zip_ref:
    zip_ref.extractall(cilova_slozka)

print(f"✅ Soubor {zip_soubor} byl rozbalen do složky: {cilova_slozka}")


Mounted at /content/drive
✅ Soubor /content/drive/MyDrive/FAVka/3.ročník/dataset_clips.zip byl rozbalen do složky: Dataset_clips


Předpřipravení dat

In [4]:
import os
import glob
import cv2
import numpy as np
import tensorflow as tf
import random
from pathlib import Path

# Nastavení cesty k datasetu
DATASET_PATH = Path("Dataset_clips")

# Počet snímků na klip (doporučeno 16 pro 3D CNN)
FRAMES_PER_CLIP = 16
IMG_SIZE = (112, 112)  # Velikost snímků

# 📂 **Funkce pro získání tříd z datasetu**
def get_class_names(dataset_path):
    return sorted([d.name for d in dataset_path.glob("train/*") if d.is_dir()])

# 📂 **Funkce pro načtení videa a jeho rozdělení na snímky**
def load_video_frames(video_path, n_frames=FRAMES_PER_CLIP, frame_step=2):
    cap = cv2.VideoCapture(str(video_path))
    frames = []
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Vybereme začátek videa náhodně, pokud je video delší než potřebujeme
    start_frame = random.randint(0, max(0, total_frames - (n_frames * frame_step)))

    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

    for _ in range(n_frames):
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, IMG_SIZE)  # Změníme velikost
        frames.append(frame)

        # Přeskočíme několik snímků (frame_step)
        for _ in range(frame_step - 1):
            cap.read()

    cap.release()

    # Pokud má video méně snímků, doplníme černými snímky
    while len(frames) < n_frames:
        frames.append(np.zeros((IMG_SIZE[0], IMG_SIZE[1], 3), dtype=np.uint8))

    return np.array(frames)

# 📂 **Funkce pro načtení všech videí do datasetu**
def load_dataset(dataset_path, subset="train"):
    videos = []
    labels = []
    class_names = get_class_names(dataset_path)

    for class_idx, class_name in enumerate(class_names):
        video_files = list((dataset_path / subset / class_name).glob("*.mp4"))  # Uprav na formát souborů

        for video_path in video_files:
            frames = load_video_frames(video_path)
            videos.append(frames)
            labels.append(class_idx)

    return np.array(videos), np.array(labels), class_names

# 📂 **Načtení dat**
x_train, y_train, class_names = load_dataset(DATASET_PATH, "train")
x_val, y_val, _ = load_dataset(DATASET_PATH, "val")
x_test, y_test, _ = load_dataset(DATASET_PATH, "test")

# 🔹 Výpis informací o datasetu
print(f"✅ Načteno: {len(x_train)} trénovacích videí, {len(x_val)} validačních videí, {len(x_test)} testovacích videí")
print(f"✅ Počet tříd: {len(class_names)}, třídy: {class_names}")



✅ Načteno: 292 trénovacích videí, 34 validačních videí, 44 testovacích videí
✅ Počet tříd: 7, třídy: ['cut_valid', 'knot_preparation', 'knot_valid', 'needle_extraction', 'needle_on_skin', 'pierce', 'pull']


Vytvoření modelu

In [7]:
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers

def conv_bn_relu(x, filters, kernel_size=3, strides=1):
    x = layers.Conv3D(filters, kernel_size, strides=strides, padding="same",
                      kernel_regularizer=regularizers.l2(1e-4))(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)
    return x

def residual_block(x, filters):
    shortcut = x
    x = conv_bn_relu(x, filters)
    x = conv_bn_relu(x, filters)

    # pokud shortcut má jiný shape, přizpůsobíme
    if shortcut.shape != x.shape:
        shortcut = layers.Conv3D(filters, kernel_size=1, padding="same")(shortcut)

    x = layers.Add()([x, shortcut])
    x = layers.Activation("relu")(x)
    return x

def build_lite_res3dnet(input_shape=(16, 112, 112, 3), num_classes=7):
    inputs = tf.keras.Input(shape=input_shape)

    x = conv_bn_relu(inputs, 32)
    x = residual_block(x, 32)
    x = layers.MaxPooling3D(pool_size=(1, 2, 2))(x)

    x = residual_block(x, 64)
    x = layers.MaxPooling3D(pool_size=(2, 2, 2))(x)

    x = residual_block(x, 128)
    x = layers.MaxPooling3D(pool_size=(2, 2, 2))(x)

    x = residual_block(x, 256)
    x = layers.GlobalAveragePooling3D()(x)
    x = layers.Dense(256, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = models.Model(inputs, outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-4),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

model = build_lite_res3dnet()
model.summary()

Trénování sítě

In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

# Callbacks
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True
)

reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=3,
    verbose=1
)

# Volitelný: ukládání nejlepšího modelu
checkpoint = ModelCheckpoint(
    "best_model.h5",
    monitor="val_accuracy",
    save_best_only=True,
    verbose=1
)

# 🚀 Trénování modelu
history = model.fit(
    x_train, y_train,
    validation_data=(x_val, y_val),
    epochs=10,
    batch_size=10,
    callbacks=[early_stop, reduce_lr, checkpoint]
)


Vizualizace průběhu trénování

In [None]:
import matplotlib.pyplot as plt

# 📊 **Vizualizace průběhu trénování**
plt.figure(figsize=(12, 5))

# 🎯 Přesnost modelu
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label="Trénovací přesnost")
plt.plot(history.history['val_accuracy'], label="Validace")
plt.xlabel("Epochy")
plt.ylabel("Přesnost")
plt.legend()
plt.title("Vývoj přesnosti modelu")

# 🎯 Chyba (Loss)
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label="Trénovací chyba")
plt.plot(history.history['val_loss'], label="Validace")
plt.xlabel("Epochy")
plt.ylabel("Loss")
plt.legend()
plt.title("Vývoj chyby modelu")

plt.show()


Vytvoření klasifikačního reportu a Confusion Matrix

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report

# 📌 **Vyhodnocení na testovací sadě**
test_loss, test_acc = model.evaluate(x_test, y_test)
print(f"✅ Přesnost na testovací sadě: {test_acc:.2f}")

# 🔍 **Predikce na celé testovací sadě**
y_pred_probs = model.predict(x_test)  # Pravděpodobnosti tříd
y_pred = np.argmax(y_pred_probs, axis=1)  # Převedení na predikované třídy

y_true = y_test  # Pokud y_test již obsahuje číselné štítky tříd

# 📊 **Výpočet Confusion Matrix**
cm = confusion_matrix(y_true, y_pred)

# 📌 **Vykreslení Confusion Matrix**
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel("Predikované třídy")
plt.ylabel("Skutečné třídy")
plt.title("Confusion Matrix")
plt.show()

# 📝 **Podrobný report přesnosti modelu**
print("\n🔍 Klasifikační report:")
print(classification_report(y_true, y_pred, target_names=class_names))


Graf predikce pro video

In [None]:
import cv2
import imageio
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML

import matplotlib as mpl
mpl.rcParams['animation.embed_limit'] = 400  # Zvýší limit na 100 MB


# 📂 **Funkce pro načtení videa, rozdělení na snímky a vytvoření GIFu**
def load_video_for_prediction(video_path, n_frames=16, step=8):
    cap = cv2.VideoCapture(str(video_path))
    frames = []
    fps = cap.get(cv2.CAP_PROP_FPS)  # Načteme FPS videa

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, (112, 112))  # Rescale
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Převod na RGB
        frames.append(frame)

    cap.release()

    frames = np.array(frames)

    # 🖼 **Vytvoření GIFu se správným FPS**
    gif_path = "video_preview.gif"
    imageio.mimsave(gif_path, frames, fps=int(fps))  # Použijeme FPS z videa

    # 🔹 **Rozdělení na úseky (16 snímků s posunem 8)**
    clips = []
    for i in range(0, len(frames) - n_frames, step):
        clip = frames[i:i+n_frames]
        clips.append(clip)

    return np.array(clips), frames, gif_path, fps

# 📂 **Funkce pro predikci a vizualizaci**
def predict_video(video_path, model, class_names):
    clips, frames, gif_path, fps = load_video_for_prediction(video_path)
    predictions = model.predict(clips)  # Průběžná predikce
    print("Predikce: ", predictions.shape)

    # 🔹 **Časové kroky synchronizované s FPS**
    time_steps = np.linspace(0, len(frames) / fps, len(frames))
    pred_time_steps = np.linspace(0, len(frames) / fps, len(predictions))  # Časové kroky pro predikce

    # 🛠 **Oprava: Interpolace predikcí na stejný počet snímků**
    interpolated_predictions = np.zeros((len(frames), predictions.shape[1]))  # Správná délka predikcí
    for i in range(predictions.shape[1]):  # Pro každou třídu provedeme interpolaci
        interpolated_predictions[:, i] = np.interp(time_steps, pred_time_steps, predictions[:, i])

    # 🖼 **Vytvoření animované vizualizace**
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

    # 🎥 **Video GIF v Matplotlibu**
    img = ax1.imshow(frames[0])
    ax1.axis("off")
    ax1.set_title("Video")

    # 📊 **Graf predikcí**
    ax2.set_ylim(0, 1)
    ax2.set_xlim(0, max(time_steps))
    ax2.set_xlabel("Čas (s)")
    ax2.set_ylabel("Pravděpodobnost")
    ax2.set_title("Průběžná predikce")

    lines = {}
    for i, class_name in enumerate(class_names):
        line, = ax2.plot([], [], label=class_name, linewidth=2)
        lines[class_name] = line

    ax2.legend()

    # 🔹 **Funkce pro animaci**
    def update(frame_idx):
        if frame_idx < len(frames):
            img.set_array(frames[frame_idx])  # Aktualizace snímku videa
        for i, class_name in enumerate(class_names):
            lines[class_name].set_data(time_steps[:frame_idx], interpolated_predictions[:frame_idx, i])  # Aktualizace grafu
        return [img] + list(lines.values())

    # 🛠 **Opravený interval pro synchronizaci**
    interval = 1000 / fps  # Každý snímek odpovídá reálnému FPS
    ani = animation.FuncAnimation(fig, update, frames=len(frames), interval=interval, blit=False, repeat=False)

    # 📂 **Použití HTML pro animaci**
    plt.close(fig)
    return HTML(ani.to_jshtml())

# 🔹 **Spuštění vizualizace**
video_path = "/content/video.mp4"
predict_video(video_path, model, class_names)
