<a href="https://colab.research.google.com/github/cocolian/cocolian-nlp/blob/master/animal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import glob, os, shutil, pathlib
import tensorflow as tf
from tensorflow import keras
from keras import layers
import matplotlib.pyplot as plt
import numpy as np


class Animals:
    def move_files(self, src_dir, dest_dir, pattern):
        """Copy every file whose name starts with ``pattern`` into a class folder.

        Despite the method name, files are copied with ``shutil.copy``, so the
        originals remain in ``src_dir``.

        Args:
            src_dir: Directory, given as a string, that holds the raw files.
            dest_dir: Parent directory of the per-class folders.
            pattern: File-name prefix to match; also the name of the target
                sub-folder ``dest_dir/pattern``, which is created on demand.
        """
        target_dir = pathlib.PurePath(dest_dir) / pattern
        os.makedirs(target_dir, exist_ok=True)

        # Glob for "<src_dir>/<pattern>*".
        search = "/".join((src_dir, pattern + "*"))
        matches = glob.glob(search)
        for match in matches:
            shutil.copy(match, target_dir)

    def make_subset(self, subset, from_index, end_index, data_dir=None):
        """Move a range of numbered dog/cat images into a named subset folder.

        For each class ``cat`` in ("dog", "cat"), the files ``<cat>.<i>.jpg``
        with ``from_index <= i < end_index`` are moved from
        ``<data_dir>/<cat>/`` to ``<data_dir>/<subset>/<cat>/``; the target
        folder is created if it does not exist yet.

        Args:
            subset: Name of the subset folder, e.g. "train".
            from_index: First image index to move (inclusive).
            end_index: Image index at which to stop (exclusive).
            data_dir: Root folder holding the per-class image folders.
                Defaults to the location that was originally hard-coded.

        Raises:
            FileNotFoundError: If an expected source image does not exist.
        """
        if data_dir is None:
            data_dir = "/Users/lixiongfeng5/projects/mnielsen/data/animals"
        src = pathlib.Path(data_dir)
        base = src / subset
        for cat in ("dog", "cat"):
            dest = base / cat
            os.makedirs(dest, exist_ok=True)
            # Fixed: without the f-prefix this printed the literal text
            # "{src}" and "{dest}" instead of the paths.
            print(f"from {src}  to {dest}")
            for index in range(from_index, end_index):
                # str() keeps shutil.move working on Python < 3.9, where a
                # Path source combined with a directory destination fails.
                shutil.move(str(src / cat / f"{cat}.{index}.jpg"), str(dest))

    def prepare_images(self):
        src = "/Users/lixiongfeng5/projects/mnielsen/data/dogs-vs-cats/"
        dest = "/Users/lixiongfeng5/projects/mnielsen/data/animals"
        self.prepare_data(src, dest, "dog")
        self.prepare_data(src, dest, "cat")

    def prepare_data(self, data_dir=None):
        """Load the train/test/validation image datasets and cap their size.

        Each split is read from ``<data_dir>/<split>`` with
        ``image_dataset_from_directory`` (labels inferred from the folder
        layout, images resized to 180x180, batches of 32) and then truncated
        with ``take``: 50 batches for train and validation, 25 for test. The
        results are stored on ``self.train_dataset``, ``self.test_dataset``
        and ``self.validation_dataset``. The batch count of every split is
        printed before and after truncation.

        Args:
            data_dir: Root folder containing the ``train``, ``test`` and
                ``validation`` sub-folders. Defaults to the location that was
                originally hard-coded.
        """
        if data_dir is None:
            data_dir = "/Users/lixiongfeng5/projects/mnielsen/data/animals"
        data_dir = pathlib.Path(data_dir)

        def count_batches(dataset):
            # Use the statically known cardinality when available; counting
            # with reduce() iterates the whole dataset, i.e. loads every image.
            known = dataset.cardinality().numpy()
            if known >= 0:
                return known
            return dataset.reduce(0, lambda count, _: count + 1).numpy()

        def load_split(name, max_batches):
            dataset = tf.keras.utils.image_dataset_from_directory(
                data_dir / name,
                labels="inferred",
                image_size=(180, 180),
                batch_size=32)
            print(f"{name}_dataset size : ", count_batches(dataset))
            dataset = dataset.take(max_batches)
            print(f"token {name}_dataset size : ", count_batches(dataset))
            return dataset

        self.train_dataset = load_split("train", 50)
        self.test_dataset = load_split("test", 25)
        # Fixed: the validation sizes were previously computed from
        # self.test_dataset, so the printed numbers described the wrong split.
        self.validation_dataset = load_split("validation", 50)


    def build_model(self):
        """Build and compile a small convnet for binary image classification.

        The network applies random flip/rotation/zoom augmentation, rescales
        the pixels, runs five convolutions (the first four each followed by
        max-pooling), then flattens, applies dropout and ends in a single
        sigmoid unit.

        Returns:
            The compiled ``keras.Model`` (binary cross-entropy loss, RMSprop
            optimizer, accuracy metric).
        """
        # The model input is a 180x180 RGB image.
        inputs = keras.Input(shape=(180, 180, 3))

        augment = keras.Sequential([
            layers.RandomFlip("horizontal"),
            layers.RandomRotation(0.1),
            layers.RandomZoom(0.2),
        ])
        x = augment(inputs)

        # Divide the inputs by 255 to scale them into the [0, 1] range.
        x = layers.Rescaling(1. / 255)(x)

        # Conv + max-pool stages with a growing number of filters.
        for n_filters in (32, 64, 128, 256):
            x = layers.Conv2D(filters=n_filters, kernel_size=3, activation="relu")(x)
            x = layers.MaxPooling2D(pool_size=2)(x)

        # The last convolution is not followed by pooling.
        x = layers.Conv2D(filters=256, kernel_size=3, activation="relu")(x)

        flat = layers.Flatten()(x)
        regularized = layers.Dropout(0.5)(flat)
        outputs = layers.Dense(1, activation="sigmoid")(regularized)

        model = keras.Model(inputs=inputs, outputs=outputs)
        model.compile(
            loss="binary_crossentropy",
            optimizer="rmsprop",
            metrics=["accuracy"],
        )
        return model




    def plot_result(self, history):
        """Plot training vs. validation accuracy and loss, one figure each.

        Args:
            history: Object whose ``history`` mapping holds per-epoch value
                lists under the keys "accuracy", "val_accuracy", "loss" and
                "val_loss".
        """
        logs = history.history
        # (training values, validation values, training label,
        #  validation label, figure title) for each figure.
        curves = (
            (logs["accuracy"], logs["val_accuracy"],
             "Training accuracy", "Validation accuracy",
             "Training and validation accuracy"),
            (logs["loss"], logs["val_loss"],
             "Training loss", "Validation loss",
             "Training and validation loss"),
        )
        # Epochs are numbered from 1 on the x axis.
        epochs = range(1, len(curves[0][0]) + 1)

        for position, (train_values, val_values, train_label, val_label, title) in enumerate(curves):
            if position:
                # Every curve set after the first is drawn on a new figure.
                plt.figure()
            plt.plot(epochs, train_values, "bo", label=train_label)
            plt.plot(epochs, val_values, "b", label=val_label)
            plt.title(title)
            plt.legend()
        plt.show()

    def get_features_and_labels(self, conv_base, dataset):
        """Run every batch of ``dataset`` through ``conv_base``.

        Args:
            conv_base: Model whose ``predict`` is applied to each batch after
                VGG16 input preprocessing.
            dataset: Iterable yielding ``(images, labels)`` batches.

        Returns:
            A ``(features, labels)`` tuple of arrays, each the concatenation
            of the per-batch results in iteration order.
        """
        feature_batches, label_batches = [], []
        for batch_images, batch_labels in dataset:
            prepared = keras.applications.vgg16.preprocess_input(batch_images)
            feature_batches.append(conv_base.predict(prepared))
            label_batches.append(batch_labels)

        features = np.concatenate(feature_batches)
        labels = np.concatenate(label_batches)
        return features, labels

    def prepare_vgg_data(self):
        """Load the datasets and pre-compute VGG16 features for each split.

        Calls ``prepare_data``, creates an ImageNet-weighted VGG16 without its
        top layers for 180x180x3 inputs (stored as ``self.conv_base``) and
        stores the extracted features and labels of the train, validation and
        test datasets on ``self``.
        """
        self.prepare_data()

        base = keras.applications.vgg16.VGG16(
            weights="imagenet",
            include_top=False,
            input_shape=(180, 180, 3),
        )
        self.conv_base = base

        extract = self.get_features_and_labels
        self.train_features, self.train_labels = extract(base, self.train_dataset)
        self.val_features, self.val_labels = extract(base, self.validation_dataset)
        self.test_features, self.test_labels = extract(base, self.test_dataset)



    def run_vgg16(self):
        """Train a dense classifier on pre-extracted VGG16 features.

        Calls ``prepare_vgg_data`` to obtain the feature arrays, fits a small
        Flatten -> Dense(256) -> Dropout -> sigmoid head on them for 20 epochs
        while checkpointing the model with the best validation loss, and
        plots the training curves.
        """
        self.prepare_vgg_data()

        inputs = keras.Input(shape=(5, 5, 512))
        # The features must go through a Flatten layer before the Dense layer.
        x = layers.Flatten()(inputs)
        x = layers.Dense(256)(x)
        x = layers.Dropout(0.5)(x)
        outputs = layers.Dense(1, activation="sigmoid")(x)
        model = keras.Model(inputs, outputs)

        model.compile(loss="binary_crossentropy",
                      optimizer="rmsprop",
                      metrics=["accuracy"])
        callbacks = [
            keras.callbacks.ModelCheckpoint(
                # Fixed: Keras 3 rejects full-model checkpoint paths that do
                # not end in ".keras" or ".h5"; the former ".kr" suffix raised
                # ValueError when the callback was created.
                filepath="feature_extraction.keras",
                save_best_only=True,
                monitor="val_loss")
        ]
        history = model.fit(
            self.train_features, self.train_labels,
            epochs=20,
            validation_data=(self.val_features, self.val_labels),
            callbacks=callbacks)
        self.plot_result(history)

    def run_vgg_ext_model(self):
        """Train a classifier on top of a frozen VGG16 base with augmentation.

        Builds an end-to-end model (augmentation -> VGG16 preprocessing ->
        VGG16 convolutional base -> dense head), fits it on the image datasets
        for 50 epochs while checkpointing the model with the best validation
        loss, and plots the training curves.
        """
        # NOTE(review): prepare_vgg_data also extracts features for all three
        # splits, which this method never reads; only the datasets and
        # self.conv_base are used below.
        self.prepare_vgg_data()

        # Fixed: freeze the pretrained base. It was left trainable, so its
        # ImageNet weights were updated from the first step together with the
        # randomly initialised dense head instead of being used as a fixed
        # feature extractor.
        self.conv_base.trainable = False

        data_augmentation = keras.Sequential(
            [
                layers.RandomFlip("horizontal"),
                layers.RandomRotation(0.1),
                layers.RandomZoom(0.2),
            ]
        )

        inputs = keras.Input(shape=(180, 180, 3))
        # Apply data augmentation.
        x = data_augmentation(inputs)
        # Scale the input values the way VGG16 expects.
        x = keras.applications.vgg16.preprocess_input(x)
        x = self.conv_base(x)
        x = layers.Flatten()(x)
        x = layers.Dense(256)(x)
        x = layers.Dropout(0.5)(x)
        outputs = layers.Dense(1, activation="sigmoid")(x)
        model = keras.Model(inputs, outputs)
        model.compile(loss="binary_crossentropy",
                      optimizer="rmsprop",
                      metrics=["accuracy"])

        callbacks = [
            keras.callbacks.ModelCheckpoint(
                # Fixed: Keras 3 rejects full-model checkpoint paths that do
                # not end in ".keras" or ".h5"; the former ".kr" suffix raised
                # ValueError when the callback was created.
                filepath="feature_extraction_with_data_augmentation.keras",
                save_best_only=True,
                monitor="val_loss")
        ]
        history = model.fit(
            self.train_dataset,
            epochs=50,
            validation_data=self.validation_dataset,
            callbacks=callbacks)

        self.plot_result(history)

    def run_model(self):
        """Train the from-scratch convnet on the image datasets.

        Calls ``prepare_data``, builds the model with ``build_model``, prints
        its summary, fits it for 30 epochs while checkpointing the model with
        the best validation loss, and plots the training curves.
        """
        self.prepare_data()
        model = self.build_model()
        model.summary()
        callbacks = [
            keras.callbacks.ModelCheckpoint(
                # Fixed: Keras 3 rejects full-model checkpoint paths that do
                # not end in ".keras" or ".h5"; the former ".log" suffix
                # raised ValueError when the callback was created.
                filepath="convnet_from_scratch_with_augmentation.keras",
                save_best_only=True,
                monitor="val_loss")
        ]
        history = model.fit(
            self.train_dataset,
            epochs=30,
            validation_data=self.validation_dataset,
            callbacks=callbacks)
        self.plot_result(history)


if __name__ == "__main__":
    # Entry point when executed as a script: create the Animals helper and
    # run the run_vgg_ext_model experiment (VGG16 base with data augmentation).
    inst = Animals()
    inst.run_vgg_ext_model()

NotFoundError: ignored