In [1]:
import tensorflow as tf
import concurrent.futures
from keras import Sequential
from tensorflow.keras.layers import Dense, Reshape, Flatten, BatchNormalization, LeakyReLU, Conv2D, Conv2DTranspose
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
import numpy as np
import os
from tqdm import tqdm
import PIL
import glob


In [2]:
# Загрузка и предобработка датасета CelebA
def load_image(img_path, img_size):
    img = PIL.Image.open(img_path)
    img = img.resize(img_size)
    return np.array(img)


def load_celeb_a(dataset_path, img_size=(64, 64), max_workers=8):
    img_paths = glob.glob(os.path.join(dataset_path, '*.jpg'))

    data = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(load_image, img_path, img_size) for img_path in img_paths]
        for future in tqdm(concurrent.futures.as_completed(futures), total=len(img_paths), desc="Загрузка датасета"):
            data.append(future.result())

    return np.array(data)

In [3]:
# Сохранение сгенерированных изображений
def save_images(generator_inner, epoch, latent_dim_inner, examples=10):
    noise = np.random.normal(0, 1, (examples, latent_dim_inner))
    gen_images = generator_inner.predict(noise)
    gen_images = 0.5 * gen_images + 0.5
    fig, axs = plt.subplots(1, examples, figsize=(15, 15))
    for i in range(examples):
        axs[i].imshow(gen_images[i])
        axs[i].axis('off')
    plt.savefig(f"gan_images_epoch_{epoch}.png")
    plt.close()


# Демонстрация результатов генерации
def display_generated_images(generator_inner, latent_dim_inner, examples=10):
    noise = np.random.normal(0, 1, (examples, latent_dim_inner))
    gen_images = generator_inner.predict(noise)
    gen_images = 0.5 * gen_images + 0.5  # Обратная нормализация изображений в диапазон [0, 1]
    fig, axs = plt.subplots(1, examples, figsize=(15, 15))
    for i in range(examples):
        axs[i].imshow(gen_images[i])
        axs[i].axis('off')
    plt.show()

SyntaxError: invalid syntax (3761565367.py, line 11)

In [None]:
# Построение генератора
def build_generator(latent_dim_inner):
    model = Sequential()
    model.add(Dense(8 * 8 * 256, use_bias=False, input_shape=(latent_dim_inner,)))
    model.add(BatchNormalization())
    model.add(LeakyReLU())
    model.add(Reshape((8, 8, 256)))
    model.add(Conv2DTranspose(128, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(BatchNormalization())
    model.add(LeakyReLU())
    model.add(Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(BatchNormalization())
    model.add(LeakyReLU())
    model.add(Conv2DTranspose(32, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(BatchNormalization())
    model.add(LeakyReLU())
    model.add(Conv2DTranspose(3, (5, 5), strides=(2, 2), padding='same', use_bias=False, activation='tanh'))
    return model


# Построение дискриминатора
def build_discriminator(image_shape):
    model = Sequential()
    model.add(Conv2D(64, (5, 5), strides=(2, 2), padding='same', input_shape=image_shape))
    model.add(LeakyReLU())
    model.add(Conv2D(128, (5, 5), strides=(2, 2), padding='same'))
    model.add(LeakyReLU())
    model.add(Conv2D(256, (5, 5), strides=(2, 2), padding='same'))
    model.add(LeakyReLU())
    model.add(Conv2D(512, (5, 5), strides=(2, 2), padding='same'))
    model.add(LeakyReLU())
    model.add(Flatten())
    model.add(Dense(1, activation='sigmoid'))
    return model

  # Построение и компиляция GAN
def build_gan(generator_inner, discriminator_inner):
    discriminator_inner.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
                                metrics=['accuracy'])
    discriminator_inner.trainable = False
    gan_input = tf.keras.Input(shape=(latent_dim,))
    gan_output = discriminator_inner(generator_inner(gan_input))
    gan_inner = tf.keras.Model(gan_input, gan_output)
    gan_inner.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.0002, beta_1=0.5))
    return gan_inner

In [None]:
# Обучение GAN
def train_gan(gan_inner, generator_inner, discriminator_inner, data_inner, epochs_inner, batch_size_inner,
              latent_dim_inner, save_interval_inner):
    real = np.ones((batch_size_inner, 1))
    fake = np.zeros((batch_size_inner, 1))
    d_loss = -1
    g_loss = -1
    for epoch in tqdm(range(epochs_inner), desc="Обучение GAN"):
        for _ in range(len(data_inner) // batch_size_inner):
            idx = np.random.randint(0, data_inner.shape[0], batch_size_inner)
            images = data_inner[idx]
            noise = np.random.normal(0, 1, (batch_size_inner, latent_dim_inner))
            gen_images = generator_inner.predict(noise)
            d_loss_real = discriminator_inner.train_on_batch(images, real)
            d_loss_fake = discriminator_inner.train_on_batch(gen_images, fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
            noise = np.random.normal(0, 1, (batch_size_inner, latent_dim_inner))
            g_loss = gan_inner.train_on_batch(noise, real)
        if epoch % save_interval_inner == 0:
            save_images(generator_inner, epoch, latent_dim_inner)
            print(f"{epoch} [D loss: {d_loss[0]} | D accuracy: {100 * d_loss[1]}] [G loss: {g_loss}]")


In [None]:
# Основные параметры

latent_dim = 100
img_shape = (32, 32, 3)
epochs = 1
batch_size = 32
save_interval = 1

In [None]:
# Загрузка и нормализация данных
dataset_path = 'img_align_celeba'  # Укажите путь к датасету CelebA
print("Начинаю загрузку датасета")
data = load_celeb_a(dataset_path)
print("Датасет загружен")

In [None]:
# Функция для нормализации данных
def normalize_data(data_chunk):
    return (data_chunk - 127.5) / 127.5

# Размер пакета
batch_size = 10000
normalized_data = []

for i in tqdm(range(0, len(data), batch_size), desc="Нормализацяи датасета"):
    batch = data[i:i + batch_size].astype(np.float32)
    normalized_batch = normalize_data(batch)
    normalized_data.append(normalized_batch)

# Объединение всех нормализованных пакетов
data = np.concatenate(normalized_data, axis=0)
print("Датасет нормализован")

In [None]:
# Создание моделей
print("Создаю модель")
generator = build_generator(latent_dim)
discriminator = build_discriminator(img_shape)
gan = build_gan(generator, discriminator)

In [None]:
# Обучение модели
print("Начинаю обучение модели")
train_gan(gan, generator, discriminator, data, epochs, batch_size, latent_dim, save_interval)

display_generated_images(generator, latent_dim)