In [232]:
from sklearn.model_selection import train_test_split
import math
import pandas as pd
import tensorflow as tf
from tensorflow.keras.utils import Sequence
import os
import numpy as np

In [233]:
def test_model(data, epochs=1, num_classes=525, input_shape=(224, 224, 3)):
    """Smoke-test an input pipeline by fitting a small CNN classifier on it.

    Arguments:
        data: input accepted by `model.fit` that yields (images, integer labels)
            batches, e.g. a `tf.data.Dataset` or a Keras `Sequence`
        epochs: number of epochs to train for
        num_classes: number of output logits of the classifier
        input_shape: (height, width, channels) of a single image
    """
    model = tf.keras.Sequential([
        # Declare the input explicitly instead of passing `input_shape` to the first layer.
        tf.keras.Input(shape=input_shape),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        # No activation here: the loss below is configured with from_logits=True.
        tf.keras.layers.Dense(num_classes)
    ])

    model.compile(optimizer='adam', loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    model.fit(data, epochs=epochs)

Аналог tf.keras.utils.image_dataset_from_directory. Реализация должна поддерживать следующие аргументы: directory, batch_size, image_size, shuffle, seed, validation_split, subset; и возвращать объект класса tf.data.Dataset. Элементом датасета является пара батч изображений и их классов. 

In [234]:
def load_labels(directory):
    """Collect image paths and their class names from a one-folder-per-class directory.

    Every immediate sub-directory of `directory` is treated as a class. Class
    folders and the files inside them are visited in sorted order, because
    `os.listdir` returns entries in arbitrary order and the position of a name
    in `class_names` is what callers use as the class index.

    Arguments:
        directory: path to the directory that contains the class folders

    Returns:
        image_paths: list of paths to files ending in .png, .jpg or .jpeg (case-insensitive)
        labels: list of class names, parallel to `image_paths`
        class_names: sorted list of all class folder names
    """
    class_names = sorted(
        entry for entry in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, entry))
    )
    image_paths, labels = [], []
    for label in class_names:
        class_dir = os.path.join(directory, label)
        for img_file in sorted(os.listdir(class_dir)):
            if img_file.lower().endswith(('.png', '.jpg', '.jpeg')):
                image_paths.append(os.path.join(class_dir, img_file))
                labels.append(label)
    return image_paths, labels, class_names


In [ ]:
def preprocess_image(file_path, image_size):
    """Read the file at `file_path`, decode it as a 3-channel image and resize it.

    Arguments:
        file_path: path to the image file
        image_size: target size passed to `tf.image.resize`

    Returns:
        The resized image tensor
    """
    raw_bytes = tf.io.read_file(file_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    return tf.image.resize(decoded, image_size)

In [250]:
# Random augmentation pipeline applied to a single decoded image.
# The brightness/contrast/rotation layers run on the original 0-255 pixel scale
# (RandomBrightness assumes that range by default), then pixels are rescaled to
# [0, 1], and only then is Gaussian noise added: a stddev of 0.1 is meaningful on
# the [0, 1] scale but practically invisible on the 0-255 scale.
augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal"),
    tf.keras.layers.RandomContrast(factor=0.25),
    tf.keras.layers.RandomBrightness(factor=0.3),
    tf.keras.layers.RandomRotation(factor=0.15),
    tf.keras.layers.Rescaling(scale=1. / 255),
    tf.keras.layers.GaussianNoise(stddev=0.1)
])

def preprocess_image_aug(file_path, image_size):
    """Read, decode and resize an image, then apply the random `augmentation` pipeline.

    Arguments:
        file_path: path to the image file
        image_size: target size passed to `tf.image.resize`

    Returns:
        The augmented image tensor
    """
    image = tf.io.read_file(file_path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, image_size)
    # training=True is required here: the pipeline runs outside `model.fit`, and
    # layers such as GaussianNoise are only active in training mode.
    image = augmentation(image, training=True)
    return image

In [251]:
def _make_batched_dataset(paths, labels, batch_size, image_size, shuffle, seed, preprocessing_fun):
    """Build a batched dataset of (images, labels) from parallel arrays of paths and labels."""
    dataset = tf.data.Dataset.from_tensor_slices((paths, labels))
    if shuffle:
        # Shuffle the cheap (path, label) pairs *before* decoding, with a buffer that
        # covers the whole set: this gives a full shuffle without keeping decoded
        # images in the shuffle buffer.
        dataset = dataset.shuffle(buffer_size=len(paths), seed=seed)
    dataset = dataset.map(lambda path, label: (preprocessing_fun(path, image_size), label),
                          num_parallel_calls=tf.data.AUTOTUNE)
    return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)


def image_dataset_from_directory(directory, batch_size=32, image_size=(256, 256), shuffle=True, seed=None,
                                 validation_split=None, subset=None, preprocessing_fun=preprocess_image):
    """Build a `tf.data.Dataset` of image batches from a one-folder-per-class directory.

    Arguments:
        directory: path to the directory that contains one sub-directory per class
        batch_size: size of batches in the dataset
        image_size: size to resize the images to
        shuffle: whether to shuffle the data; also forwarded to `train_test_split`
        seed: seed used for the train/validation split and for shuffling
        validation_split: fraction of the data reserved for validation. When it is
            falsy the whole directory is used and `subset` is ignored
        subset: one of 'training', 'validation' or 'both'; required when
            `validation_split` is set
        preprocessing_fun: callable `(file_path, image_size) -> image` applied to every path

    Returns:
        A dataset whose elements are (image batch, label batch) pairs. For
        subset='both' a (training dataset, validation dataset) tuple, in which only
        the training dataset is shuffled.

    Raises:
        ValueError: if no images are found, or if `validation_split` is set and
            `subset` is not one of the supported values
    """
    image_paths, labels, class_names = load_labels(directory)
    if not image_paths:
        raise ValueError(f"No images found in directory {directory!r}")

    class_indices = {class_name: i for i, class_name in enumerate(class_names)}
    labels = np.array([class_indices[label] for label in labels])
    image_paths = np.array(image_paths)

    def build(paths, targets, do_shuffle):
        return _make_batched_dataset(paths, targets, batch_size, image_size, do_shuffle, seed,
                                     preprocessing_fun)

    if not validation_split:
        return build(image_paths, labels, shuffle)

    if subset not in ('training', 'validation', 'both'):
        raise ValueError("Value should be 'training', 'validation' or 'both'")

    train_paths, val_paths, train_labels, val_labels = train_test_split(
        image_paths, labels, test_size=validation_split, random_state=seed, shuffle=shuffle)

    if subset == 'training':
        return build(train_paths, train_labels, shuffle)
    if subset == 'validation':
        return build(val_paths, val_labels, shuffle)
    # subset == 'both': the validation part keeps its order.
    return build(train_paths, train_labels, shuffle), build(val_paths, val_labels, False)

In [252]:
# NOTE(review): absolute, machine-specific path; point it at your local copy of the dataset.
directory = '/Users/merkost/Downloads/archive/test'
batch_size = 32
image_size = (224, 224)
shuffle = True
seed = 42
validation_split = 0.2
# The dataset below is augmented and used for fitting, so it has to be the
# training subset (it was previously built from the 'validation' subset).
subset = 'training'

train_dataset = image_dataset_from_directory(directory, batch_size=batch_size, image_size=image_size, shuffle=shuffle,
                                             seed=seed, validation_split=validation_split, subset=subset, preprocessing_fun=preprocess_image_aug)
test_model(train_dataset)



Функция `load_dataset`

In [238]:
def load_dataset(path, batch_size, image_size, shuffle, split, preprocessing_fun=preprocess_image):
    """Given a `path` to a csv index file loads one of the dataset splits. Paths in the index are assumed to be relative to the csv file. The file contains three columns: "filepaths", "labels" and "data set", path to the image, image label and dataset split respectively.

    Arguments:
        path: path to the csv index file
        batch_size: size of batches in the dataset
        image_size: size to resize the images to
        shuffle: whether to shuffle the index. If False original index order is preserved
        split: split to use. One of "train", "valid" or "test"
        preprocessing_fun: callable `(file_path, image_size) -> image` applied to every path

    Returns:
        The loaded dataset
        A dictionary mapping class indices to class names

    Raises:
        ValueError: if the index contains no rows for `split`"""
    df = pd.read_csv(path)

    # Build the label mapping from the whole index (sorted), not from the selected
    # split, so that a class gets the same index in every split.
    class_names = sorted(df['labels'].unique())
    class_to_index = {name: i for i, name in enumerate(class_names)}

    df_split = df[df['data set'] == split]
    if df_split.empty:
        raise ValueError(f"No rows with 'data set' == {split!r} in {path!r}")

    base_dir = os.path.dirname(path)
    paths = df_split['filepaths'].map(lambda rel: os.path.join(base_dir, rel)).to_numpy()
    labels = df_split['labels'].map(class_to_index).to_numpy()

    dataset = tf.data.Dataset.from_tensor_slices((paths, labels))
    if shuffle:
        # Shuffle the (path, label) index before decoding: a full-size buffer of
        # strings is cheap, a full-size buffer of decoded images is not.
        dataset = dataset.shuffle(buffer_size=len(paths))
    dataset = dataset.map(lambda x, y: (preprocessing_fun(x, image_size), y),
                          num_parallel_calls=tf.data.AUTOTUNE)

    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    # Return index -> name, as documented above.
    return dataset, {index: name for name, index in class_to_index.items()}

In [239]:
# Load the "test" split in its original index order and smoke-test it.
path = '/Users/merkost/Downloads/archive/birds.csv'
ds, clss = load_dataset(path, batch_size=batch_size, image_size=image_size, shuffle=False, split='test')

test_model(ds)



In [240]:
class LoadDatasetSequence(Sequence):
    """Keras `Sequence` that serves batches of one split of a csv-indexed image dataset.

    The csv file is read for the columns "filepaths", "labels" and "data set";
    file paths are taken relative to the directory of the csv file.

    Arguments:
        path: path to the csv index file
        batch_size: size of batches
        image_size: size passed to `preprocessing_fun`
        shuffle: whether to shuffle the index on creation and after every epoch
        split: value of the "data set" column to select
        preprocessing_fun: callable `(file_path, image_size) -> image` applied to every path
    """

    def __init__(self, path, batch_size, image_size, shuffle, split, preprocessing_fun=preprocess_image):
        super().__init__()
        self.batch_size = batch_size
        self.image_size = image_size
        self.shuffle = shuffle
        self.preprocessing_fun = preprocessing_fun

        index = pd.read_csv(path)
        print(index.head())

        # Build the label mapping from the whole index (sorted), not from the
        # selected split, so that a class gets the same index in every split.
        self.class_names = np.array(sorted(index['labels'].unique()), dtype=object)
        self.class_dict = {name: i for i, name in enumerate(self.class_names)}

        # .copy() so the column assignments below modify an independent frame
        # instead of a slice of `index` (avoids SettingWithCopyWarning).
        self.df = index[index['data set'] == split].copy()
        self.df['labels'] = self.df['labels'].map(self.class_dict)
        base_dir = os.path.dirname(path)
        self.df['filepaths'] = self.df['filepaths'].apply(lambda x: os.path.join(base_dir, x))
        if self.shuffle:
            self.df = self.df.sample(frac=1, ignore_index=True)

    def __len__(self):
        """Number of batches per epoch; the last batch may be smaller than `batch_size`."""
        return math.ceil(len(self.df) / self.batch_size)

    def __getitem__(self, idx):
        """Return the `idx`-th batch as an (images, labels) pair of numpy arrays."""
        if not 0 <= idx < len(self):
            raise IndexError(f"batch index {idx} is out of range for {len(self)} batches")
        batch_df = self.df.iloc[idx * self.batch_size:(idx + 1) * self.batch_size]
        labels = batch_df['labels'].to_numpy()
        # Convert every image to an array first and stack once, instead of letting
        # numpy walk a list of tensors element by element.
        images = np.stack([np.asarray(self.preprocessing_fun(path, self.image_size))
                           for path in batch_df['filepaths']])
        return images, labels

    def on_epoch_end(self):
        """Reshuffle the index after every epoch when shuffling is enabled."""
        if self.shuffle:
            self.df = self.df.sample(frac=1, ignore_index=True)

In [241]:
# Serve the "test" split through the Sequence implementation and smoke-test it.
path_to_csv = '/Users/merkost/Downloads/archive/birds.csv'

load_seq = LoadDatasetSequence(
    path_to_csv,
    batch_size=batch_size,
    image_size=image_size,
    shuffle=shuffle,
    split='test',
)

test_model(load_seq)

   class id                      filepaths           labels data set  \
0       0.0  train/ABBOTTS BABBLER/001.jpg  ABBOTTS BABBLER    train   
1       0.0  train/ABBOTTS BABBLER/007.jpg  ABBOTTS BABBLER    train   
2       0.0  train/ABBOTTS BABBLER/008.jpg  ABBOTTS BABBLER    train   
3       0.0  train/ABBOTTS BABBLER/009.jpg  ABBOTTS BABBLER    train   
4       0.0  train/ABBOTTS BABBLER/002.jpg  ABBOTTS BABBLER    train   

        scientific name  
0  MALACOCINCLA ABBOTTI  
1  MALACOCINCLA ABBOTTI  
2  MALACOCINCLA ABBOTTI  
3  MALACOCINCLA ABBOTTI  
4  MALACOCINCLA ABBOTTI  


In [242]:
class ImageDataGenerator(Sequence):
    """Keras `Sequence` that serves image batches from a one-folder-per-class directory.

    Arguments:
        directory: path to the directory that contains one sub-directory per class
        batch_size: size of batches
        image_size: size passed to `preprocessing_fun`
        shuffle: whether to shuffle the samples on creation and after every epoch
        seed: seed for the train/validation split and for shuffling
        validation_split: fraction of the data reserved for validation. When it is
            falsy no split is made and `subset` is ignored
        subset: 'training' or 'validation' selects that part of the split; any other
            value keeps all samples
        preprocessing_fun: callable `(file_path, image_size) -> image` applied to every path
    """

    def __init__(self, directory, batch_size=32, image_size=(224,224), shuffle=True, seed=None, validation_split=None, subset=None, preprocessing_fun=preprocess_image):
        super().__init__()
        image_paths, label_names, self.class_names = load_labels(directory)
        self.class_indices = {class_name: i for i, class_name in enumerate(self.class_names)}
        self.image_paths = np.array(image_paths)
        self.labels = np.array([self.class_indices[name] for name in label_names])
        self.batch_size = batch_size
        self.image_size = image_size
        self.shuffle = shuffle
        self.seed = seed
        self.preprocessing_fun = preprocessing_fun
        # Dedicated generator so that shuffling honours `seed` instead of the
        # global numpy random state.
        self._rng = np.random.default_rng(seed)
        self.total_count = len(self.image_paths)
        print('Total images: ', self.total_count)

        if validation_split:
            self.train_paths, self.val_paths, self.train_labels, self.val_labels = train_test_split(
                self.image_paths, self.labels, test_size=validation_split, random_state=seed)

            if subset == 'training':
                self.image_paths = self.train_paths
                self.labels = self.train_labels
            elif subset == 'validation':
                self.image_paths = self.val_paths
                self.labels = self.val_labels
            # Any other `subset` value keeps the full, unsplit data.

        if self.shuffle:
            self._shuffle()

    def __len__(self):
        """Number of batches per epoch; the last batch may be smaller than `batch_size`."""
        return math.ceil(len(self.image_paths) / self.batch_size)

    def __getitem__(self, idx):
        """Return the `idx`-th batch as an (images, labels) pair of numpy arrays."""
        if not 0 <= idx < len(self):
            raise IndexError(f"batch index {idx} is out of range for {len(self)} batches")
        start = idx * self.batch_size
        end = (idx + 1) * self.batch_size
        paths = self.image_paths[start:end]
        labels = self.labels[start:end]
        # Convert every image to an array first and stack once, instead of letting
        # numpy walk a list of tensors element by element.
        images = np.stack([np.asarray(self.preprocessing_fun(path, self.image_size)) for path in paths])
        return images, labels

    def on_epoch_end(self):
        """Reshuffle the samples after every epoch when shuffling is enabled."""
        if self.shuffle:
            self._shuffle()

    def _shuffle(self):
        """Apply one random permutation to paths and labels so they stay aligned."""
        indices = self._rng.permutation(len(self.image_paths))
        self.image_paths = self.image_paths[indices]
        self.labels = self.labels[indices]

    # Backward-compatible alias for the previous dunder-style method name.
    __shuffle__ = _shuffle


In [243]:
# `subset` only takes effect together with `validation_split`, and the recognised
# values are 'training' / 'validation' (the previous subset='train' was ignored).
from_dir_seq = ImageDataGenerator(directory, seed=seed, validation_split=validation_split, subset='training')
test_model(from_dir_seq)

Total images:  2625
