In [11]:
import os
import cv2
import numpy as np
import imgaug.augmenters as iaa
import shutil
import random
import tensorflow as tf
import seaborn as sns
import matplotlib.pyplot as plt
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Dropout, BatchNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.metrics import confusion_matrix, classification_report, precision_score, recall_score, f1_score

In [12]:
input_directory = 'C:\\Bangkit\\ML\\code\\preprocessing\\dataset'
output_directory = 'C:\\Bangkit\\ML\\code\\preprocessing\\fix-prepo\\output'
split_directory = 'C:\\Bangkit\\ML\\code\\preprocessing\\fix-prepo\\split'

In [13]:
os.listdir(input_directory)

['check.ipynb',
 'geblek-renteng',
 'gentongan',
 'liong',
 'mega-mendung',
 'parang',
 'sekar-jagad',
 'sidomukti',
 'tambal',
 'truntum',
 'tujuh-rupa']

# PREPROCESSING


In [14]:
# Resize function
def resize_image(image, target_size=(224, 224)):
    resized_image = cv2.resize(image, target_size)
    return resized_image

In [15]:
# Normalize function
def min_max_normalize_image(image):
    # Convert image data type to float32
    image = image.astype('float32')

    # Normalize with Min-Max Scaling
    min_val = np.min(image)
    max_val = np.max(image)
    if min_val == max_val:
        return np.zeros_like(image)
    else:
        normalized_image = (image - min_val) / (max_val - min_val)
        return normalized_image

In [16]:
# Contrast enhancement function
def apply_clahe(image, clip_limit=2.0, tile_grid_size=(8, 8)):
    lab_image = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
    l_channel, a_channel, b_channel = cv2.split(lab_image)
    clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid_size)
    enhanced_l_channel = clahe.apply(l_channel)
    clahe_image = cv2.merge((enhanced_l_channel, a_channel, b_channel))
    enhanced_image = cv2.cvtColor(clahe_image, cv2.COLOR_LAB2BGR)
    return enhanced_image

In [17]:
# Augmentation Function
def augment_images(input_folder, output_folder, num_images=10):
    seq = iaa.Sequential([
        iaa.Fliplr(0.5),
        iaa.Affine(rotate=(-20, 20)),
        iaa.GaussianBlur(sigma=(0, 1.0)),
        iaa.AdditiveGaussianNoise(scale=(0, 0.05 * 255)),
        iaa.Multiply((0.8, 1.2), per_channel=0.2),
        iaa.ContrastNormalization((0.5, 2.0), per_channel=0.5),
    ])
    os.makedirs(output_folder, exist_ok=True)
    for folder_name in os.listdir(input_folder):
        input_folder_path = os.path.join(input_folder, folder_name)
        if not os.path.isdir(input_folder_path):
            continue
        output_folder_path = os.path.join(output_folder, folder_name)
        os.makedirs(output_folder_path, exist_ok=True)
        image_files = [f for f in os.listdir(input_folder_path) if os.path.isfile(os.path.join(input_folder_path, f))]
        selected_files = random.sample(image_files, min(num_images, len(image_files)))
        for filename in selected_files:
            input_image_path = os.path.join(input_folder_path, filename)
            try:
                image = cv2.imread(input_image_path)
                images_aug = [seq(image=image) for _ in range(num_images)]
                for idx, image_aug in enumerate(images_aug):
                    output_image_path = os.path.join(output_folder_path, f"{filename.split('.')[0]}_{idx}.jpg")
                    cv2.imwrite(output_image_path, image_aug)
            except Exception as e:
                print(f"Error augmenting image {filename}: {str(e)}")

In [18]:
# Custom preprocessing function
def custom_preprocess(image, seq):
    image = resize_image(image)
    image = min_max_normalize_image(image)
    
    # Convert image back to uint8 for CLAHE
    image = (image * 255).astype('uint8')
    image = apply_clahe(image)
    image = seq.augment_image(image)
    return image

In [19]:
# Function to process and save images
def process_and_save_images(input_dir, output_dir, seq):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for root, _, files in os.walk(input_dir):
        for file in files:
            if file.endswith(('.jpg', '.jpeg', '.png')):
                input_path = os.path.join(root, file)
                relative_path = os.path.relpath(root, input_dir)
                output_folder = os.path.join(output_dir, relative_path)

                if not os.path.exists(output_folder):
                    os.makedirs(output_folder)

                image = cv2.imread(input_path)
                if image is not None:
                    for i in range(3):
                        processed_image = custom_preprocess(image, seq)
                        output_filename = f"{os.path.splitext(file)[0]}_aug{i + 1}.jpg"
                        output_path = os.path.join(output_folder, output_filename)
                        cv2.imwrite(output_path, (processed_image * 255).astype('uint8'))

In [20]:
# Define augmentation sequence
seq = iaa.Sequential([
    iaa.Fliplr(0.5),
    iaa.Affine(rotate=(-20, 20)),
    iaa.GaussianBlur(sigma=(0, 1.0)),
    iaa.AdditiveGaussianNoise(scale=(0, 0.05 * 255)),
    iaa.Multiply((0.8, 1.2), per_channel=0.2),
    iaa.ContrastNormalization((0.5, 2.0), per_channel=0.5),
])

  warn_deprecated(msg, stacklevel=3)


In [21]:
# Process and save images
process_and_save_images(input_directory, output_directory, seq)

In [22]:
# Membuat folder train, test, dan val di dalam folder split
train_dir = os.path.join(split_directory, 'train')
test_dir = os.path.join(split_directory, 'test')
val_dir = os.path.join(split_directory, 'val')
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)
os.makedirs(val_dir, exist_ok=True)

In [23]:
def split_dataset(input_dir, output_dir, train_ratio=0.7, val_ratio=0.1, test_ratio=0.2):
    # Make sure output directories exist
    train_dir = os.path.join(output_dir, 'train')
    val_dir = os.path.join(output_dir, 'val')
    test_dir = os.path.join(output_dir, 'test')

    for dir_path in [train_dir, val_dir, test_dir]:
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)

    # Iterate through each class folder
    for root, _, files in os.walk(input_dir):
        if files:  # If there are files in the directory
            # Determine class name based on the relative path
            relative_path = os.path.relpath(root, input_dir)
            class_name = os.path.basename(relative_path)

            # Create corresponding class folders in train, val, test directories
            class_train_dir = os.path.join(train_dir, class_name)
            class_val_dir = os.path.join(val_dir, class_name)
            class_test_dir = os.path.join(test_dir, class_name)

            for dir_path in [class_train_dir, class_val_dir, class_test_dir]:
                if not os.path.exists(dir_path):
                    os.makedirs(dir_path)

            # Shuffle files in the current class folder
            random.shuffle(files)

            # Calculate number of files for train, val, test
            num_files = len(files)
            num_train = int(train_ratio * num_files)
            num_val = int(val_ratio * num_files)
            num_test = num_files - num_train - num_val

            # Assign files to train, val, test datasets
            train_files = files[:num_train]
            val_files = files[num_train:num_train + num_val]
            test_files = files[num_train + num_val:]

            # Move files to corresponding directories
            for file in train_files:
                src_path = os.path.join(root, file)
                dst_path = os.path.join(class_train_dir, file)
                shutil.copy(src_path, dst_path)

            for file in val_files:
                src_path = os.path.join(root, file)
                dst_path = os.path.join(class_val_dir, file)
                shutil.copy(src_path, dst_path)

            for file in test_files:
                src_path = os.path.join(root, file)
                dst_path = os.path.join(class_test_dir, file)
                shutil.copy(src_path, dst_path)

In [24]:
# Split dataset into train, val, test
split_dataset(output_directory, split_directory)


In [32]:
#Hyperparameter
image_size = (224, 224)
batch_size = 32
epochs = 15
learning_rate = 0.0001

In [33]:
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)
train_generator = train_datagen.flow_from_directory(
    train_dir,
    target_size=image_size,
    batch_size=batch_size,
    class_mode='categorical'
)

Found 1047 images belonging to 10 classes.


In [34]:
validation_datagen = ImageDataGenerator(rescale=1./255)
validation_generator = validation_datagen.flow_from_directory(
    val_dir,
    target_size=image_size,
    batch_size=batch_size,
    class_mode='categorical'
)

Found 149 images belonging to 10 classes.


In [35]:
test_datagen = ImageDataGenerator(rescale=1./255)
test_generator = test_datagen.flow_from_directory(
    test_dir,
    target_size=image_size,
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=False
)

Found 301 images belonging to 10 classes.


In [36]:
base_model = MobileNetV2(weights='imagenet', include_top=False)

# Build the model
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(1024, activation='relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
predictions = Dense(train_generator.num_classes, activation='softmax')(x)

model = Model(inputs=base_model.input, outputs=predictions)

for layer in base_model.layers:
    layer.trainable = False

model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

  base_model = MobileNetV2(weights='imagenet', include_top=False)


In [37]:
# Callbacks for early stopping and learning rate reduction
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6)

In [38]:
# Train the model
history = model.fit(
    train_generator,
    epochs=epochs,
    validation_data=validation_generator,
    callbacks=[early_stopping, reduce_lr],
    verbose=1
)

Epoch 1/15
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 533ms/step - accuracy: 0.4035 - loss: 2.1280 - val_accuracy: 0.5570 - val_loss: 1.5205 - learning_rate: 0.0010
Epoch 2/15
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 403ms/step - accuracy: 0.7217 - loss: 0.8904 - val_accuracy: 0.6846 - val_loss: 1.0128 - learning_rate: 0.0010
Epoch 3/15
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 394ms/step - accuracy: 0.7444 - loss: 0.7778 - val_accuracy: 0.6913 - val_loss: 0.8777 - learning_rate: 0.0010
Epoch 4/15
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 770ms/step - accuracy: 0.7815 - loss: 0.6380 - val_accuracy: 0.7450 - val_loss: 0.6720 - learning_rate: 0.0010
Epoch 5/15
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 953ms/step - accuracy: 0.8094 - loss: 0.5822 - val_accuracy: 0.7450 - val_loss: 0.6591 - learning_rate: 0.0010
Epoch 6/15
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m 

In [39]:
# Fine-tune some layers of the base model
for layer in base_model.layers[-30:]:
    layer.trainable = True

In [40]:
# Recompile the model with a lower learning rate for fine-tuning
model.compile(optimizer=Adam(learning_rate=learning_rate * 0.1), loss='categorical_crossentropy', metrics=['accuracy'])

# Fine-tune the model
history_fine_tune = model.fit(
    train_generator,
    epochs=epochs,
    validation_data=validation_generator,
    callbacks=[early_stopping, reduce_lr],
    verbose=1
)

Epoch 1/15
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 491ms/step - accuracy: 0.6577 - loss: 1.1083 - val_accuracy: 0.7919 - val_loss: 0.5961 - learning_rate: 1.0000e-05
Epoch 2/15
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 460ms/step - accuracy: 0.6917 - loss: 0.9640 - val_accuracy: 0.7651 - val_loss: 0.6713 - learning_rate: 1.0000e-05
Epoch 3/15
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 466ms/step - accuracy: 0.7379 - loss: 0.8435 - val_accuracy: 0.7517 - val_loss: 0.7253 - learning_rate: 1.0000e-05
Epoch 4/15
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 467ms/step - accuracy: 0.7423 - loss: 0.7944 - val_accuracy: 0.7383 - val_loss: 0.7781 - learning_rate: 1.0000e-05
Epoch 5/15
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 457ms/step - accuracy: 0.7492 - loss: 0.7216 - val_accuracy: 0.7248 - val_loss: 0.8092 - learning_rate: 1.0000e-05
Epoch 6/15
[1m33/33[0m [32m━━━━━━━━━━━━━━━

In [41]:
# Evaluate the model on the test data
test_loss, test_acc = model.evaluate(test_generator, verbose=2)
print(f'Test accuracy: {test_acc}')


10/10 - 4s - 415ms/step - accuracy: 0.7674 - loss: 0.8327
Test accuracy: 0.7674418687820435
