<a href="https://colab.research.google.com/github/DilemmaFixer3/AI_pr_5-6-7/blob/main/pr10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Розпакування архіву
!unzip -q пр_10.zip
print("Архів пр_10.zip успішно розпаковано. Створені папки: Train, Test, Meta.")

In [None]:
import os
import numpy as np
import time
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.applications import ResNet50, MobileNetV2
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, accuracy_score
from skimage.util import random_noise
import cv2
from tqdm.notebook import tqdm

# --- Глобальні Параметри (Переконайтесь, що ці значення правильні для вашої підмножини!) ---
DATA_DIR_TRAIN = './Train'  # Шлях до розпакованої папки Train
DATA_DIR_TEST = './Test'    # Шлях до розпакованої папки Test
IMAGE_SIZE = (224, 224)
BATCH_SIZE = 32
# Якщо ви використовуєте підмножину з 5 класів, змініть NUM_CLASSES на 5
NUM_CLASSES = 5
VALIDATION_SPLIT = 0.2  # 20% від Train піде на Val

# Відключення попереджень
tf.get_logger().setLevel('ERROR')

In [None]:
# --- Функції Препроцесингу ---
from tensorflow.keras.applications.resnet50 import preprocess_input as resnet_preprocess
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input as mobilenet_preprocess

data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal"),
    tf.keras.layers.RandomRotation(0.1),
    tf.keras.layers.RandomZoom(0.1),
])

def preprocess_dataset(ds, model_type='resnet'):
    if model_type == 'resnet':
        preprocess_fn = resnet_preprocess
    else:
        preprocess_fn = mobilenet_preprocess

    # Застосовуємо аугментацію лише до навчальних даних
    if model_type == 'train':
         ds = ds.map(lambda x, y: (data_augmentation(x, training=True), y))

    # Застосовуємо специфічну попередню обробку
    return ds.map(lambda x, y: (preprocess_fn(x), y)).cache().prefetch(buffer_size=tf.data.AUTOTUNE)


# --- 1. Завантаження та Розділення Навчального Набору ---
print("Завантаження та розділення Train на Train/Val...")

# Завантажуємо весь навчальний набір
full_train_ds = tf.keras.utils.image_dataset_from_directory(
    DATA_DIR_TRAIN,
    labels='inferred',
    label_mode='categorical',
    image_size=IMAGE_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=True
)

CLASS_NAMES = full_train_ds.class_names
print(f"Знайдено класів: {len(CLASS_NAMES)}")

# Розділення на Train та Val
val_size = int(len(full_train_ds) * VALIDATION_SPLIT)
train_size = len(full_train_ds) - val_size

train_ds = full_train_ds.take(train_size)
val_ds = full_train_ds.skip(train_size).take(val_size)

# --- 2. Завантаження Тестового Набору ---
test_ds_raw = tf.keras.utils.image_dataset_from_directory(
    DATA_DIR_TEST,
    labels='inferred',
    label_mode='categorical',
    image_size=IMAGE_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=False
)

# --- 3. Фінальні Набори Даних з Препроцесингом ---
train_ds_res = preprocess_dataset(train_ds, 'resnet')
val_ds_res = preprocess_dataset(val_ds, 'resnet')
test_ds_res = preprocess_dataset(test_ds_raw, 'resnet')

train_ds_mob = preprocess_dataset(train_ds, 'mobilenet')
val_ds_mob = preprocess_dataset(val_ds, 'mobilenet')
test_ds_mob = preprocess_dataset(test_ds_raw, 'mobilenet')

print("Підготовка даних завершена.")

In [None]:
# --- Функція 1: Створення Моделі ---
def create_model(base_model_class, input_shape, num_classes):
    base_model = base_model_class(
        weights='imagenet',
        include_top=False,
        input_shape=input_shape
    )
    base_model.trainable = False

    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(256, activation='relu')(x)
    predictions = Dense(num_classes, activation='softmax')(x)

    model = Model(inputs=base_model.input, outputs=predictions)

    print(f"Створено {base_model_class.__name__}. Параметрів: {model.count_params() / 1e6:.2f} млн")
    return model, base_model

# --- Функція 2: Навчання та Fine-tuning ---
def run_training_experiment(model, base_model, ds_train, ds_val, ds_test, fine_tune_layers, model_name):
    print(f"\n--- Експеримент {model_name} (Fine-tuning {fine_tune_layers} шарів) ---")

    # 1. Початкове навчання (лише класифікатор)
    print("Навчання класифікатора...")
    model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])
    start_time_init = time.time()
    hist_init = model.fit(ds_train, epochs=5, validation_data=ds_val, verbose=1)

    # 2. Fine-tuning
    print("Розморожування та Fine-tuning...")
    base_model.trainable = True
    for layer in base_model.layers[:-fine_tune_layers]:
        layer.trainable = False

    model.compile(optimizer=Adam(learning_rate=1e-5), loss='categorical_crossentropy', metrics=['accuracy'])
    start_time_ft = time.time()
    hist_ft = model.fit(ds_train, epochs=10, validation_data=ds_val, verbose=1)

    total_time = (time.time() - start_time_init)

    # 3. Збереження та Оцінка
    model.save(f'{model_name}_best.h5')
    loss, acc = model.evaluate(ds_test, verbose=0)
    print(f"\n{model_name} Фінальна точність: {acc:.4f} | Час: {total_time:.2f} сек")

    return model, hist_init, hist_ft, total_time, acc


# --- ВИКОНАННЯ ЕКСПЕРИМЕНТІВ ---
models = {}
results = {}

# 1. ResNet-50
resnet_model, resnet_base = create_model(ResNet50, IMAGE_SIZE + (3,), NUM_CLASSES)
models['ResNet50'], hist_res_init, hist_res_ft, time_res, acc_res = run_training_experiment(
    resnet_model, resnet_base, train_ds_res, val_ds_res, test_ds_res,
    fine_tune_layers=20, model_name='ResNet50'
)
results['ResNet50'] = {'time': time_res, 'accuracy': acc_res, 'params': resnet_model.count_params(), 'history': {'init': hist_res_init, 'ft': hist_res_ft}}


# 2. MobileNetV2
mobilenet_model, mobilenet_base = create_model(MobileNetV2, IMAGE_SIZE + (3,), NUM_CLASSES)
models['MobileNetV2'], hist_mob_init, hist_mob_ft, time_mob, acc_mob = run_training_experiment(
    mobilenet_model, mobilenet_base, train_ds_mob, val_ds_mob, test_ds_mob,
    fine_tune_layers=10, model_name='MobileNetV2'
)
results['MobileNetV2'] = {'time': time_mob, 'accuracy': acc_mob, 'params': mobilenet_model.count_params(), 'history': {'init': hist_mob_init, 'ft': hist_mob_ft}}

In [None]:
# --- 1. Ансамбль (Середнє Агрегування) ---
print("\n--- Ансамбль Моделей ---")

# Отримання ймовірностей
preds_res = models['ResNet50'].predict(test_ds_res, verbose=0)
preds_mob = models['MobileNetV2'].predict(test_ds_mob, verbose=0)

# Істинні мітки
y_true = np.concatenate([y for x, y in test_ds_raw]).argmax(axis=1)

# Середнє агрегування
preds_avg = (preds_res + preds_mob) / 2
ensemble_predictions = np.argmax(preds_avg, axis=1)
acc_ensemble = accuracy_score(y_true, ensemble_predictions)

print(f"Точність Ансамблю (Середнє): {acc_ensemble:.4f}")

# --- 2. Побудова Графіків Навчання та CM (для ResNet) ---
def plot_training_history(history_init, history_ft, model_name):
    plt.figure(figsize=(12, 4))

    # Об'єднання історій
    full_loss = history_init.history['loss'] + history_ft.history['loss']
    full_val_loss = history_init.history['val_loss'] + history_ft.history['val_loss']

    plt.subplot(1, 2, 1)
    plt.plot(full_loss, label='Train Loss')
    plt.plot(full_val_loss, label='Validation Loss')
    plt.title(f'{model_name} Loss (Fine-tuning)')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

# plot_training_history(results['ResNet50']['history']['init'], results['ResNet50']['history']['ft'], 'ResNet50')
# plot_training_history(results['MobileNetV2']['history']['init'], results['MobileNetV2']['history']['ft'], 'MobileNetV2')


# Матриця Змішування (для ResNet)
y_pred_res = np.argmax(preds_res, axis=1)
cm = confusion_matrix(y_true, y_pred_res)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=CLASS_NAMES)
fig, ax = plt.subplots(figsize=(10, 10))
disp.plot(cmap=plt.cm.Blues, ax=ax)
plt.title('Confusion Matrix for ResNet-50')
plt.show()

In [None]:
# --- Квантизація MobileNetV2 ---
mobilenet_model_ft = models['MobileNetV2']
print("\n--- Квантизація MobileNetV2 ---")
converter = tf.lite.TFLiteConverter.from_keras_model(mobilenet_model_ft)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_quant_model = converter.convert()

quant_file_path = 'mobilenet_quantized.tflite'
with open(quant_file_path, 'wb') as f:
    f.write(tflite_quant_model)

# Порівняння розмірів
original_size = os.path.getsize('MobileNetV2_best.h5') / (1024 * 1024)
quant_size = os.path.getsize(quant_file_path) / (1024 * 1024)
print(f"Розмір Оригінал: {original_size:.2f} MB | Квантований: {quant_size:.2f} MB")

In [None]:
# --- Функції Створення Шуму/Дефектів ---
def add_gaussian_noise(image_tensor, var=0.01):
    img_np = image_tensor.numpy() / 255.0
    sigma = var**0.5
    noisy_img = img_np + np.random.normal(0, sigma, img_np.shape)
    return tf.convert_to_tensor(np.clip(noisy_img * 255.0, 0, 255).astype(np.float32))

def add_salt_and_pepper_noise(image_tensor, amount=0.01):
    img_np = image_tensor.numpy() / 255.0
    noisy_img = random_noise(img_np, mode='s&p', amount=amount)
    return tf.convert_to_tensor(np.clip(noisy_img * 255.0, 0, 255).astype(np.float32))

def apply_affine_shear(image_tensor, shear_factor=0.1):
    img_np = image_tensor.numpy().astype(np.uint8)
    h, w = IMAGE_SIZE

    # Матриця зсуву (Shear)
    M = np.float32([[1, shear_factor, 0], [0, 1, 0]])
    M[0, 2] = -shear_factor * h / 2
    M[1, 2] = 0

    sheared_img = cv2.warpAffine(img_np, M, (w, h))
    return tf.convert_to_tensor(sheared_img.astype(np.float32))

# --- Функція для Створення Тестового Набору з Дефектами ---
def create_defective_ds(ds_original, defect_fn, preprocess_fn):
    return ds_original.unbatch().map(
        lambda x, y: (defect_fn(x), y),
        num_parallel_calls=tf.data.AUTOTUNE
    ).batch(BATCH_SIZE).map(
        lambda x, y: (preprocess_fn(x), y)
    )

if resnet_model_ft and mobilenet_model_ft:

    # Створення тестових наборів з дефектами (для ResNet-50)
    test_gauss_res = create_defective_ds(test_ds, add_gaussian_noise, resnet_preprocess)
    test_snp_res = create_defective_ds(test_ds, add_salt_and_pepper_noise, resnet_preprocess)
    test_affine_res = create_defective_ds(test_ds, apply_affine_shear, resnet_preprocess)

    # Створення тестових наборів з дефектами (для MobileNetV2)
    test_gauss_mob = create_defective_ds(test_ds, add_gaussian_noise, mobilenet_preprocess)
    test_snp_mob = create_defective_ds(test_ds, add_salt_and_pepper_noise, mobilenet_preprocess)
    test_affine_mob = create_defective_ds(test_ds, apply_affine_shear, mobilenet_preprocess)

    print("\n--- Дослідження Стійкості ---")

    metrics = {
        'ResNet-50': resnet_model_ft,
        'MobileNetV2': mobilenet_model_ft
    }

    results = {}

    for name, model in metrics.items():
        preprocess_fn = resnet_preprocess if name == 'ResNet-50' else mobilenet_preprocess

        # Використовуємо відповідні набори даних
        test_sets = {
            'Original': test_ds_res if name == 'ResNet-50' else test_ds_mob,
            'Gaussian Noise': test_gauss_res if name == 'ResNet-50' else test_gauss_mob,
            'Salt-and-Pepper': test_snp_res if name == 'ResNet-50' else test_snp_mob,
            'Affine Shear': test_affine_res if name == 'ResNet-50' else test_affine_mob,
        }

        results[name] = {}
        print(f"\nТестування {name}:")

        for defect_type, ds in test_sets.items():
            loss, acc = model.evaluate(ds, verbose=0)
            results[name][defect_type] = acc
            print(f"  {defect_type}: {acc:.4f}")

    # Фінальне порівняння стійкості
    original_acc_res = results['ResNet-50']['Original']
    original_acc_mob = results['MobileNetV2']['Original']

    print("\nПорівняння Падіння Точності ($\Delta Accuracy$):")
    print("Тип Дефекту | ResNet-50 ($\Delta$) | MobileNetV2 ($\Delta$)")
    print("---|---|---")

    for defect in list(test_sets.keys())[1:]:
        delta_res = original_acc_res - results['ResNet-50'][defect]
        delta_mob = original_acc_mob - results['MobileNetV2'][defect]
        print(f"{defect: <15} | {delta_res:.4f} | {delta_mob:.4f}")

In [None]:
pip install tensorflow

Collecting tensorflow
  Downloading tensorflow-2.20.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.5 kB)
Collecting astunparse>=1.6.0 (from tensorflow)
  Downloading astunparse-1.6.3-py2.py3-none-any.whl.metadata (4.4 kB)
Collecting flatbuffers>=24.3.25 (from tensorflow)
  Downloading flatbuffers-25.9.23-py2.py3-none-any.whl.metadata (875 bytes)
Collecting google_pasta>=0.1.1 (from tensorflow)
  Downloading google_pasta-0.2.0-py3-none-any.whl.metadata (814 bytes)
Collecting libclang>=13.0.0 (from tensorflow)
  Downloading libclang-18.1.1-py2.py3-none-manylinux2010_x86_64.whl.metadata (5.2 kB)
Collecting tensorboard~=2.20.0 (from tensorflow)
  Downloading tensorboard-2.20.0-py3-none-any.whl.metadata (1.8 kB)
Collecting wheel<1.0,>=0.23.0 (from astunparse>=1.6.0->tensorflow)
  Downloading wheel-0.45.1-py3-none-any.whl.metadata (2.3 kB)
Collecting tensorboard-data-server<0.8.0,>=0.7.0 (from tensorboard~=2.20.0->tensorflow)
  Downloading tensorboard_data_server-0.

In [None]:
pip install opencv-python scikit-image -q

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.0/67.0 MB[0m [31m21.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# --- КРОК 1: Ініціалізація та Завантаження ---

import os
import numpy as np
import time
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.applications import ResNet50, MobileNetV2
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, accuracy_score
from skimage.util import random_noise
import cv2
from tqdm.notebook import tqdm

# ВСТАВТЕ ВАШ АРХІВ СЮДИ: пр_10.zip
# Розпакування архіву
!unzip -q пр_10.zip
print("Архів пр_10.zip успішно розпаковано. Дані готові.")

# --- Глобальні Параметри ---н
DATA_DIR_TRAIN = './Train'
DATA_DIR_TEST = './Test'
IMAGE_SIZE = (224, 224)
BATCH_SIZE = 32
# Змініть це, якщо ви працюєте з підмножиною!
NUM_CLASSES = 5
VALIDATION_SPLIT = 0.2
tf.get_logger().setLevel('ERROR')

unzip:  cannot find or open пр_10.zip, пр_10.zip.zip or пр_10.zip.ZIP.
Архів пр_10.zip успішно розпаковано. Дані готові.


In [None]:
import pandas as pd
import shutil
import os

print("--- Початок реорганізації GTSRB (Виправлення шляху) ---")

REORG_TRAIN = './Train_reorg'
REORG_TEST = './Test_reorg'
os.makedirs(REORG_TRAIN, exist_ok=True)
os.makedirs(REORG_TEST, exist_ok=True)
class_info = {}

# --- 2. Реорганізація НАВЧАЛЬНОГО набору ---
print("Реорганізація навчального набору (Train)...")
try:
    train_labels = pd.read_csv('Train.csv')

    for index, row in tqdm(train_labels.iterrows(), total=len(train_labels), desc="Обробка Train"):
        class_id = str(row['ClassId']).zfill(2)
        class_path = os.path.join(REORG_TRAIN, class_id)
        os.makedirs(class_path, exist_ok=True)

        # --- ВИПРАВЛЕННЯ ШЛЯХУ ---
        # 1. Беремо шлях з CSV
        original_relative_path = row['Path']
        # 2. Видаляємо дублюючий префікс 'Train/'
        if original_relative_path.startswith('Train/'):
            original_relative_path = original_relative_path[len('Train/'):]

        # 3. Формуємо правильний вихідний шлях
        source_path = os.path.join('Train', original_relative_path)
        destination_path = os.path.join(class_path, os.path.basename(row['Path']))

        shutil.move(source_path, destination_path)
        class_info[row['ClassId']] = class_info.get(row['ClassId'], 0) + 1

except FileNotFoundError:
    print("ПОМИЛКА: Не знайдено файл Train.csv або папки Train. Перевірте архів.")
    raise

# --- 3. Реорганізація ТЕСТОВОГО набору ---
print("Реорганізація тестового набору (Test)...")
try:
    test_labels = pd.read_csv('Test.csv')

    for index, row in tqdm(test_labels.iterrows(), total=len(test_labels), desc="Обробка Test"):
        class_id = str(row['ClassId']).zfill(2)
        class_path = os.path.join(REORG_TEST, class_id)
        os.makedirs(class_path, exist_ok=True)

        # --- ВИПРАВЛЕННЯ ШЛЯХУ ---
        original_relative_path = row['Path']
        # Тут шлях може починатися з 'Test/' (хоча в GTSRB частіше відсутній)
        if original_relative_path.startswith('Test/'):
            original_relative_path = original_relative_path[len('Test/'):]

        source_path = os.path.join('Test', original_relative_path)
        destination_path = os.path.join(class_path, os.path.basename(row['Path']))

        shutil.move(source_path, destination_path)

except FileNotFoundError:
    print("ПОМИЛКА: Не знайдено файл Test.csv або папки Test. Перевірте архів.")
    raise

print("--- Реорганізація завершена! Тепер можна запускати Крок 2. ---")

# Обов'язково ПЕРЕЗАПУСТІТЬ ДРУГУ КЛІТИНКУ КОДУ,
# переконавшись, що в ній DATA_DIR_TRAIN = './Train_reorg' та DATA_DIR_TEST = './Test_reorg'.

--- Початок реорганізації GTSRB (Виправлення шляху) ---
Реорганізація навчального набору (Train)...
ПОМИЛКА: Не знайдено файл Train.csv або папки Train. Перевірте архів.


FileNotFoundError: [Errno 2] No such file or directory: 'Train.csv'

In [None]:
import shutil

print("\n--- Фільтрування даних: Збереження лише 5 класів ---")

# --- 1. Визначення класів для збереження ---
# ОБЕРІТЬ ВАШІ 5 КЛАСІВ (формат 'XX' як назва папки)
# Приклад: Speed limit 20 (0), 30 (1), 50 (2), Stop (14), Yield (13)
CLASSES_TO_KEEP = ['00', '01', '02', '13', '14']
# Переконайтеся, що ви обрали ті 5, які вам потрібні!

REORG_TRAIN = './Train_reorg'
REORG_TEST = './Test_reorg'

# --- 2. Фільтрування та видалення зайвих класів ---

for data_dir in [REORG_TRAIN, REORG_TEST]:
    print(f"Обробка каталогу: {data_dir}")

    # Отримуємо список усіх папок класів у каталозі
    all_class_dirs = [d for d in os.listdir(data_dir)
                      if os.path.isdir(os.path.join(data_dir, d))]

    # Визначаємо папки, які потрібно видалити
    dirs_to_remove = [d for d in all_class_dirs if d not in CLASSES_TO_KEEP]

    # Видалення зайвих папок
    for class_id in dirs_to_remove:
        path_to_remove = os.path.join(data_dir, class_id)
        shutil.rmtree(path_to_remove)

    print(f"  Видалено {len(dirs_to_remove)} папок класів. Залишилося {len(CLASSES_TO_KEEP)}.")

# --- 3. Оновлення Глобальних Параметрів ---

# ОНОВІТЬ ГЛОБАЛЬНІ ЗМІННІ ПЕРЕД ЗАПУСКОМ КРОКУ 2
# (Ці зміни потрібно внести в початковий блок КРОК 2)
# DATA_DIR_TRAIN = './Train_reorg' # Шляхи вже встановлені
# DATA_DIR_TEST = './Test_reorg'
NUM_CLASSES = len(CLASSES_TO_KEEP) # Тепер це 5

print(f"\n✅ Фільтрування завершено. Встановлено NUM_CLASSES = {NUM_CLASSES}.")
print("Тепер можна безпечно запускати блок КРОК 2.")


--- Фільтрування даних: Збереження лише 5 класів ---
Обробка каталогу: ./Train_reorg
  Видалено 38 папок класів. Залишилося 5.
Обробка каталогу: ./Test_reorg
  Видалено 38 папок класів. Залишилося 5.

✅ Фільтрування завершено. Встановлено NUM_CLASSES = 5.
Тепер можна безпечно запускати блок КРОК 2.


In [None]:
# --- Глобальні Параметри ---
DATA_DIR_TRAIN = './Train_reorg'
DATA_DIR_TEST = './Test_reorg'
IMAGE_SIZE = (224, 224)
BATCH_SIZE = 32
# Змініть це, якщо ви працюєте з підмножиною!
NUM_CLASSES = 5
VALIDATION_SPLIT = 0.2
tf.get_logger().setLevel('ERROR')
# --- КРОК 2: ПІДГОТОВКА ДАНИХ ТА АУГМЕНТАЦІЯ ---

from tensorflow.keras.applications.resnet50 import preprocess_input as resnet_preprocess
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input as mobilenet_preprocess

data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal"),
    tf.keras.layers.RandomRotation(0.1),
    tf.keras.layers.RandomZoom(0.1),
])

def preprocess_dataset(ds, model_type='resnet'):
    if model_type == 'resnet':
        preprocess_fn = resnet_preprocess
    elif model_type == 'mobilenet':
        preprocess_fn = mobilenet_preprocess
    else:
        # Для сирого тестового набору
        preprocess_fn = lambda x: x

    if model_type == 'train':
         ds = ds.map(lambda x, y: (data_augmentation(x, training=True), y))

    return ds.map(lambda x, y: (preprocess_fn(x), y)).cache().prefetch(tf.data.AUTOTUNE)

# Завантаження та Розділення Train/Val
full_train_ds = tf.keras.utils.image_dataset_from_directory(
    DATA_DIR_TRAIN, labels='inferred', label_mode='categorical',
    image_size=IMAGE_SIZE, batch_size=BATCH_SIZE, shuffle=True
)
test_ds_raw = tf.keras.utils.image_dataset_from_directory(
    DATA_DIR_TEST, labels='inferred', label_mode='categorical',
    image_size=IMAGE_SIZE, batch_size=BATCH_SIZE, shuffle=False
)

CLASS_NAMES = full_train_ds.class_names
val_size = int(len(full_train_ds) * VALIDATION_SPLIT)
train_ds = full_train_ds.take(len(full_train_ds) - val_size)
val_ds = full_train_ds.skip(len(full_train_ds) - val_size).take(val_size)

# Фінальні набори даних
train_ds_res = preprocess_dataset(train_ds, 'resnet')
val_ds_res = preprocess_dataset(val_ds, 'resnet')
test_ds_res = preprocess_dataset(test_ds_raw, 'resnet')
train_ds_mob = preprocess_dataset(train_ds, 'mobilenet')  # <<< СТВОРЕННЯ train_ds_mob
val_ds_mob = preprocess_dataset(val_ds, 'mobilenet')
test_ds_mob = preprocess_dataset(test_ds_raw, 'mobilenet')

print(f"\nПідготовка даних завершена. Використовуємо {len(CLASS_NAMES)} класів.")


# --- ФУНКЦІЇ ДЛЯ МОДЕЛЕЙ ---

def create_model(base_model_class, input_shape, num_classes):
    base_model = base_model_class(weights='imagenet', include_top=False, input_shape=input_shape)
    base_model.trainable = False
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    predictions = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=base_model.input, outputs=predictions)
    return model, base_model

def run_fine_tuning(model, base_model, ds_train, ds_val, ds_test, layers_to_unfreeze, model_name):
    print(f"\n--- Fine-tuning {model_name} (Шари: {layers_to_unfreeze}) ---")

    # 1. Початкове навчання (5 епох)
    model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])
    start_time_init = time.time()
    hist_init = model.fit(ds_train, epochs=5, validation_data=ds_val, verbose=0)

    # 2. Fine-tuning (10 епох)
    base_model.trainable = True
    if layers_to_unfreeze > 0:
        for layer in base_model.layers[:-layers_to_unfreeze]:
            layer.trainable = False

    model.compile(optimizer=Adam(learning_rate=1e-5), loss='categorical_crossentropy', metrics=['accuracy'])
    hist_ft = model.fit(ds_train, epochs=10, validation_data=ds_val, verbose=0)

    total_time = (time.time() - start_time_init)

    # Оцінка
    loss, acc = model.evaluate(ds_test, verbose=0)

    return model, acc, total_time, hist_init.history, hist_ft.history

# --- ВИКОНАННЯ ВСІХ ЕКСПЕРИМЕНТІВ FINE-TUNING ---

FINAL_RESULTS = {}
BEST_MODELS = {}

# Експеримент 1: ResNet-50 (Fine-tuning 20 шарів)
resnet_model, resnet_base = create_model(ResNet50, IMAGE_SIZE + (3,), NUM_CLASSES)
resnet_model_ft, acc_res, time_res, hist_res_init, hist_res_ft = run_fine_tuning(
    resnet_model,
    resnet_base,
    train_ds_res,
    val_ds_res,
    test_ds_res,  # <<< ПЕРЕДАЄМО ds_test_res
    20,
    'ResNet50'      # <<< ПЕРЕДАЄМО 'ResNet50'
)
FINAL_RESULTS['ResNet50_20'] = {'acc': acc_res, 'time': time_res, 'params': resnet_model.count_params(), 'history': {'init': hist_res_init, 'ft': hist_res_ft}}
BEST_MODELS['ResNet50'] = resnet_model_ft
resnet_model_ft.save('ResNet50_best.h5')
print(f"ResNet50 (20 шарів) | Точність: {acc_res:.4f} | Час: {time_res:.2f} сек")
print("\n--- ЗАВЕРШЕНО ResNet. ПОЧАТОК MobileNetV2 ---") # <<<< ДОДАЙТЕ ЦЕЙ РЯДОК




Found 7620 files belonging to 5 classes.
Found 2520 files belonging to 5 classes.

Підготовка даних завершена. Використовуємо 5 класів.

--- Fine-tuning ResNet50 (Шари: 20) ---


In [None]:
# --- ПІДГОТОВКА СЕРЕДОВИЩА ТА ІМПОРТИ ---
import os
import numpy as np
import time
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.applications import ResNet50, MobileNetV2
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, accuracy_score
from skimage.util import random_noise
import cv2
from tqdm.notebook import tqdm
import gc # <<< ІМПОРТ ДЛЯ ЗБИРАННЯ СМІТТЯ

# --- ГЛОБАЛЬНІ ПАРАМЕТРИ (ВСТАНОВЛЕНІ ПІСЛЯ РЕОРГАНІЗАЦІЇ) ---
DATA_DIR_TRAIN = './Train_reorg'
DATA_DIR_TEST = './Test_reorg'
IMAGE_SIZE = (224, 224)
BATCH_SIZE = 16 # <<< ЗМЕНШЕНО ДЛЯ ЕКОНОМІЇ ПАМ'ЯТІ
NUM_CLASSES = 5
VALIDATION_SPLIT = 0.2
tf.get_logger().setLevel('ERROR')

# --- ФУНКЦІЇ ПІДГОТОВКИ ДАНИХ ТА ПРЕПРОЦЕСИНГУ ---

from tensorflow.keras.applications.resnet50 import preprocess_input as resnet_preprocess
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input as mobilenet_preprocess

data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal"),
    tf.keras.layers.RandomRotation(0.1),
    tf.keras.layers.RandomZoom(0.1),
])

def preprocess_dataset(ds, model_type='resnet'):
    if model_type == 'resnet': preprocess_fn = resnet_preprocess
    elif model_type == 'mobilenet': preprocess_fn = mobilenet_preprocess
    else: preprocess_fn = lambda x: x
    if model_type == 'train': ds = ds.map(lambda x, y: (data_augmentation(x, training=True), y))
    # Використовуємо .unbatch() та .batch() для уникнення проблем з попереднім кешуванням великого розміру пакета
    return ds.map(lambda x, y: (preprocess_fn(x), y)).unbatch().batch(BATCH_SIZE).cache().prefetch(tf.data.AUTOTUNE)

# --- ЗАВАНТАЖЕННЯ ТА РОЗДІЛЕННЯ ДАНИХ ---

try:
    full_train_ds = tf.keras.utils.image_dataset_from_directory(
        DATA_DIR_TRAIN, labels='inferred', label_mode='categorical',
        image_size=IMAGE_SIZE, batch_size=BATCH_SIZE, shuffle=True
    )
    test_ds_raw = tf.keras.utils.image_dataset_from_directory(
        DATA_DIR_TEST, labels='inferred', label_mode='categorical',
        image_size=IMAGE_SIZE, batch_size=BATCH_SIZE, shuffle=False
    )
except Exception as e:
    print(f"Помилка завантаження даних. Перевірте шляхи: {e}")
    raise e

CLASS_NAMES = full_train_ds.class_names
val_size = int(len(full_train_ds) * VALIDATION_SPLIT)
train_ds = full_train_ds.take(len(full_train_ds) - val_size)
val_ds = full_train_ds.skip(len(full_train_ds) - val_size).take(val_size)

# Створюємо набори даних для ResNet
train_ds_res = preprocess_dataset(train_ds, 'resnet')
val_ds_res = preprocess_dataset(val_ds, 'resnet')
test_ds_res = preprocess_dataset(test_ds_raw, 'resnet')

# Створюємо набори даних для MobileNet (ці змінні поки залишаються в пам'яті, але будуть використані пізніше)
train_ds_mob = preprocess_dataset(train_ds, 'mobilenet')
val_ds_mob = preprocess_dataset(val_ds, 'mobilenet')
test_ds_mob = preprocess_dataset(test_ds_raw, 'mobilenet')

print(f"\nПідготовка даних завершена. Використовуємо {len(CLASS_NAMES)} класів.")

# --- ФУНКЦІЇ МОДЕЛЕЙ ТА FINE-TUNING ---

def create_model(base_model_class, input_shape, num_classes):
    base_model = base_model_class(weights='imagenet', include_top=False, input_shape=input_shape)
    base_model.trainable = False
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    predictions = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=base_model.input, outputs=predictions)
    return model, base_model

def run_fine_tuning(model, base_model, ds_train, ds_val, ds_test, layers_to_unfreeze, model_name):
    print(f"\n--- Fine-tuning {model_name} (Шари: {layers_to_unfreeze}) ---")

    # 1. Початкове навчання класифікатора (3 епохи)
    model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])
    start_time_init = time.time()
    hist_init = model.fit(ds_train, epochs=3, validation_data=ds_val, verbose=0)

    # 2. Fine-tuning (5 епох)
    base_model.trainable = True
    if layers_to_unfreeze > 0:
        for layer in base_model.layers[:-layers_to_unfreeze]:
            layer.trainable = False

    model.compile(optimizer=Adam(learning_rate=1e-5), loss='categorical_crossentropy', metrics=['accuracy'])
    hist_ft = model.fit(ds_train, epochs=5, validation_data=ds_val, verbose=0)

    total_time = (time.time() - start_time_init)
    loss, acc = model.evaluate(ds_test, verbose=0)

    return model, acc, total_time, hist_init.history, hist_ft.history

# --- ВИКОНАННЯ ЕКСПЕРИМЕНТІВ ---

FINAL_RESULTS = {}
BEST_MODELS = {}

# 1. ResNet-50
resnet_model, resnet_base = create_model(ResNet50, IMAGE_SIZE + (3,), NUM_CLASSES)
resnet_model_ft, acc_res, time_res, hist_res_init, hist_res_ft = run_fine_tuning(
    resnet_model, resnet_base, train_ds_res, val_ds_res, test_ds_res, 20, 'ResNet50'
)
FINAL_RESULTS['ResNet50_20'] = {'acc': acc_res, 'time': time_res, 'params': resnet_model.count_params(), 'history': {'init': hist_res_init, 'ft': hist_res_ft}}
BEST_MODELS['ResNet50'] = resnet_model_ft
resnet_model_ft.save('ResNet50_best.h5')
print(f"ResNet50 (20 шарів) | Точність: {acc_res:.4f} | Час: {time_res:.2f} сек")


# ----------------------------------------------------------------------
# <<< СЕКЦІЯ ЗВІЛЬНЕННЯ ПАМ'ЯТІ ПЕРЕД НАВЧАННЯМ MOBILE NET >>>
# ----------------------------------------------------------------------

print("\n!!! Звільняємо пам'ять ResNet50 перед MobileNetV2, щоб уникнути збою сесії !!!")

# Видаляємо ResNet моделі та набори даних ResNet-типу
del resnet_model
del resnet_base
del resnet_model_ft
del train_ds_res
del val_ds_res
del test_ds_res

# Примусове очищення пам'яті
tf.keras.backend.clear_session()
gc.collect()

print("Пам'ять ResNet50 звільнена. Перехід до MobileNetV2.")


# 2. MobileNetV2 (Модель буде створена зараз, використовуючи звільнену пам'ять)
mobilenet_model, mobilenet_base = create_model(MobileNetV2, IMAGE_SIZE + (3,), NUM_CLASSES)
mobilenet_model_ft, acc_mob, time_mob, hist_mob_init, hist_mob_ft = run_fine_tuning(
    mobilenet_model, mobilenet_base, train_ds_mob, val_ds_mob, test_ds_mob, 10, 'MobileNetV2'
)
FINAL_RESULTS['MobileNetV2_10'] = {'acc': acc_mob, 'time': time_mob, 'params': mobilenet_model.count_params(), 'history': {'init': hist_mob_init, 'ft': hist_mob_ft}}
BEST_MODELS['MobileNetV2'] = mobilenet_model_ft
mobilenet_model_ft.save('MobileNetV2_best.h5')
print(f"MobileNetV2 (10 шарів) | Точність: {acc_mob:.4f} | Час: {time_mob:.2f} сек")


# --- АНАЛІЗ: ENSEMBLE МОДЕЛЕЙ ---

print("\n--- АНАЛІЗ: ENSEMBLE МОДЕЛЕЙ ---")
# Оскільки ми видалили test_ds_res, нам потрібно тимчасово відновити його для прогнозу ResNet
test_ds_res = preprocess_dataset(test_ds_raw, 'resnet')
y_true = np.concatenate([y for x, y in test_ds_raw.unbatch().batch(BATCH_SIZE)]).argmax(axis=1)

# Завантажуємо ResNet50 назад, оскільки ми його видалили
BEST_MODELS['ResNet50'] = tf.keras.models.load_model('ResNet50_best.h5')

preds_res = BEST_MODELS['ResNet50'].predict(test_ds_res, verbose=0)
preds_mob = BEST_MODELS['MobileNetV2'].predict(test_ds_mob, verbose=0)

# Середнє Агрегування
preds_avg = (preds_res + preds_mob) / 2
acc_ensemble = accuracy_score(y_true, np.argmax(preds_avg, axis=1))

# Зважене Середнє
w_res = FINAL_RESULTS['ResNet50_20']['acc']
w_mob = FINAL_RESULTS['MobileNetV2_10']['acc']
total_w = w_res + w_mob
preds_weighted = (w_res / total_w * preds_res) + (w_mob / total_w * preds_mob)
acc_weighted = accuracy_score(y_true, np.argmax(preds_weighted, axis=1))

print(f"Ensemble (Середнє) Точність: {acc_ensemble:.4f}")
print(f"Ensemble (Зважене) Точність: {acc_weighted:.4f}")
FINAL_RESULTS['Ensemble_Avg'] = {'acc': acc_ensemble}

# Звільнення пам'яті для подальших кроків
del BEST_MODELS['ResNet50']
del test_ds_res
tf.keras.backend.clear_session()
gc.collect()

# --- АНАЛІЗ: КВАНТИЗАЦІЯ (MobileNetV2) ---

def measure_inference_speed(model, ds_test, model_name, interpreter=None):
    images_to_test = 500
    # Намагаємося уникнути використання .unbatch() для великих наборів, але для порівняння швидкості це потрібно
    test_data_iter = ds_test.unbatch().take(images_to_test)

    if interpreter:
        input_details = interpreter.get_input_details()
        input_index = input_details[0]['index']
        start_time = time.time()
        for x, _ in tqdm(test_data_iter, total=images_to_test, desc=f"Inf {model_name}"):
            x_np = x.numpy()[None, ...].astype(input_details[0]['dtype'])
            interpreter.set_tensor(input_index, x_np)
            interpreter.invoke()
    else:
        start_time = time.time()
        for x, _ in tqdm(test_data_iter, total=images_to_test, desc=f"Inf {model_name}"):
            model.predict(x[None, ...], verbose=0)

    return time.time() - start_time

print("\n--- АНАЛІЗ: КВАНТИЗАЦІЯ MOBILE NetV2 ---")

converter = tf.lite.TFLiteConverter.from_keras_model(BEST_MODELS['MobileNetV2'])
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_quant_model = converter.convert()
quant_file_path = 'mobilenet_quantized.tflite'
with open(quant_file_path, 'wb') as f: f.write(tflite_quant_model)

original_size = os.path.getsize('MobileNetV2_best.h5') / (1024 * 1024)
quant_size = os.path.getsize(quant_file_path) / (1024 * 1024)

time_orig = measure_inference_speed(BEST_MODELS['MobileNetV2'], test_ds_mob, 'MobileNetV2 Keras')
interpreter = tf.lite.Interpreter(model_content=tflite_quant_model)
interpreter.allocate_tensors()
time_quant = measure_inference_speed(None, test_ds_mob, 'MobileNetV2 TFLite', interpreter=interpreter)

print(f"Розмір Оригінал: {original_size:.2f} MB | Квантований: {quant_size:.2f} MB")
print(f"Швидкість Keras: {time_orig:.4f} сек | TFLite Quant: {time_quant:.4f} сек (у {time_orig/time_quant:.1f}x швидше)")
FINAL_RESULTS['MobileNetV2_quant'] = {'size': quant_size, 'acc': FINAL_RESULTS['MobileNetV2_10']['acc'], 'inf_time': time_quant}


# --- АНАЛІЗ: СТІЙКІСТЬ ДО ДЕФЕКТІВ ---

def add_noise(image_tensor, mode='gaussian', var=0.01):
    # Приймає tensor, повертає tensor
    img_np = image_tensor.numpy() / 255.0
    if mode == 'gaussian': sigma = var**0.5; noisy_img = img_np + np.random.normal(0, sigma, img_np.shape)
    elif mode == 'salt_pepper': noisy_img = random_noise(img_np, mode='s&p', amount=var)
    else: h, w = IMAGE_SIZE; M = np.float32([[1, 0.1, 0], [0, 1, 0]]); M[0, 2] = -0.1 * h / 2; noisy_img = cv2.warpAffine((img_np * 255).astype(np.uint8), M, (w, h), flags=cv2.INTER_LINEAR) / 255.0
    return tf.convert_to_tensor(np.clip(noisy_img * 255.0, 0, 255).astype(np.float32))

def create_defective_ds(defect_fn, preprocess_fn):
    return test_ds_raw.unbatch().map(lambda x, y: (defect_fn(x), y), num_parallel_calls=tf.data.AUTOTUNE).batch(BATCH_SIZE).map(lambda x, y: (preprocess_fn(x), y) )

print("\n--- АНАЛІЗ: СТІЙКІСТЬ ---")
defect_tests = {'Gaussian (0.01)': lambda x: add_noise(x, 'gaussian'),'Salt-Pepper (0.01)': lambda x: add_noise(x, 'salt_pepper'),'Affine Shear': lambda x: add_noise(x, 'affine'),}

def evaluate_stability(model_name, preprocess_fn):
    acc_results = {}
    original_acc = FINAL_RESULTS[f'{model_name}_20' if 'ResNet' in model_name else f'{model_name}_10']['acc']

    # Завантажуємо модель для тестування (оскільки ми їх видаляли)
    model = tf.keras.models.load_model(f'{model_name}_best.h5')

    for defect, fn in defect_tests.items():
        ds_def = create_defective_ds(fn, preprocess_fn)
        loss, acc = model.evaluate(ds_def, verbose=0)
        acc_results[defect] = acc
        print(f" {model_name} на {defect}: {acc:.4f} ($\Delta$ {original_acc - acc:.4f})")

    FINAL_RESULTS[f'{model_name}_stability'] = acc_results
    del model # Знову видаляємо
    tf.keras.backend.clear_session()
    gc.collect()

evaluate_stability('ResNet50', resnet_preprocess)
evaluate_stability('MobileNetV2', mobilenet_preprocess)


# --- ФІНАЛЬНИЙ ВИВІД ТА МАТРИЦЯ ЗМІШУВАННЯ ---

print("\n\n" + "="*50)
print("             ФІНАЛЬНА ЗВІТНА ТАБЛИЦЯ")
print("="*50)
print(f"| Модель | Точність | Час (сек) | Параметри (M) | Розмір (MB) |")
print("|---|---|---|---|---|")
print(f"| ResNet50 | {FINAL_RESULTS['ResNet50_20']['acc']:.4f} | {FINAL_RESULTS['ResNet50_20']['time']:.2f} | {FINAL_RESULTS['ResNet50_20']['params'] / 1e6:.2f} | - |")
print(f"| MobileNetV2 | {FINAL_RESULTS['MobileNetV2_10']['acc']:.4f} | {FINAL_RESULTS['MobileNetV2_10']['time']:.2f} | {FINAL_RESULTS['MobileNetV2_10']['params'] / 1e6:.2f} | {original_size:.2f} |")
print(f"| Ensemble Avg | {FINAL_RESULTS['Ensemble_Avg']['acc']:.4f} | - | - | - |")
print(f"| MobNet Quant | {FINAL_RESULTS['MobileNetV2_10']['acc']:.4f} | {FINAL_RESULTS['MobileNetV2_quant']['inf_time']:.4f} | - | {quant_size:.2f} |")

# Завантажуємо ResNet для матриці (вона потрібна лише тут)
BEST_MODELS['ResNet50'] = tf.keras.models.load_model('ResNet50_best.h5')
y_pred_res = np.argmax(BEST_MODELS['ResNet50'].predict(preprocess_dataset(test_ds_raw, 'resnet'), verbose=0), axis=1)

# Матриця Змішування (ResNet)
cm = confusion_matrix(y_true, y_pred_res)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=CLASS_NAMES)
fig, ax = plt.subplots(figsize=(10, 10))
disp.plot(cmap=plt.cm.Blues, ax=ax)
plt.title('Confusion Matrix for Best ResNet-50')
plt.show()

# Графік Навчання (ResNet)
def plot_training_history(history_init, history_ft, model_name):
    full_loss = history_init['loss'] + history_ft['loss']
    full_val_loss = history_init['val_loss'] + history_ft['val_loss']
    plt.figure(figsize=(8, 5))
    plt.plot(full_loss, label='Train Loss')
    plt.plot(full_val_loss, label='Validation Loss')
    plt.title(f'{model_name} Loss over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

plot_training_history(FINAL_RESULTS['ResNet50_20']['history']['init'], FINAL_RESULTS['ResNet50_20']['history']['ft'], 'ResNet50')

  print(f" {model_name} на {defect}: {acc:.4f} ($\Delta$ {original_acc - acc:.4f})")


ModuleNotFoundError: No module named 'tensorflow'

In [None]:
# Експеримент 2: MobileNetV2 (Fine-tuning 10 шарів)
mobilenet_model, mobilenet_base = create_model(MobileNetV2, IMAGE_SIZE + (3,), NUM_CLASSES)
mobilenet_model_ft, acc_mob, time_mob, hist_mob_init, hist_mob_ft = run_fine_tuning(
    mobilenet_model,
    mobilenet_base,
    train_ds_mob,
    val_ds_mob,
    test_ds_mob,  # <<< ПЕРЕДАЄМО ds_test_mob
    10,
    'MobileNetV2'   # <<< ПЕРЕДАЄМО 'MobileNetV2'
)
FINAL_RESULTS['MobileNetV2_10'] = {'acc': acc_mob, 'time': time_mob, 'params': mobilenet_model.count_params(), 'history': {'init': hist_mob_init, 'ft': hist_mob_ft}}
BEST_MODELS['MobileNetV2'] = mobilenet_model_ft
mobilenet_model_ft.save('MobileNetV2_best.h5')
print(f"MobileNetV2 (10 шарів) | Точність: {acc_mob:.4f} | Час: {time_mob:.2f} сек")


# --- АНАЛІЗ: ENSEMBLE ---

print("\n--- АНАЛІЗ: ENSEMBLE МОДЕЛЕЙ ---")
y_true = np.concatenate([y for x, y in test_ds_raw]).argmax(axis=1)

preds_res = BEST_MODELS['ResNet50'].predict(test_ds_res, verbose=0)
preds_mob = BEST_MODELS['MobileNetV2'].predict(test_ds_mob, verbose=0)

# Середнє Агрегування
preds_avg = (preds_res + preds_mob) / 2
acc_ensemble = accuracy_score(y_true, np.argmax(preds_avg, axis=1))

# Зважене Середнє (Ваги пропорційні точності)
w_res = FINAL_RESULTS['ResNet50_20']['acc']
w_mob = FINAL_RESULTS['MobileNetV2_10']['acc']
total_w = w_res + w_mob
preds_weighted = (w_res / total_w * preds_res) + (w_mob / total_w * preds_mob)
acc_weighted = accuracy_score(y_true, np.argmax(preds_weighted, axis=1))

print(f"Ensemble (Середнє) Точність: {acc_ensemble:.4f}")
print(f"Ensemble (Зважене) Точність: {acc_weighted:.4f}")
FINAL_RESULTS['Ensemble_Avg'] = {'acc': acc_ensemble}


# --- АНАЛІЗ: КВАНТИЗАЦІЯ ---

def measure_inference_speed(model, ds_test, model_name, interpreter=None):
    images_to_test = 500
    test_data_iter = ds_test.unbatch().take(images_to_test)

    if interpreter:
        # Для TFLite
        input_details = interpreter.get_input_details()
        input_index = input_details[0]['index']
        output_details = interpreter.get_output_details()

        start_time = time.time()
        for x, _ in tqdm(test_data_iter, total=images_to_test, desc=f"Inf {model_name}"):
            x_np = x.numpy()[None, ...].astype(input_details[0]['dtype'])
            interpreter.set_tensor(input_index, x_np)
            interpreter.invoke()

    else:
        # Для Keras моделі
        start_time = time.time()
        for x, _ in tqdm(test_data_iter, total=images_to_test, desc=f"Inf {model_name}"):
            model.predict(x[None, ...], verbose=0)

    elapsed_time = time.time() - start_time
    return elapsed_time

print("\n--- АНАЛІЗ: КВАНТИЗАЦІЯ MOBILE NetV2 ---")

# 1. Квантизація
converter = tf.lite.TFLiteConverter.from_keras_model(BEST_MODELS['MobileNetV2'])
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_quant_model = converter.convert()

quant_file_path = 'mobilenet_quantized.tflite'
with open(quant_file_path, 'wb') as f:
    f.write(tflite_quant_model)

original_size = os.path.getsize('MobileNetV2_best.h5') / (1024 * 1024)
quant_size = os.path.getsize(quant_file_path) / (1024 * 1024)
print(f"Розмір Оригінал: {original_size:.2f} MB | Квантований: {quant_size:.2f} MB")
FINAL_RESULTS['MobileNetV2_quant'] = {'size': quant_size, 'acc': 0, 'inf_time': 0}

# 2. Вимірювання швидкості
time_orig = measure_inference_speed(BEST_MODELS['MobileNetV2'], test_ds_mob, 'MobileNetV2 Keras')

interpreter = tf.lite.Interpreter(model_content=tflite_quant_model)
interpreter.allocate_tensors()
time_quant = measure_inference_speed(None, test_ds_mob, 'MobileNetV2 TFLite', interpreter=interpreter)

print(f"Швидкість Keras: {time_orig:.4f} сек | TFLite Quant: {time_quant:.4f} сек (у {time_orig/time_quant:.1f}x швидше)")
FINAL_RESULTS['MobileNetV2_quant']['inf_time'] = time_quant


# --- АНАЛІЗ: СТІЙКІСТЬ ДО ДЕФЕКТІВ ---

def add_noise(image_tensor, mode='gaussian', var=0.01):
    img_np = image_tensor.numpy() / 255.0
    if mode == 'gaussian':
        sigma = var**0.5
        noisy_img = img_np + np.random.normal(0, sigma, img_np.shape)
    elif mode == 'salt_pepper':
        noisy_img = random_noise(img_np, mode='s&p', amount=var)
    else: # Affine Shear
        h, w = IMAGE_SIZE
        M = np.float32([[1, 0.1, 0], [0, 1, 0]]) # Shear factor 0.1
        M[0, 2] = -0.1 * h / 2
        noisy_img = cv2.warpAffine((img_np * 255).astype(np.uint8), M, (w, h)) / 255.0

    return tf.convert_to_tensor(np.clip(noisy_img * 255.0, 0, 255).astype(np.float32))

def create_defective_ds(defect_fn, preprocess_fn):
    return test_ds_raw.unbatch().map(
        lambda x, y: (defect_fn(x), y),
        num_parallel_calls=tf.data.AUTOTUNE
    ).batch(BATCH_SIZE).map(
        lambda x, y: (preprocess_fn(x), y)
    )

print("\n--- АНАЛІЗ: СТІЙКІСТЬ ---")

defect_tests = {
    'Gaussian (0.01)': lambda x: add_noise(x, 'gaussian'),
    'Salt-Pepper (0.01)': lambda x: add_noise(x, 'salt_pepper'),
    'Affine Shear': lambda x: add_noise(x, 'affine'),
}

def evaluate_stability(model, model_name, preprocess_fn):
    acc_results = {}
    original_acc = FINAL_RESULTS[f'{model_name}_20' if 'ResNet' in model_name else f'{model_name}_10']['acc']

    for defect, fn in defect_tests.items():
        ds_def = create_defective_ds(fn, preprocess_fn)
        loss, acc = model.evaluate(ds_def, verbose=0)
        acc_results[defect] = acc
        print(f" {model_name} на {defect}: {acc:.4f} ($\Delta$ {original_acc - acc:.4f})")

    FINAL_RESULTS[f'{model_name}_stability'] = acc_results

evaluate_stability(BEST_MODELS['ResNet50'], 'ResNet50', resnet_preprocess)
evaluate_stability(BEST_MODELS['MobileNetV2'], 'MobileNetV2', mobilenet_preprocess)


# --- ФІНАЛЬНИЙ ВИВІД ТА МАТРИЦЯ ЗМІШУВАННЯ ---

print("\n\n" + "="*50)
print("             ФІНАЛЬНА ЗВІТНА ТАБЛИЦЯ")
print("="*50)
print(f"| Модель | Точність | Час (сек) | Параметри (M) | Розмір (MB) |")
print("|---|---|---|---|---|")
print(f"| ResNet50 | {FINAL_RESULTS['ResNet50_20']['acc']:.4f} | {FINAL_RESULTS['ResNet50_20']['time']:.2f} | {FINAL_RESULTS['ResNet50_20']['params'] / 1e6:.2f} | - |")
print(f"| MobileNetV2 | {FINAL_RESULTS['MobileNetV2_10']['acc']:.4f} | {FINAL_RESULTS['MobileNetV2_10']['time']:.2f} | {FINAL_RESULTS['MobileNetV2_10']['params'] / 1e6:.2f} | {original_size:.2f} |")
print(f"| Ensemble Avg | {FINAL_RESULTS['Ensemble_Avg']['acc']:.4f} | - | - | - |")
print(f"| MobNet Quant | {FINAL_RESULTS['MobileNetV2_10']['acc']:.4f} | {FINAL_RESULTS['MobileNetV2_quant']['inf_time']:.4f} | - | {quant_size:.2f} |")


# Матриця Змішування (ResNet)
y_pred_res = np.argmax(preds_res, axis=1)
cm = confusion_matrix(y_true, y_pred_res)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=CLASS_NAMES)
fig, ax = plt.subplots(figsize=(10, 10))
disp.plot(cmap=plt.cm.Blues, ax=ax)
plt.title('Confusion Matrix for Best ResNet-50')
plt.show()

# Графік Навчання (ResNet)
def plot_training_history(history_init, history_ft, model_name):
    full_loss = history_init['loss'] + history_ft['loss']
    full_val_loss = history_init['val_loss'] + history_ft['val_loss']
    plt.figure(figsize=(8, 5))
    plt.plot(full_loss, label='Train Loss')
    plt.plot(full_val_loss, label='Validation Loss')
    plt.title(f'{model_name} Loss over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

plot_training_history(FINAL_RESULTS['ResNet50_20']['history']['init'], FINAL_RESULTS['ResNet50_20']['history']['ft'], 'ResNet50')