In [1]:
import tensorflow as tf
import numpy as np
import tensorflow_datasets as tfds
from tensorflow.keras import layers, models
import pathlib
import os


In [2]:
(ds_train, ds_test), ds_info = tfds.load(
    'cifar10',
    split=['train', 'test'],
    as_supervised=True,
    with_info=True,
    shuffle_files=True
)



Downloading and preparing dataset Unknown size (download: Unknown size, generated: Unknown size, total: Unknown size) to /root/tensorflow_datasets/cifar10/3.0.2...


Dl Completed...: 0 url [00:00, ? url/s]

Dl Size...: 0 MiB [00:00, ? MiB/s]

Extraction completed...: 0 file [00:00, ? file/s]

Generating splits...:   0%|          | 0/2 [00:00<?, ? splits/s]

Generating train examples...: 0 examples [00:00, ? examples/s]

Shuffling /root/tensorflow_datasets/cifar10/incomplete.Y1EVTC_3.0.2/cifar10-train.tfrecord*...:   0%|         …

Generating test examples...: 0 examples [00:00, ? examples/s]

Shuffling /root/tensorflow_datasets/cifar10/incomplete.Y1EVTC_3.0.2/cifar10-test.tfrecord*...:   0%|          …

Dataset cifar10 downloaded and prepared to /root/tensorflow_datasets/cifar10/3.0.2. Subsequent calls will reuse this data.


In [3]:
IMG_SIZE = 128
BATCH_SIZE = 64
NUM_CLASSES = ds_info.features['label'].num_classes  # 10

# data preprocessing : resize + normalization
def preprocess(image, label):
    # 32x32 -> 128x128
    image = tf.image.resize(image, (IMG_SIZE, IMG_SIZE))
    # 0~255 -> 0~1
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

In [4]:
ds_train = ds_train.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE) #여러 이미지를 동시에 병렬로 전처리
ds_train = ds_train.batch(BATCH_SIZE)

ds_test = ds_test.map(preprocess).batch(BATCH_SIZE)

In [5]:
class MobileNetV1():
    def __init__(self, num_classes=10, alpha=1.0, img_size=(IMG_SIZE, IMG_SIZE), dropout=0.001):
        self.num_classes = num_classes
        self.alpha = alpha
        self.dropout = dropout
        self.img_size = img_size
        self.model = None

    def Standard_Conv(self, filters=32, strides=(2,2)):
        return models.Sequential([
            layers.Conv2D(filters=int(filters*self.alpha), kernel_size=3, strides=strides, padding='same'),
            layers.BatchNormalization(),
            layers.ReLU()
        ])

    def DepthwiseConv(self, strides=(1,1), padding='same'):
        return models.Sequential([
            layers.DepthwiseConv2D(kernel_size=3, strides=strides, padding=padding),
            layers.BatchNormalization(),
            layers.ReLU()
        ])

    def PointwiseConv(self, filters):
        return models.Sequential([
            layers.Conv2D(filters=int(filters*self.alpha), kernel_size=1, strides=1, padding='same'),
            layers.BatchNormalization(),
            layers.ReLU()
        ])

    def Depthwise_Separable_Conv(self, strides=(1,1), filters=64):
        return models.Sequential([
            self.DepthwiseConv(strides=strides, padding='same'),
            self.PointwiseConv(filters=filters)
        ])

    def build(self):
        self.model = models.Sequential([
            layers.InputLayer(input_shape=(self.img_size[0], self.img_size[1], 3)),

            self.Standard_Conv(filters=32, strides=(2,2)),

            self.Depthwise_Separable_Conv(strides=(1,1), filters=64),
            self.Depthwise_Separable_Conv(strides=(2,2), filters=128),
            self.Depthwise_Separable_Conv(strides=(1,1), filters=128),
            self.Depthwise_Separable_Conv(strides=(2,2), filters=256),
            self.Depthwise_Separable_Conv(strides=(1,1), filters=256),
            self.Depthwise_Separable_Conv(strides=(2,2), filters=512),

            # 512 block × 5
            *[self.Depthwise_Separable_Conv(strides=(1,1), filters=512) for _ in range(5)],

            self.Depthwise_Separable_Conv(strides=(2,2), filters=1024),
            self.Depthwise_Separable_Conv(strides=(1,1), filters=1024),

            layers.GlobalAveragePooling2D(),
            layers.Dropout(self.dropout),
            layers.Dense(self.num_classes, activation='softmax')
        ])

        return self.model


In [6]:
model = MobileNetV1()
model = model.build()
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()



In [7]:
EPOCHS = 3
model.fit(ds_train, epochs=EPOCHS)

Epoch 1/3
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m113s[0m 92ms/step - accuracy: 0.3475 - loss: 1.7800
Epoch 2/3
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m52s[0m 66ms/step - accuracy: 0.5879 - loss: 1.1649
Epoch 3/3
[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m49s[0m 63ms/step - accuracy: 0.6819 - loss: 0.9100


<keras.src.callbacks.history.History at 0x7c5706003260>

In [8]:
# Evaluation
test_loss, test_acc = model.evaluate(ds_test, verbose=2)

print(f"Test Accuracy: {test_acc * 100:.2f}%")
print(f"Test Loss: {test_loss:.4f}")

157/157 - 9s - 57ms/step - accuracy: 0.6276 - loss: 1.1212
Test Accuracy: 62.76%
Test Loss: 1.1212


In [9]:
import contextlib

saved_model_dir = "mobilenetv1_cifar10"

with open(os.devnull, "w") as f, contextlib.redirect_stdout(f):
    model.export(saved_model_dir)

In [10]:
import numpy as np

def representative_data_gen():
    for images, _ in ds_train.take(100):
        for image in images:
            yield [np.expand_dims(image.numpy(), axis=0)] #(H, W, C) -> (B, H, W, C)

In [11]:
# FP32 -> Int8 quantization setting
saved_model_dir = "mobilenetv1_cifar10"
converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

tflite_model = converter.convert()

tflite_model_file = "mobilenetv1_cifar10_int8.tflite"
with open(tflite_model_file, "wb") as f:
    f.write(tflite_model)

print(f"TFLite int8 model save {tflite_model_file}")

TFLite int8 model save mobilenetv1_cifar10_int8.tflite


In [12]:
# Quantized Model Evaluation

tflite_model_file = "mobilenetv1_cifar10_int8.tflite"
interpreter = tf.lite.Interpreter(model_path=tflite_model_file)
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

input_scale, input_zero_point = input_details[0]['quantization']

correct = 0
total = 0

for images, labels in ds_test:
    for i in range(images.shape[0]):
        input_data = np.expand_dims(images[i].numpy(), axis=0)
        input_data = np.round(input_data / input_scale) + input_zero_point
        input_data = input_data.astype(np.int8)

        interpreter.set_tensor(input_details[0]['index'], input_data)
        interpreter.invoke()
        output_data = interpreter.get_tensor(output_details[0]['index'])
        pred_label = np.argmax(output_data)

        if pred_label == labels[i].numpy():
            correct += 1
        total += 1

print(f"{correct}/{total} correct, accuracy: {correct/total*100:.2f}%")

    TF 2.20. Please use the LiteRT interpreter from the ai_edge_litert package.
    See the [migration guide](https://ai.google.dev/edge/litert/migration)
    for details.
    


6252/10000 correct, accuracy: 62.52%
