In [3]:
# %% [markdown]
# Train & Export Notebook — Sortir Jagung
#
# Tujuan: notebook lengkap, dapat dijalankan di Google Colab atau lokal untuk
# melatih model klasifikasi (Grade A/B/C) dari folder `dataset/`.
# Fitur utama:
# - Reproducible (seed)
# - Data split (train/val/test 70/20/10)
# - Data pipeline tf.data (fast, prefetch)
# - Augmentasi modern (Keras preprocessing layers)
# - Pilihan model: baseline CNN / MobileNetV2 (transfer learning)
# - Callbacks: EarlyStopping, ReduceLROnPlateau, ModelCheckpoint, CSVLogger
# - Evaluation: confusion matrix, classification report
# - TFLite conversion (post-training quantization with representative dataset)
#
# Petunjuk: upload notebook ini ke Google Colab (Runtime: GPU) atau jalankan lokal
# setelah menginstal dependensi (tensorflow, scikit-learn, matplotlib, pandas).

# %%
# Opsional: jika di Colab dan perlu memasang versi tertentu (gunakan %pip agar
# paket terpasang di environment kernel yang sedang berjalan)
# %pip install -q tensorflow==2.12.0 scikit-learn matplotlib pandas

# %%
import json
import os
import random
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow import keras
from tensorflow.keras import layers

# Report the TensorFlow version in use so the run can be reproduced later.
print('TF version', tf.__version__)

# %% [markdown]
# ## Konfigurasi
# Ubah path sesuai struktur projek: `dataset/grade_a`, `dataset/grade_b`, `dataset/grade_c`.

# %%
@dataclass
class TrainConfig:
    """Tunable settings for one training run.

    The instance is later serialised to ``config.json`` via its ``__dict__``,
    so every field should stay JSON-serialisable.

    Raises:
        ValueError: if ``val_ratio`` / ``test_ratio`` are outside ``[0, 1)`` or
            together leave no data for training.
    """
    dataset_dir: str = '../dataset'   # root folder with one sub-folder per class; change if needed
    out_dir: str = 'model'            # where checkpoints, logs and plots are written
    img_size: Tuple[int, int] = (224, 224)  # (height, width) images are resized to
    batch_size: int = 32
    seed: int = 42
    epochs: int = 30
    val_ratio: float = 0.2            # fraction of the whole dataset used for validation
    test_ratio: float = 0.1           # fraction of the whole dataset held out for testing
    learning_rate: float = 1e-3
    fine_tune_at: int = 100  # MobileNetV2 base layers before this index are frozen
    pretrained: str = 'mobilenetv2'  # options handled by the model-selection cell: baseline, mobilenetv2
    class_names: Optional[List[str]] = None  # filled in after the dataset folder is scanned

    def __post_init__(self):
        # Fail early with a clear message instead of a ZeroDivisionError or an
        # opaque scikit-learn error in the split cell.
        for name in ('val_ratio', 'test_ratio'):
            value = getattr(self, name)
            if not 0.0 <= value < 1.0:
                raise ValueError(f'{name} must be in [0, 1), got {value!r}')
        if self.val_ratio + self.test_ratio >= 1.0:
            raise ValueError(
                'val_ratio + test_ratio must be < 1 so that training data remains, '
                f'got {self.val_ratio!r} + {self.test_ratio!r}'
            )

cfg = TrainConfig()
Path(cfg.out_dir).mkdir(parents=True, exist_ok=True)

# Reproducibility: seed every RNG the notebook touches, in a fixed order.
# NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
# setting it here presumably only affects child processes - confirm if str
# hashing order matters for this run.
os.environ['PYTHONHASHSEED'] = str(cfg.seed)
for seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    seed_fn(cfg.seed)

# Mixed precision is only switched on when TensorFlow reports a GPU.
# Failure to enable it is reported and training continues with the default policy.
gpu_devices = tf.config.list_physical_devices('GPU')
if gpu_devices:
    try:
        from tensorflow.keras import mixed_precision
        policy = mixed_precision.Policy('mixed_float16')
        mixed_precision.set_global_policy(policy)
        print('Enabled mixed precision policy:', mixed_precision.global_policy())
    except Exception as e:
        print('Mixed precision not set:', e)

# %% [markdown]
# ## Helper functions

# %%
def list_image_paths(dataset_dir, extensions=('.jpg', '.jpeg', '.png', '.bmp')):
    """Collect image files from a ``<dataset_dir>/<class_name>/<image>`` layout.

    Args:
        dataset_dir: Root folder whose immediate sub-folders are the classes.
        extensions: File suffixes (case-insensitive, with leading dot) to
            accept. The default is limited to formats that the loader in this
            notebook (``tf.io.decode_image``) can decode; TIFF is therefore
            no longer included by default.

    Returns:
        ``(files, labels, classes)`` where ``files`` is a list of path strings,
        ``labels`` the matching integer class indices, and ``classes`` the class
        folder names in sorted order (``labels[i]`` indexes into ``classes``).
        All three are empty when ``dataset_dir`` does not exist, so the caller's
        "no images found" check can report the problem.

    Notes:
        Hidden folders (names starting with ``.``, e.g. ``.ipynb_checkpoints``)
        and folders without any accepted image are skipped, so every returned
        class has at least one sample and indices stay contiguous.
    """
    root = Path(dataset_dir)
    if not root.is_dir():
        return [], [], []

    allowed = {ext.lower() for ext in extensions}
    files, labels, classes = [], [], []

    class_dirs = sorted(
        d for d in root.iterdir() if d.is_dir() and not d.name.startswith('.')
    )
    for class_dir in class_dirs:
        images = [
            str(f)
            for f in sorted(class_dir.iterdir())
            if f.is_file() and f.suffix.lower() in allowed
        ]
        if not images:
            # An empty class would inflate the model's output size without
            # contributing any sample.
            continue
        files.extend(images)
        labels.extend([len(classes)] * len(images))
        classes.append(class_dir.name)
    return files, labels, classes


def prepare_tf_dataset(file_paths, labels, img_size, batch_size, shuffle=False, augment=False, seed=42, num_classes=None):
    """Build a batched, prefetched ``tf.data`` pipeline of (image, one-hot label).

    Args:
        file_paths: Sequence of image file paths.
        labels: Sequence of integer class indices, aligned with ``file_paths``.
        img_size: Target size passed to ``tf.image.resize``.
        batch_size: Number of samples per batch.
        shuffle: If True, shuffle the (path, label) pairs with a buffer covering
            the whole list. ``seed`` is only used for this shuffle.
        augment: If True, apply random flip / rotation / zoom / translation to
            each image before batching.
        seed: Seed for the shuffle step.
        num_classes: Depth of the one-hot labels. When omitted, falls back to
            ``len(cfg.class_names)`` from the notebook-level config, which is
            the previous behaviour.

    Returns:
        A ``tf.data.Dataset`` yielding batches of float32 images (scaled to
        [0, 1] by ``convert_image_dtype``) and one-hot labels.

    Raises:
        ValueError: if ``num_classes`` is not given and ``cfg.class_names`` has
            not been populated yet.
    """
    AUTOTUNE = tf.data.AUTOTUNE

    if num_classes is None:
        # Backward-compatible fallback to the notebook-level config, but with an
        # explicit error instead of ``TypeError: object of type 'NoneType' ...``.
        if cfg.class_names is None:
            raise ValueError(
                'num_classes was not given and cfg.class_names is not set; '
                'scan the dataset first or pass num_classes explicitly'
            )
        num_classes = len(cfg.class_names)

    ds = tf.data.Dataset.from_tensor_slices((file_paths, labels))
    if shuffle:
        ds = ds.shuffle(buffer_size=len(file_paths), seed=seed)

    def _load(path, label):
        """Read, decode and resize one image; one-hot encode its label."""
        image = tf.io.read_file(path)
        # expand_animations=False keeps the result a single 3-D image.
        image = tf.io.decode_image(image, channels=3, expand_animations=False)
        image = tf.image.convert_image_dtype(image, tf.float32)  # [0,1]
        image = tf.image.resize(image, img_size)
        return image, tf.one_hot(label, depth=num_classes)

    ds = ds.map(_load, num_parallel_calls=AUTOTUNE)

    if augment:
        data_augmentation = keras.Sequential([
            layers.RandomFlip('horizontal'),
            layers.RandomRotation(0.06),
            layers.RandomZoom(0.06),
            layers.RandomTranslation(0.05,0.05),
        ])
        # training=True forces the random layers to be active inside the map.
        ds = ds.map(lambda x,y: (data_augmentation(x, training=True), y), num_parallel_calls=AUTOTUNE)

    ds = ds.batch(batch_size).prefetch(AUTOTUNE)
    return ds


def plot_history(history, out_dir='model'):
    """Plot training/validation loss and accuracy curves and save them as PNG.

    Args:
        history: Object with a ``history`` dict of per-epoch metric lists, as
            returned by ``model.fit``. The keys ``loss`` and ``accuracy`` are
            required; ``val_loss`` / ``val_accuracy`` are plotted when present.
        out_dir: Folder that receives ``training_plot.png``. It is created if
            missing. (Previously this argument was ignored in favour of the
            global ``cfg.out_dir``.)
    """
    target_dir = Path(out_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    metrics = history.history

    # (title, train key, train label, validation key, validation label)
    panels = (
        ('Loss', 'loss', 'train_loss', 'val_loss', 'val_loss'),
        ('Accuracy', 'accuracy', 'train_acc', 'val_accuracy', 'val_acc'),
    )

    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    for ax, (title, train_key, train_label, val_key, val_label) in zip(axes, panels):
        ax.plot(metrics[train_key], label=train_label)
        # Validation curves are absent when fit() ran without validation data.
        if val_key in metrics:
            ax.plot(metrics[val_key], label=val_label)
        ax.legend()
        ax.grid(True)
        ax.set_title(title)

    fig.savefig(target_dir / 'training_plot.png', dpi=150)
    plt.show()


# %% [markdown]
# ## Load & Split dataset (train / val / test)

# %%
files, labels, classes = list_image_paths(cfg.dataset_dir)
if len(files) == 0:
    raise SystemExit('No images found - pastikan path dataset benar: ' + cfg.dataset_dir)

# Remember the class order; label indices refer to positions in this list.
cfg.class_names = classes
print('Found classes:', cfg.class_names)

# Stratified splitting works on numpy arrays.
from sklearn.model_selection import train_test_split
fp = np.array(files)
lab = np.array(labels)

# Step 1: hold out the test portion (skipped entirely when test_ratio is 0).
if cfg.test_ratio <= 0:
    fp_tmp, lab_tmp = fp, lab
    fp_test, lab_test = np.array([]), np.array([])
else:
    fp_tmp, fp_test, lab_tmp, lab_test = train_test_split(
        fp,
        lab,
        test_size=cfg.test_ratio,
        stratify=lab,
        random_state=cfg.seed,
    )

# Step 2: split the remainder into train/val. val_ratio is expressed relative
# to the full dataset, so rescale it to the size of what is left.
val_frac_of_tmp = cfg.val_ratio / (1.0 - cfg.test_ratio)
fp_train, fp_val, lab_train, lab_val = train_test_split(
    fp_tmp,
    lab_tmp,
    test_size=val_frac_of_tmp,
    stratify=lab_tmp,
    random_state=cfg.seed,
)

print('Counts: train=', len(fp_train), 'val=', len(fp_val), 'test=', len(fp_test))

# %% [markdown]
# ## Build tf.data pipelines

# %%
# Training pipeline: shuffled and augmented.
train_ds = prepare_tf_dataset(
    fp_train.tolist(),
    lab_train.tolist(),
    cfg.img_size,
    cfg.batch_size,
    shuffle=True,
    augment=True,
    seed=cfg.seed,
)

# Validation pipeline: fixed order, no augmentation.
val_ds = prepare_tf_dataset(
    fp_val.tolist(),
    lab_val.tolist(),
    cfg.img_size,
    cfg.batch_size,
    shuffle=False,
    augment=False,
)

# Test pipeline only exists when a test split was held out.
test_ds = None
if len(fp_test):
    test_ds = prepare_tf_dataset(
        fp_test.tolist(),
        lab_test.tolist(),
        cfg.img_size,
        cfg.batch_size,
        shuffle=False,
        augment=False,
    )

# Quick sanity check: pull one batch and show its shapes.
for imgs, labs in train_ds.take(1):
    print('batch images', imgs.shape, 'labels', labs.shape)

# %% [markdown]
# ## Model definitions
# Kita sediakan fungsi untuk: baseline CNN (cepat), MobileNetV2 transfer learning (recommended)

# %%

def get_baseline(input_shape=(224,224,3), n_classes=3, lr=1e-3):
    """Build and compile a small CNN classifier trained from scratch.

    Architecture: two Conv(3x3, relu, same) + MaxPool(2) stages with 32 and 64
    filters, a third Conv with 128 filters, global average pooling, a 128-unit
    dense layer and a softmax output with ``n_classes`` units.

    Args:
        input_shape: Shape of one input image.
        n_classes: Number of output classes.
        lr: Learning rate for the Adam optimizer.

    Returns:
        A compiled ``keras.Model`` using categorical cross-entropy and accuracy.
    """
    inputs = layers.Input(input_shape)

    features = inputs
    for n_filters in (32, 64):
        features = layers.Conv2D(n_filters, 3, activation='relu', padding='same')(features)
        features = layers.MaxPool2D(2)(features)
    features = layers.Conv2D(128, 3, activation='relu', padding='same')(features)

    pooled = layers.GlobalAveragePooling2D()(features)
    hidden = layers.Dense(128, activation='relu')(pooled)
    # The output layer is pinned to float32 regardless of the global dtype policy.
    probabilities = layers.Dense(n_classes, activation='softmax', dtype='float32')(hidden)

    net = keras.Model(inputs, probabilities)
    net.compile(
        optimizer=keras.optimizers.Adam(lr),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return net


def get_mobilenetv2(input_shape=(224,224,3), n_classes=3, lr=1e-4, fine_tune_at=100):
    """Build and compile a MobileNetV2 transfer-learning classifier.

    The model accepts images with pixel values in [0, 1] (what the data
    pipeline in this notebook produces) and rescales them internally to
    [-1, 1], the range the ImageNet MobileNetV2 weights were trained with.
    Previously the [0, 1] images were fed to the backbone unchanged.

    Args:
        input_shape: Shape of one input image.
        n_classes: Number of output classes.
        lr: Learning rate for the Adam optimizer.
        fine_tune_at: Base-model layers with index below this value are frozen;
            the remaining layers stay trainable. A value at or beyond the number
            of base layers freezes the whole backbone.

    Returns:
        A compiled ``keras.Model`` using categorical cross-entropy and accuracy.
    """
    base = keras.applications.MobileNetV2(input_shape=input_shape, include_top=False, weights='imagenet')
    base.trainable = True
    # Freeze everything below fine_tune_at; only the upper layers are fine-tuned.
    for layer in base.layers[:fine_tune_at]:
        layer.trainable = False

    inp = layers.Input(shape=input_shape)
    # [0, 1] -> [-1, 1]: x * 2 - 1, equivalent to mobilenet_v2.preprocess_input
    # applied to [0, 255] pixels. Kept inside the model so exported artifacts
    # (HDF5 / TFLite) keep taking [0, 1] inputs.
    x = layers.Rescaling(scale=2.0, offset=-1.0)(inp)
    # training=False keeps the backbone in inference mode during fine-tuning.
    x = base(x, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(0.25)(x)
    x = layers.Dense(128, activation='relu')(x)
    # The output layer is pinned to float32 regardless of the global dtype policy.
    out = layers.Dense(n_classes, activation='softmax', dtype='float32')(x)

    model = keras.Model(inp, out)
    model.compile(optimizer=keras.optimizers.Adam(lr), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# %% [markdown]
# ## Choose & create model

# %%
# Map each supported cfg.pretrained value to a builder; arguments are only
# evaluated once a builder has been found.
model_builders = {
    'baseline': lambda shape, n_out: get_baseline(
        input_shape=shape, n_classes=n_out, lr=cfg.learning_rate
    ),
    'mobilenetv2': lambda shape, n_out: get_mobilenetv2(
        input_shape=shape, n_classes=n_out, lr=cfg.learning_rate, fine_tune_at=cfg.fine_tune_at
    ),
}

build_model = model_builders.get(cfg.pretrained)
if build_model is None:
    raise ValueError('Unknown model choice')
model = build_model((*cfg.img_size, 3), len(cfg.class_names))

model.summary()

# %% [markdown]
# ## Callbacks

# %%
artifact_root = Path(cfg.out_dir)
ckpt_path = str(artifact_root / 'best_model.h5')

# Keep only the full model with the best validation accuracy seen so far.
checkpoint_cb = keras.callbacks.ModelCheckpoint(
    ckpt_path,
    monitor='val_accuracy',
    save_best_only=True,
    save_weights_only=False,
)
# Stop after 8 epochs without val_accuracy improvement and roll back weights.
early_stop_cb = keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    patience=8,
    restore_best_weights=True,
)
# Halve the learning rate when val_loss stalls for 4 epochs.
reduce_lr_cb = keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=4,
    min_lr=1e-7,
)
# Per-epoch metrics as CSV next to the other artifacts.
csv_logger_cb = keras.callbacks.CSVLogger(str(artifact_root / 'training_log.csv'))

callbacks = [checkpoint_cb, early_stop_cb, reduce_lr_cb, csv_logger_cb]

# %% [markdown]
# ## Class weights (optional if imbalance)

# %%
from collections import Counter

cnt = Counter(lab_train)
print('Train class counts:', cnt)

# Weights stay None (no re-weighting) unless the largest class is more than
# 1.5x the smallest one.
class_weights = None
if cnt:
    imbalance_ratio = max(cnt.values()) / min(cnt.values())
    if imbalance_ratio > 1.5:
        total = sum(cnt.values())
        n_seen_classes = len(cnt)
        # Inverse-frequency weighting: total / (n_classes * class_count).
        class_weights = {}
        for class_idx, class_count in cnt.items():
            class_weights[class_idx] = total / (n_seen_classes * class_count)
        print('Using class_weights:', class_weights)

# %% [markdown]
# ## Train

# %%
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=cfg.epochs,
    callbacks=callbacks,
    class_weight=class_weights,
)

# Persist the model as it stands after fit() together with the run config.
run_dir = Path(cfg.out_dir)
model.save(str(run_dir / 'final_model.h5'))
with open(str(run_dir / 'config.json'), 'w') as config_file:
    json.dump(vars(cfg), config_file, indent=2)

plot_history(history, out_dir=cfg.out_dir)

# %% [markdown]
# ## Evaluate on test set

# %%
if test_ds is not None:
    # Evaluate the checkpoint with the best validation accuracy.
    best = keras.models.load_model(ckpt_path)

    y_true = []
    y_pred = []
    for batch_images, batch_labels in test_ds:
        # Call the model directly: predict() inside a Python loop adds
        # per-call overhead and prints one progress bar per batch. Predictions
        # and labels come from the same batch, so they stay aligned.
        batch_probs = best(batch_images, training=False).numpy()
        y_pred.extend(np.argmax(batch_probs, axis=1).tolist())
        y_true.extend(np.argmax(batch_labels.numpy(), axis=1).tolist())

    # Pass the full label set explicitly so the report and matrix keep one
    # row/column per class even if a class is missing from the test split.
    label_ids = list(range(len(cfg.class_names)))

    print('Classification report:')
    print(classification_report(
        y_true, y_pred, labels=label_ids, target_names=cfg.class_names, zero_division=0
    ))

    cm = confusion_matrix(y_true, y_pred, labels=label_ids)

    # Plain-matplotlib heatmap: avoids requiring seaborn, which is not among
    # the dependencies this notebook asks the user to install.
    fig, ax = plt.subplots(figsize=(6, 6))
    heatmap = ax.imshow(cm, cmap='Blues')
    fig.colorbar(heatmap, ax=ax)
    ax.set_xticks(label_ids)
    ax.set_xticklabels(cfg.class_names)
    ax.set_yticks(label_ids)
    ax.set_yticklabels(cfg.class_names)
    # Annotate each cell; switch to white text on dark cells for readability.
    dark_threshold = cm.max() / 2.0
    for row in label_ids:
        for col in label_ids:
            ax.text(
                col, row, format(int(cm[row, col]), 'd'),
                ha='center', va='center',
                color='white' if cm[row, col] > dark_threshold else 'black',
            )
    ax.set_xlabel('Predicted')
    ax.set_ylabel('True')
    ax.set_title('Confusion Matrix')
    fig.savefig(Path(cfg.out_dir)/'confusion_matrix.png', dpi=150)
    plt.show()
else:
    print('No test dataset (test_ratio==0)')

# %% [markdown]
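# ## Convert to TFLite (post-training quantization calibrated with a representative dataset)
# This step is optional but recommended for deploying to ESP32/Raspberry Pi.
# Note: the cell below only sets the default optimization and a representative dataset.
# Integer-only quantization would additionally need `target_spec.supported_ops` and the
# inference input/output types to be set on the converter.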

# %%
# Export the best checkpoint (not the in-memory model) to TFLite.
model_for_export = keras.models.load_model(ckpt_path)
converter = tf.lite.TFLiteConverter.from_keras_model(model_for_export)
# Default optimization; together with the representative dataset below this
# lets the converter calibrate quantization on real samples. No target ops or
# inference types are set here, so the converter's defaults apply.
converter.optimizations = [tf.lite.Optimize.DEFAULT]


def representative_data_gen():
    """Yield up to 100 single-image batches from ``train_ds`` for calibration."""
    calibration_ds = train_ds.unbatch().batch(1).take(100)
    for img, _ in calibration_ds:
        # img is float32 in [0,1]
        yield [img]


converter.representative_dataset = representative_data_gen

# Best effort: a failed conversion is reported but does not abort the notebook.
try:
    tflite_model = converter.convert()
    tflite_path = Path(cfg.out_dir)/'model.tflite'
    tflite_path.write_bytes(tflite_model)
    print('Saved tflite to', tflite_path)
except Exception as e:
    print('TFLite conversion failed:', e)

# %% [markdown]
# ## Next steps / tips
# - Jika akurasi belum memuaskan: coba augmentasi lebih kuat, learning rate scheduler, atau transfer learning lain (EfficientNetB0).
# - Untuk deployment ke ESP32: gunakan model.tflite dan konversi ke format yang didukung board (ukuran, ops)
# - Simpan mapping kelas (`class_names`) di `model/config.json` agar inferensi tahu labelnya.

# %%
# Final status line: point the reader at the artifact folder from cfg.out_dir.
print('Done. Models & artifacts saved to', cfg.out_dir)


ImportError: cannot import name 'pywrap_tensorflow' from 'tensorflow.python' (C:\Users\snNHQ\AppData\Roaming\Python\Python312\site-packages\tensorflow\python\__init__.py)