# In [None]:
import tensorflow as tf
import os
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import pandas as pd
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

# Dataset locations: one root folder containing a sub-folder per split
base_dir = '/kaggle/input/data-4-sehat-5-sempurna/Dataset'
train_dir, validation_dir, test_dir = (
    os.path.join(base_dir, split_folder)
    for split_folder in ('train', 'validasi', 'test')
)

# Print dataset distribution
def print_dataset_info(categories=None, splits=None):
    """Print how many image files each category holds in every dataset split.

    Only regular files with an image extension are counted, so nested
    folders, hidden files and stray non-image files no longer inflate the
    numbers. A category folder that does not exist is reported as 0 images.

    Args:
        categories: Iterable of class sub-folder names to count. Defaults to
            the five food groups used by this script.
        splits: Iterable of ``(label, directory)`` pairs. Defaults to the
            module-level training, validation and test directories.
    """
    # NOTE(review): intended to mirror the extensions Keras' directory
    # iterator loads by default -- confirm against the installed version.
    image_extensions = ('.png', '.jpg', '.jpeg', '.bmp', '.ppm', '.tif', '.tiff')

    if categories is None:
        categories = ['karbohidrat', 'protein', 'buah', 'sayur', 'minuman']
    if splits is None:
        # Resolved at call time so the defaults follow the module-level paths.
        splits = [('Training', train_dir), ('Validation', validation_dir), ('Test', test_dir)]

    for split, directory in splits:
        print(f"\n{split} Dataset Distribution:")
        total = 0
        for category in categories:
            path = os.path.join(directory, category)
            count = 0
            # isdir (not exists) so a stray file with the category name is
            # treated as "no images" instead of crashing the listing.
            if os.path.isdir(path):
                with os.scandir(path) as entries:
                    count = sum(
                        1
                        for entry in entries
                        if entry.is_file()
                        and entry.name.lower().endswith(image_extensions)
                    )
            total += count
            print(f'{category}: {count} images')
        print(f'Total: {total} images')

# Show the per-category image counts of each split
print_dataset_info()

# Enhanced Data Augmentation with more aggressive transformations
def _random_contrast(image, lower=0.7, upper=1.3):
    """Scale one image's contrast by a random factor and return a NumPy array.

    Each channel is stretched around its own mean:
    ``(pixel - mean) * factor + mean`` with ``factor`` drawn uniformly from
    ``[lower, upper]``. The result is clipped back to the 0-255 range because
    the generator runs this hook before ``rescale`` is applied.

    A named, NumPy-only function is used instead of a TensorFlow lambda so the
    hook returns the array type Keras documents for ``preprocessing_function``
    and the generator stays picklable.

    Args:
        image: Rank-3 float array for a single image.
            Assumes channels-last (H, W, C) layout with raw 0-255
            intensities -- TODO confirm if ``data_format`` is ever changed.
        lower: Smallest contrast factor.
        upper: Largest contrast factor.

    Returns:
        Array with the same shape and dtype as ``image``.
    """
    factor = np.random.uniform(lower, upper)
    channel_mean = image.mean(axis=(0, 1), keepdims=True)
    adjusted = (image - channel_mean) * factor + channel_mean
    return np.clip(adjusted, 0.0, 255.0).astype(image.dtype)


train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=40,
    width_shift_range=0.3,
    height_shift_range=0.3,
    zoom_range=0.3,
    horizontal_flip=True,
    vertical_flip=True,
    fill_mode='nearest',
    brightness_range=[0.7, 1.3],
    shear_range=0.2,
    # Channel shifts are applied to the raw 0-255 pixels (before `rescale`),
    # so a 20% shift has to be expressed on that scale; a bare 0.2 would be
    # an imperceptible change.
    channel_shift_range=0.2 * 255.0,
    preprocessing_function=_random_contrast
)

# Validation and test images are only rescaled, never augmented
val_datagen = ImageDataGenerator(rescale=1./255)
test_datagen = ImageDataGenerator(rescale=1./255)

# Input resolution and batch size shared by all three generators
IMG_SIZE = 256  # Increased from 224
BATCH_SIZE = 32 # Increased for better stability

# Options common to every directory iterator
_common_flow_options = dict(
    target_size=(IMG_SIZE, IMG_SIZE),
    batch_size=BATCH_SIZE,
    class_mode='categorical',
)

# Only the training stream is shuffled; validation and test keep a fixed
# order so predictions can be compared against `generator.classes`.
train_generator = train_datagen.flow_from_directory(
    train_dir, shuffle=True, **_common_flow_options
)

validation_generator = val_datagen.flow_from_directory(
    validation_dir, shuffle=False, **_common_flow_options
)

test_generator = test_datagen.flow_from_directory(
    test_dir, shuffle=False, **_common_flow_options
)

# Both callbacks watch the validation loss
_MONITORED_METRIC = 'val_loss'

# Early stopping: give up after 15 epochs without an improvement of at least
# 0.001, then roll back to the best weights seen
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor=_MONITORED_METRIC,
    min_delta=0.001,
    patience=15,
    restore_best_weights=True,
)

# Learning-rate schedule: halve the rate after 5 stagnant epochs, never going
# below 1e-6
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor=_MONITORED_METRIC,
    patience=5,
    factor=0.5,
    min_lr=1e-6,
    verbose=1,
)

# Improved CNN Architecture with Residual Connections
def _residual_block(x, filters, dropout_rate):
    """Residual unit: two 3x3 conv+BN layers added to a 1x1-projected shortcut.

    The sum goes through ReLU, 2x2 max-pooling and dropout.
    """
    layers = tf.keras.layers
    shortcut = x
    x = layers.Conv2D(filters, (3, 3), padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.Conv2D(filters, (3, 3), padding='same')(x)
    x = layers.BatchNormalization()(x)
    # 1x1 projection so the shortcut has the same channel count as the main path
    shortcut = layers.Conv2D(filters, (1, 1), padding='same')(shortcut)
    x = layers.Add()([x, shortcut])
    x = layers.Activation('relu')(x)
    x = layers.MaxPooling2D(2)(x)
    return layers.Dropout(dropout_rate)(x)


def create_improved_model(num_classes=5, input_shape=None):
    """Build the residual CNN classifier (uncompiled).

    Architecture: 7x7 stride-2 stem with max-pooling, two residual blocks
    (128 and 256 filters), global average pooling, a 512-unit dense layer
    and a softmax classification head.

    Args:
        num_classes: Number of output classes. Defaults to 5.
        input_shape: Input tensor shape without the batch dimension.
            Defaults to ``(IMG_SIZE, IMG_SIZE, 3)``.

    Returns:
        A ``tf.keras.Model`` mapping images to class probabilities.
    """
    layers = tf.keras.layers
    if input_shape is None:
        input_shape = (IMG_SIZE, IMG_SIZE, 3)

    inputs = tf.keras.Input(shape=input_shape)

    # Initial Conv Block
    x = layers.Conv2D(64, (7, 7), strides=2, padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.MaxPooling2D(3, strides=2, padding='same')(x)

    # Residual Blocks 1 and 2
    x = _residual_block(x, 128, 0.3)
    x = _residual_block(x, 256, 0.4)

    # Global Average Pooling and Dense Layers
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(512)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.Dropout(0.5)(x)

    # Keep the softmax head in float32: this script enables the
    # 'mixed_float16' policy before building the model, and float16
    # probabilities make the loss numerically unstable.
    outputs = layers.Dense(num_classes, activation='softmax', dtype='float32')(x)

    model = tf.keras.Model(inputs, outputs)
    return model

# Switch to mixed precision first, then build the network
tf.keras.mixed_precision.set_global_policy('mixed_float16')
model = create_improved_model()

# Adam (lr = 1e-3) wrapped in a loss-scaling optimizer for float16 training
optimizer = tf.keras.mixed_precision.LossScaleOptimizer(
    tf.keras.optimizers.Adam(learning_rate=1e-3)
)

model.compile(
    loss='categorical_crossentropy',
    optimizer=optimizer,
    metrics=['accuracy'],
)

# Train for at most EPOCHS epochs; the callbacks may stop earlier or lower
# the learning rate
EPOCHS = 100
history = model.fit(
    train_generator,
    validation_data=validation_generator,
    epochs=EPOCHS,
    callbacks=[early_stopping, reduce_lr],
)

# # Train model
# EPOCHS = 100
# history = model.fit(
#     train_generator,
#     epochs=EPOCHS,
#     validation_data=validation_generator,
#     callbacks=[early_stopping, reduce_lr],
#     workers=4,
#     use_multiprocessing=True
# )

# Plot training history
def plot_training_history(history):
    """Show training vs. validation accuracy and loss curves side by side.

    Args:
        history: Object whose ``history`` attribute maps 'accuracy',
            'val_accuracy', 'loss' and 'val_loss' to per-epoch values,
            such as the value returned by ``model.fit``.
    """
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))

    # (training key, validation key, title, y-axis label) for each panel
    panels = (
        ('accuracy', 'val_accuracy', 'Model Accuracy', 'Accuracy'),
        ('loss', 'val_loss', 'Model Loss', 'Loss'),
    )

    for axis, (train_key, val_key, title, y_label) in zip(axes, panels):
        axis.plot(history.history[train_key], label='Training')
        axis.plot(history.history[val_key], label='Validation')
        axis.set_title(title)
        axis.set_xlabel('Epoch')
        axis.set_ylabel(y_label)
        axis.legend()
        axis.grid(True)

    plt.tight_layout()
    plt.show()

# Draw the accuracy and loss curves recorded during training
plot_training_history(history)

# Model evaluation
def evaluate_model(model, generator, set_name="Test"):
    """Evaluate ``model`` on ``generator`` and report the results.

    Shows a confusion-matrix heatmap, prints a per-class classification
    report and prints the overall loss and accuracy.

    Args:
        model: Compiled Keras model providing ``predict`` and ``evaluate``.
        generator: Directory iterator exposing ``classes`` and
            ``class_indices``. It must not shuffle, because predictions are
            matched to ``generator.classes`` by position.
        set_name: Label used in plot titles and printed headings.

    Returns:
        Dict with the ``'loss'`` and ``'accuracy'`` reported by
        ``model.evaluate``.

    Raises:
        ValueError: If ``generator`` was created with ``shuffle=True``.
    """
    # With shuffling on, batch order no longer follows `generator.classes`
    # and every metric below would be computed against the wrong labels.
    if getattr(generator, 'shuffle', False):
        raise ValueError(
            f"Cannot evaluate the {set_name} set with a shuffled generator; "
            "create it with shuffle=False."
        )

    # Get class names ordered by their integer index so they line up with
    # the columns of the prediction matrix
    class_indices = generator.class_indices
    class_names = sorted(class_indices, key=class_indices.get)
    label_ids = list(range(len(class_names)))

    # Predictions
    predictions = model.predict(generator, verbose=1)
    y_pred = np.argmax(predictions, axis=1)
    y_true = generator.classes

    # Confusion Matrix. Passing `labels` keeps the matrix at one row/column
    # per class even when a class is absent from both y_true and y_pred.
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    plt.figure(figsize=(12, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title(f'Confusion Matrix ({set_name} Set)')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.tight_layout()
    plt.show()

    # Classification Report (same explicit labels, so `target_names` always
    # matches the number of reported classes)
    print(f"\nClassification Report ({set_name} Set):")
    print(classification_report(y_true, y_pred, labels=label_ids,
                                target_names=class_names))

    # Overall Metrics
    loss, accuracy = model.evaluate(generator, verbose=0)
    print(f"\n{set_name} Set Accuracy: {accuracy:.4f}")
    print(f"{set_name} Set Loss: {loss:.4f}")

    return {'loss': loss, 'accuracy': accuracy}

# Evaluate on validation and test sets
# print("\nTrain Set Evaluation:")
# evaluate_model(model, train_generator, "Train")

# Run the full evaluation on the validation split first, then on the test split
_evaluation_runs = (
    ("\nValidation Set Evaluation:", validation_generator, "Validation"),
    ("\nTest Set Evaluation:", test_generator, "Test"),
)
for _heading, _eval_generator, _eval_label in _evaluation_runs:
    print(_heading)
    evaluate_model(model, _eval_generator, _eval_label)

# Persist the trained model (HDF5) and the per-epoch metrics (CSV)
MODEL_OUTPUT_PATH = '/kaggle/working/model.h5'
HISTORY_OUTPUT_PATH = '/kaggle/working/training_history.csv'

model.save(MODEL_OUTPUT_PATH)

# One row per epoch, one column per tracked metric
history_df = pd.DataFrame(history.history)
history_df.to_csv(HISTORY_OUTPUT_PATH)

print("\nModel and training history saved successfully!")