In [1]:
import tensorflow as tf

from keras.src.applications import imagenet_utils
# For versions <TF2.13 change the above import to:
# from keras.applications import imagenet_utils

from tensorflow.keras import layers
from tensorflow import keras

2024-01-28 05:57:22.837487: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-01-28 05:57:22.837547: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-01-28 05:57:22.838667: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-01-28 05:57:22.847342: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Model hyperparameters shared by the block builders below.
# Values are from table 4 (presumably of the MobileViT paper — TODO confirm the source).
patch_size = 4  # 2x2, for the Transformer blocks.
image_size = 256  # Side length of the square model input built in create_mobilevit.
expansion_factor = 2  # expansion factor for the MobileNetV2 blocks.

In [3]:
def conv_block(x, filters=16, kernel_size=3, strides=2):
    """Apply one swish-activated, "same"-padded 2D convolution to `x`.

    Args:
        x: Input tensor handed to the `Conv2D` layer.
        filters: Number of output filters of the convolution.
        kernel_size: Size of the convolution kernel.
        strides: Stride of the convolution.

    Returns:
        The output of the newly created `Conv2D` layer applied to `x`.
    """
    return layers.Conv2D(
        filters=filters,
        kernel_size=kernel_size,
        strides=strides,
        padding="same",
        activation=tf.nn.swish,
    )(x)

def inverted_residual_block(x, expanded_channels, output_channels, strides=1):
    """Build a MobileNetV2-style block: expand -> depthwise conv -> project.

    The block applies, in order:
      1. a bias-free 1x1 convolution to `expanded_channels`, batch norm, swish;
      2. a bias-free 3x3 depthwise convolution with the given `strides`,
         batch norm, swish;
      3. a bias-free 1x1 convolution to `output_channels` and batch norm
         (no activation).

    Args:
        x: Input feature map.
        expanded_channels: Channel count of the 1x1 expansion convolution.
        output_channels: Channel count of the final 1x1 projection.
        strides: Stride of the depthwise convolution.

    Returns:
        The projected feature map, added to `x` when the block keeps both the
        spatial resolution (`strides == 1`) and the channel count; otherwise
        the projected feature map alone.
    """
    m = layers.Conv2D(expanded_channels, 1, padding="same", use_bias=False)(x)
    m = layers.BatchNormalization()(m)
    m = tf.nn.swish(m)

    if strides == 2:
        # The strided depthwise conv below uses "valid" padding, so pad
        # explicitly with the amounts computed by `correct_pad` for a 3x3 kernel.
        m = layers.ZeroPadding2D(padding=imagenet_utils.correct_pad(m, 3))(m)
    m = layers.DepthwiseConv2D(
        3, strides=strides, padding="same" if strides == 1 else "valid", use_bias=False
    )(m)
    m = layers.BatchNormalization()(m)
    m = tf.nn.swish(m)

    m = layers.Conv2D(output_channels, 1, padding="same", use_bias=False)(m)
    m = layers.BatchNormalization()(m)

    # Both operands are static Python values, so compare them directly instead
    # of building a tensor op and relying on its truthiness.
    if strides == 1 and x.shape[-1] == output_channels:
        return layers.Add()([m, x])
    return m

def mlp(x, hidden_units, dropout_rate):
    """Run `x` through a stack of swish-activated Dense layers with dropout.

    Args:
        x: Input tensor.
        hidden_units: Iterable with the width of each Dense layer, in order.
        dropout_rate: Rate of the Dropout layer placed after every Dense layer.

    Returns:
        The output of the last Dropout layer, or `x` unchanged when
        `hidden_units` is empty.
    """
    out = x
    for width in hidden_units:
        dense = layers.Dense(width, activation=tf.nn.swish)
        out = dense(out)
        dropout = layers.Dropout(dropout_rate)
        out = dropout(out)
    return out


def transformer_block(x, transformer_layers, projection_dim, num_heads=2, return_sequences=True):
    """Stack `transformer_layers` Transformer layers on top of `x`.

    Each layer is: layer norm -> multi-head self-attention -> residual add,
    followed by layer norm -> MLP -> residual add.

    Args:
        x: Input tensor.
        transformer_layers: Number of Transformer layers to stack.
        projection_dim: `key_dim` of every attention head.
        num_heads: Number of attention heads.
        return_sequences: Accepted but not used by this function.

    Returns:
        The output of the last Transformer layer.
    """
    encoded = x
    for _ in range(transformer_layers):
        # Attention sub-layer: normalize, self-attend, add the residual.
        normed = layers.LayerNormalization(epsilon=1e-6)(encoded)
        attended = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0.1
        )(normed, normed)
        after_attention = layers.Add()([attended, encoded])

        # Feed-forward sub-layer: widen to 2x the channel count, then project
        # back so the residual shapes match.
        channels = encoded.shape[-1]
        hidden = layers.LayerNormalization(epsilon=1e-6)(after_attention)
        hidden = mlp(hidden, hidden_units=[channels * 2, channels], dropout_rate=0.1)
        encoded = layers.Add()([hidden, after_attention])

    return encoded


def mobilevit_block(x, num_blocks, projection_dim, strides=1):
    """Build a MobileViT block combining convolutional and Transformer features.

    Args:
        x: Input feature map.
        num_blocks: Number of Transformer layers applied to the unfolded patches.
        projection_dim: Channel count of the local features and of the fused
            output; also passed on to `transformer_block`.
        strides: Stride forwarded to every `conv_block` call in this block.

    Returns:
        The feature map produced by fusing `x` with the folded Transformer output.
    """
    # Local representation: a 3x3 convolution followed by a point-wise one.
    local = conv_block(x, filters=projection_dim, strides=strides)
    local = conv_block(local, filters=projection_dim, kernel_size=1, strides=strides)

    # Unfold the feature map into `patch_size` groups of patches and run the
    # Transformer layers over them.
    height, width = local.shape[1], local.shape[2]
    num_patches = int((height * width) / patch_size)
    unfolded = layers.Reshape((patch_size, num_patches, projection_dim))(local)
    global_features = transformer_block(unfolded, num_blocks, projection_dim)

    # Fold back into the spatial layout of the local features.
    folded = layers.Reshape((height, width, projection_dim))(global_features)

    # Point-wise convolution back to the input channel count, then concatenate
    # with the block input along the channel axis.
    folded = conv_block(folded, filters=x.shape[-1], kernel_size=1, strides=strides)
    concatenated = layers.Concatenate(axis=-1)([x, folded])

    # Fuse the local and global features using a convolution layer.
    return conv_block(concatenated, filters=projection_dim, strides=strides)

In [4]:
def create_mobilevit(num_classes=6):
    """Assemble the MobileViT classifier as a Keras functional model.

    Args:
        num_classes: Number of units in the final softmax layer.

    Returns:
        A `keras.Model` mapping `(image_size, image_size, 3)` images to
        `num_classes` softmax probabilities.
    """
    model_input = keras.Input((image_size, image_size, 3))
    features = layers.Rescaling(scale=1.0 / 255)(model_input)

    # Initial conv-stem -> MV2 block.
    features = conv_block(features, filters=16)
    features = inverted_residual_block(
        features, expanded_channels=16 * expansion_factor, output_channels=16
    )

    # Downsampling MV2 block followed by two resolution-preserving MV2 blocks.
    features = inverted_residual_block(
        features, expanded_channels=16 * expansion_factor, output_channels=24, strides=2
    )
    for _ in range(2):
        features = inverted_residual_block(
            features, expanded_channels=24 * expansion_factor, output_channels=24
        )

    # Three "strided MV2 -> MobileViT" stages:
    # (MV2 base width, MV2 output channels, Transformer layers, projection dim).
    stages = (
        (24, 48, 2, 64),
        (64, 64, 4, 80),
        (80, 80, 3, 96),
    )
    for base_width, mv2_channels, depth, proj_dim in stages:
        features = inverted_residual_block(
            features,
            expanded_channels=base_width * expansion_factor,
            output_channels=mv2_channels,
            strides=2,
        )
        features = mobilevit_block(features, num_blocks=depth, projection_dim=proj_dim)

    # Final point-wise convolution before the head.
    features = conv_block(features, filters=320, kernel_size=1, strides=1)

    # Classification head.
    pooled = layers.GlobalAvgPool2D()(features)
    predictions = layers.Dense(num_classes, activation="softmax")(pooled)

    return keras.Model(model_input, predictions)


# Build the model with the default number of classes and print its layer summary.
mobilevit_xxs = create_mobilevit()
mobilevit_xxs.summary()


2024-01-28 05:57:26.234276: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:901] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-01-28 05:57:26.234571: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:901] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2024-01-28 05:57:26.278563: W tensorflow/core/common_runtime/gpu/gpu_device.cc:2256] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required l

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 256, 256, 3)]        0         []                            
                                                                                                  
 rescaling (Rescaling)       (None, 256, 256, 3)          0         ['input_1[0][0]']             
                                                                                                  
 conv2d (Conv2D)             (None, 128, 128, 16)         448       ['rescaling[0][0]']           
                                                                                                  
 conv2d_1 (Conv2D)           (None, 128, 128, 32)         512       ['conv2d[0][0]']              
                                                                                              

In [5]:
# Input-pipeline settings.
batch_size = 64
auto = tf.data.AUTOTUNE  # Used for num_parallel_calls and prefetch in prepare_dataset.
resize_bigger = 280  # Training images are resized to this side length before the random crop.
num_classes = 6  # Depth of the one-hot labels produced in preprocess_dataset.


def preprocess_dataset(is_training=True):
    """Build the per-example `(image, label)` preprocessing function.

    Args:
        is_training: When true, the returned function augments the image
            (resize to `resize_bigger`, random crop to `image_size`, random
            horizontal flip). Otherwise it only resizes to `image_size`.

    Returns:
        A function `_pp(image, label)` returning the processed image and the
        label one-hot encoded with depth `num_classes`.
    """

    def _pp(image, label):
        if is_training:
            # Resize to a bigger spatial resolution and take the random
            # crops.
            image = tf.image.resize(image, (resize_bigger, resize_bigger))
            image = tf.image.random_crop(image, (image_size, image_size, 3))
            image = tf.image.random_flip_left_right(image)
        else:
            image = tf.image.resize(image, (image_size, image_size))
        # One-hot encode exactly once, for both modes. Encoding inside the
        # training branch as well would pass an already one-hot (float) label
        # to `tf.one_hot` a second time.
        label = tf.one_hot(label, depth=num_classes)
        return image, label

    return _pp


def prepare_dataset(dataset, is_training=True):
    """Turn a dataset of `(image, label)` examples into a batched input pipeline.

    Args:
        dataset: Dataset of unbatched examples.
        is_training: When true, shuffle with a buffer of `batch_size * 10`
            examples and use the training preprocessing.

    Returns:
        The preprocessed dataset, batched by `batch_size` and prefetched.
    """
    pipeline = dataset.shuffle(batch_size * 10) if is_training else dataset
    pipeline = pipeline.map(preprocess_dataset(is_training), num_parallel_calls=auto)
    pipeline = pipeline.batch(batch_size)
    return pipeline.prefetch(auto)

In [6]:
# import tensorflow as tf
# from tensorflow.keras.preprocessing.image import ImageDataGenerator

# # Step 1: Load and preprocess the data
# dataset_path = "/content/drive/MyDrive/Colab Notebooks/cnn/figure/garbage_classification"

# # Create a list of paths to the image files
# image_paths = tf.data.Dataset.list_files(dataset_path + "/*/*")

# # Preprocessing function for each image
# def preprocess_image(image_path):
#     # Read the image from the path
#     image = tf.io.read_file(image_path)
#     # Decode the image
#     image = tf.image.decode_jpeg(image, channels=3)
#     # Resize the image to the desired size
#     image = tf.image.resize(image, (image_size, image_size))
#     # Normalization
#     image = image / 255.0
#     return image

# # Apply the preprocessing function to each image
# # dataset = image_paths.map(preprocess_image)

In [7]:
# Dataset root directory, passed to image_dataset_from_directory in the cells below.
dataset_path1 = "../../SmartBin/data/raw_data/Garbage classification dataset"
# image_paths = tf.data.Dataset.list_files(dataset_path + "/*/*")
# def preprocess_image(image_path):
# # Read the image from the path
# image = tf.io.read_file(image_paths)
# # Decode the image
# image = tf.image.decode_jpeg(image, channels=3)
# # Resize the image to the desired size
# image = tf.image.resize(image, (image_size, image_size))
# # Normalization
# image = image / 255.0
# return image

In [8]:
# Training subset of the images under `dataset_path1`. The validation cell
# uses the same `validation_split` and `seed` with subset="validation".
train_ds_original = tf.keras.preprocessing.image_dataset_from_directory(
    dataset_path1,
    batch_size=batch_size,
    # Use the shared constant so the loaded images always match the model's
    # input size instead of repeating the literal.
    image_size=(image_size, image_size),
    validation_split=0.2,
    subset="training",
    seed=42,
    labels="inferred",
    label_mode="categorical",
)

Found 7188 files belonging to 6 classes.
Using 5751 files for training.


In [9]:
# Validation subset of the images under `dataset_path1`. The training cell
# uses the same `validation_split` and `seed` with subset="training".
val_ds_original = tf.keras.preprocessing.image_dataset_from_directory(
    dataset_path1,
    batch_size=batch_size,
    # Use the shared constant so the loaded images always match the model's
    # input size instead of repeating the literal.
    image_size=(image_size, image_size),
    validation_split=0.2,
    subset="validation",
    seed=42,
    labels="inferred",
    label_mode="categorical",
)

Found 7188 files belonging to 6 classes.
Using 1437 files for validation.


In [10]:
# Aliases consumed by the training and evaluation cells below.
# NOTE(review): despite the names, these are not features/labels. X is the
# training dataset and y is the validation dataset; both are iterated as
# (images, labels) batches.
X = train_ds_original
y = val_ds_original

In [11]:
# # Get the number of images in the dataset
# num_images = len(image_paths)

# # Compute the number of images for the validation set
# num_val_images = int(0.2 * num_images)

# # Create the validation and training sets
# val_dataset = dataset.take(num_val_images)
# train_dataset = dataset.skip(num_val_images)

In [12]:
# import numpy as np

In [13]:
# X = train_dataset.batch(batch_size).prefetch(auto)
# y = val_dataset.batch(batch_size).prefetch(auto)

In [14]:
# Training hyperparameters.
learning_rate = 0.002
label_smoothing_factor = 0.1
epochs = 30  # Also the default value of run_experiment's `epochs` argument.

# Optimizer and loss referenced by run_experiment below.
optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
loss_fn = keras.losses.CategoricalCrossentropy(label_smoothing=label_smoothing_factor)


def run_experiment(epochs=epochs):
    """Train a fresh MobileViT model and report its best validation accuracy.

    The model is trained on `X` with `y` as validation data. Weights are
    checkpointed whenever `val_accuracy` improves, and the best weights are
    reloaded before the final evaluation on `y`.

    Args:
        epochs: Number of training epochs.

    Returns:
        The trained model with the best checkpointed weights loaded.
    """
    mobilevit_xxs = create_mobilevit(num_classes=6)

    # Compile with a fresh copy of the module-level optimizer. An optimizer
    # instance keeps state tied to the variables of the model it trained, so
    # sharing one instance across the new model built on every call would
    # break repeated runs.
    fresh_optimizer = type(optimizer).from_config(optimizer.get_config())
    mobilevit_xxs.compile(optimizer=fresh_optimizer, loss=loss_fn, metrics=["accuracy"])

    checkpoint_filepath = "/tmp/checkpoint6"
    checkpoint_callback = keras.callbacks.ModelCheckpoint(
        checkpoint_filepath,
        monitor="val_accuracy",
        save_best_only=True,
        save_weights_only=True,
    )

    mobilevit_xxs.fit(
        X,
        validation_data=y,
        epochs=epochs,
        callbacks=[checkpoint_callback],
    )

    # Restore the best weights seen during training before evaluating.
    mobilevit_xxs.load_weights(checkpoint_filepath)
    _, accuracy = mobilevit_xxs.evaluate(y)
    print(f"Validation accuracy: {round(accuracy * 100, 2)}%")
    return mobilevit_xxs


# Train and evaluate, rebinding `mobilevit_xxs` to the trained model.
# NOTE(review): the recorded output of this cell ends in
# "InvalidArgumentError: Unknown image file format" during epoch 1, so the
# dataset directory presumably contains a file that is not a valid
# JPEG/PNG/GIF/BMP image — verify the data before re-running.
mobilevit_xxs = run_experiment()

Epoch 1/30




InvalidArgumentError: Graph execution error:

Detected at node decode_image/DecodeImage defined at (most recent call last):
<stack traces unavailable>
Unknown image file format. One of JPEG, PNG, GIF, BMP required.
	 [[{{node decode_image/DecodeImage}}]]
	 [[IteratorGetNext]] [Op:__inference_train_function_42970]

In [None]:
# Serialize the model as a SavedModel.
# NOTE(review): the save call is commented out, so the conversion below needs
# the "mobilevit_xxs_3classes_3" SavedModel directory to exist already.
# mobilevit_xxs.save("mobilevit_xxs_3classes_3")

# Convert to TFLite. This form of quantization is called
# post-training dynamic-range quantization in TFLite.
converter = tf.lite.TFLiteConverter.from_saved_model("mobilevit_xxs_3classes_3")
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS,  # Enable TensorFlow Lite ops.
    tf.lite.OpsSet.SELECT_TF_OPS,  # Enable TensorFlow ops.
]
tflite_model = converter.convert()

# Write through a context manager so the file is flushed and closed even if
# the write fails, instead of leaving the handle to the garbage collector.
with open("mobilevit_xxs_3classes_3.tflite", "wb") as tflite_file:
    tflite_file.write(tflite_model)

In [None]:
import numpy as np
from sklearn.metrics import classification_report

# Load the saved model
loaded_model = keras.models.load_model("mobilevit_xxs_3classes_3")

# Create a list to store true labels and predicted labels
y_true_list = []
y_pred_list = []

# Iterate over the validation dataset to obtain true and predicted labels.
# Labels are one-hot encoded, so argmax recovers the class index.
for images, labels in y:
    y_true_list.extend(np.argmax(labels, axis=1))
    # The data is already batched, so run one forward pass per batch with
    # predict_on_batch. Calling predict() here would start a new prediction
    # loop (and print a progress bar) for every batch.
    batch_probabilities = loaded_model.predict_on_batch(images)
    y_pred_list.extend(np.argmax(batch_probabilities, axis=1))

# Convert lists to NumPy arrays
y_true = np.array(y_true_list)
y_pred = np.array(y_pred_list)

# Print classification report
print(classification_report(y_true, y_pred))
