In [1]:
import random

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import ResNet50V2
from tensorflow.keras.applications.vgg19 import VGG19, preprocess_input
from tensorflow.keras.layers.experimental import preprocessing
from tensorflow.keras.preprocessing.image import load_img, img_to_array

In [7]:
def apply_style_transfer(content_image, style_image):
    # Load the VGG19 model
    vgg19 = VGG19(weights="imagenet", include_top=False)

    # Preprocess the content and style images
    content_array = preprocess_input(content_image)
    style_array = preprocess_input(style_image)

    # Extract content and style features from the VGG19 model
    content_features = vgg19.predict(np.expand_dims(content_array, axis=0))
    style_features = vgg19.predict(np.expand_dims(style_array, axis=0))

    # Compute the Gram matrix for the style features
    style_gram_matrices = [gram_matrix(feature) for feature in style_features]

    # Initialize the generated image as the content image
    generated_image = tf.Variable(content_array, dtype=tf.float32)

    # Define the optimizer
    optimizer = tf.optimizers.Adam(learning_rate=0.02, beta_1=0.99, epsilon=1e-1)

    # Define the style transfer loss
    style_weight = 1e-2
    content_weight = 1e4

    @tf.function()
    def style_transfer_step(image):
        # Compute the content loss
        content_loss = content_weight * content_loss_function(content_features, image)

        # Compute the style loss
        style_loss = style_weight * style_loss_function(style_gram_matrices, image)

        # Compute the total loss
        total_loss = content_loss + style_loss

        # Compute the gradients and apply them to the image
        gradients = tf.gradients(total_loss, image)[0]
        optimizer.apply_gradients([(gradients, image)])

        # Clip the pixel values to the range [0, 255]
        clipped_image = tf.clip_by_value(image, clip_value_min=0.0, clip_value_max=255.0)

        return clipped_image

    # Perform multiple style transfer steps
    num_steps = 10
    for _ in range(num_steps):
        generated_image = style_transfer_step(generated_image)

    # Convert the generated image to the original range [0, 255]
    generated_image = tf.cast(generated_image, tf.uint8)

    return generated_image.numpy()

# Function to compute the Gram matrix
def gram_matrix(feature_tensor):
    batch_size, height, width, channels = feature_tensor.shape
    features = tf.reshape(feature_tensor, (batch_size, height * width, channels))
    gram_matrix = tf.matmul(features, features, transpose_a=True)
    gram_matrix /= tf.cast(height * width * channels, tf.float32)
    return gram_matrix

# Function to compute the content loss
def content_loss_function(content_features, generated_image):
    return tf.reduce_mean(tf.square(content_features - generated_image))

# Function to compute the style loss
def style_loss_function(style_gram_matrices, generated_image):
    generated_image_gram_matrices = [gram_matrix(feature) for feature in generated_image]
    style_loss = 0.0
    for style_gram, generated_gram in zip(style_gram_matrices, generated_image_gram_matrices):
        style_loss += tf.reduce_mean(tf.square(style_gram - generated_gram))
    return style_loss

In [15]:
# Set random seed for reproducibility
random.seed(42)
np.random.seed(42)
tf.random.set_seed(42)

# Define constants
num_classes = 10
input_shape = (32, 32, 3)
learning_rate = 0.001
batch_size = 256
hidden_units = 512
projection_units = 128
num_epochs = 50
dropout_rate = 0.5
temperature = 0.05

# Load the CIFAR-10 dataset
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()

# Normalize the input data
x_train = x_train.astype("float32") / 255.0
x_test = x_test.astype("float32") / 255.0

# Define the style image and load it
style_image_path = '../styles/a1.png'  # Replace with the path to your style image
style_image = keras.preprocessing.image.load_img(style_image_path, target_size=input_shape)
style_image_array = keras.preprocessing.image.img_to_array(style_image)
style_image_array = np.expand_dims(style_image_array, axis=0)

# Define the data augmentation pipeline
data_augmentation = keras.Sequential(
    [
        preprocessing.Rescaling(scale=1./255),
        preprocessing.RandomFlip("horizontal"),
        preprocessing.RandomRotation(0.02),
    ],
    name="data_augmentation",
)

# Define the encoder model
def create_encoder():
    resnet = ResNet50V2(
        include_top=False, weights=None, input_shape=input_shape, pooling=None
    )

    inputs = keras.Input(shape=input_shape)
    augmented = data_augmentation(inputs)
    outputs = resnet(augmented)
    outputs = layers.GlobalAveragePooling2D()(outputs)  # Global average pooling
    model = keras.Model(inputs=inputs, outputs=outputs, name="cifar10-encoder")
    return model
projection_units = 64
# Define the projection head model
def create_projection_head(encoder, embedding_dim=128, projection_units=128):
    input_shape = encoder.layers[0].input_shape[1:]  # Shape of the encoder input
    features = encoder.output  # Output of the encoder

    outputs = layers.Dense(projection_units)(features)
    outputs = layers.ReLU()(outputs)
    outputs = layers.Dense(embedding_dim)(outputs)
    outputs = layers.Lambda(lambda x: tf.math.l2_normalize(x, axis=-1))(outputs)

    model = keras.Model(
        inputs=encoder.input,
        outputs=outputs,
        name="cifar-encoder_with_projection-head"
    )
    return model





# Define the linear classifier model
def create_classifier(encoder, trainable=True):
    for layer in encoder.layers:
        layer.trainable = trainable

    inputs = keras.Input(shape=input_shape)
    features = encoder(inputs)
    features = layers.Dropout(dropout_rate)(features)
    features = layers.Dense(hidden_units, activation="relu")(features)
    features = layers.Dropout(dropout_rate)(features)
    outputs = layers.Dense(num_classes, activation="softmax")(features)

    model = keras.Model(inputs=inputs, outputs=outputs, name="cifar10-classifier")
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )
    return model

class ContrastiveLoss(keras.losses.Loss):
    def __init__(self, temperature=1, name=None):
        super().__init__(name=name)
        self.temperature = temperature

    def __call__(self, labels, feature_vectors, sample_weight=None):
        feature_vectors_normalized = tf.math.l2_normalize(feature_vectors, axis=1)
        similarity_matrix = tf.matmul(feature_vectors_normalized, feature_vectors_normalized, transpose_b=True)
        logits = similarity_matrix / self.temperature
        
        batch_size = tf.shape(labels)[0]
        labels = tf.reshape(labels, (batch_size, 1))
        labels = tf.cast(labels, tf.int32)

        positive_mask = tf.equal(labels, tf.transpose(labels))
        negative_mask = tf.logical_not(positive_mask)

        numerator = tf.exp(logits)
        denominator = tf.reduce_sum(tf.exp(logits), axis=1, keepdims=True)

        positive_prob = tf.boolean_mask(numerator, positive_mask)
        negative_prob = tf.boolean_mask(numerator, negative_mask)
        negative_prob = tf.reshape(negative_prob, (batch_size, -1))

        loss = -tf.math.log(positive_prob / (tf.reduce_sum(negative_prob, axis=1) + positive_prob))

        return tf.reduce_mean(loss)


# Create the encoder, projection head, and classifier models
encoder = create_encoder()
projection_head = create_projection_head(encoder)
classifier = create_classifier(encoder, trainable=False)

# Compile the models
projection_head.compile(
    optimizer=keras.optimizers.Adam(learning_rate),
    loss=ContrastiveLoss(temperature),
)
classifier.compile(
    optimizer=keras.optimizers.Adam(learning_rate),
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

# Perform contrastive learning
history = projection_head.fit(
    x=x_train, y=y_train, batch_size=batch_size, epochs=num_epochs
)

# Apply style transfer to training images
stylized_x_train = []
for image in x_train:
    stylized_image = apply_style_transfer(image, style_image_array)  # Implement your style transfer function
    stylized_x_train.append(stylized_image)
stylized_x_train = np.array(stylized_x_train)

history = classifier.fit(
    x=stylized_x_train,
    y=y_train,
    batch_size=batch_size,
    epochs=num_epochs,
    validation_data=(x_test, y_test),
)


# Evaluate the classifier on the test set
accuracy = classifier.evaluate(x_test, y_test)[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")


Epoch 1/50


InvalidArgumentError: Graph execution error:

Detected at node 'Reshape_1' defined at (most recent call last):
    File "C:\Users\Hp\anaconda3\lib\runpy.py", line 197, in _run_module_as_main
      return _run_code(code, main_globals, None,
    File "C:\Users\Hp\anaconda3\lib\runpy.py", line 87, in _run_code
      exec(code, run_globals)
    File "C:\Users\Hp\anaconda3\lib\site-packages\ipykernel_launcher.py", line 16, in <module>
      app.launch_new_instance()
    File "C:\Users\Hp\anaconda3\lib\site-packages\traitlets\config\application.py", line 846, in launch_instance
      app.start()
    File "C:\Users\Hp\anaconda3\lib\site-packages\ipykernel\kernelapp.py", line 677, in start
      self.io_loop.start()
    File "C:\Users\Hp\anaconda3\lib\site-packages\tornado\platform\asyncio.py", line 199, in start
      self.asyncio_loop.run_forever()
    File "C:\Users\Hp\anaconda3\lib\asyncio\base_events.py", line 601, in run_forever
      self._run_once()
    File "C:\Users\Hp\anaconda3\lib\asyncio\base_events.py", line 1905, in _run_once
      handle._run()
    File "C:\Users\Hp\anaconda3\lib\asyncio\events.py", line 80, in _run
      self._context.run(self._callback, *self._args)
    File "C:\Users\Hp\anaconda3\lib\site-packages\ipykernel\kernelbase.py", line 471, in dispatch_queue
      await self.process_one()
    File "C:\Users\Hp\anaconda3\lib\site-packages\ipykernel\kernelbase.py", line 460, in process_one
      await dispatch(*args)
    File "C:\Users\Hp\anaconda3\lib\site-packages\ipykernel\kernelbase.py", line 367, in dispatch_shell
      await result
    File "C:\Users\Hp\anaconda3\lib\site-packages\ipykernel\kernelbase.py", line 662, in execute_request
      reply_content = await reply_content
    File "C:\Users\Hp\anaconda3\lib\site-packages\ipykernel\ipkernel.py", line 360, in do_execute
      res = shell.run_cell(code, store_history=store_history, silent=silent)
    File "C:\Users\Hp\anaconda3\lib\site-packages\ipykernel\zmqshell.py", line 532, in run_cell
      return super().run_cell(*args, **kwargs)
    File "C:\Users\Hp\anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 2863, in run_cell
      result = self._run_cell(
    File "C:\Users\Hp\anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 2909, in _run_cell
      return runner(coro)
    File "C:\Users\Hp\anaconda3\lib\site-packages\IPython\core\async_helpers.py", line 129, in _pseudo_sync_runner
      coro.send(None)
    File "C:\Users\Hp\anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 3106, in run_cell_async
      has_raised = await self.run_ast_nodes(code_ast.body, cell_name,
    File "C:\Users\Hp\anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 3309, in run_ast_nodes
      if await self.run_code(code, result, async_=asy):
    File "C:\Users\Hp\anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 3369, in run_code
      exec(code_obj, self.user_global_ns, self.user_ns)
    File "C:\Users\Hp\AppData\Local\Temp\ipykernel_14624\1847146305.py", line 140, in <cell line: 140>
      history = projection_head.fit(
    File "C:\Users\Hp\anaconda3\lib\site-packages\keras\utils\traceback_utils.py", line 65, in error_handler
      return fn(*args, **kwargs)
    File "C:\Users\Hp\anaconda3\lib\site-packages\keras\engine\training.py", line 1685, in fit
      tmp_logs = self.train_function(iterator)
    File "C:\Users\Hp\anaconda3\lib\site-packages\keras\engine\training.py", line 1284, in train_function
      return step_function(self, iterator)
    File "C:\Users\Hp\anaconda3\lib\site-packages\keras\engine\training.py", line 1268, in step_function
      outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "C:\Users\Hp\anaconda3\lib\site-packages\keras\engine\training.py", line 1249, in run_step
      outputs = model.train_step(data)
    File "C:\Users\Hp\anaconda3\lib\site-packages\keras\engine\training.py", line 1051, in train_step
      loss = self.compute_loss(x, y, y_pred, sample_weight)
    File "C:\Users\Hp\anaconda3\lib\site-packages\keras\engine\training.py", line 1109, in compute_loss
      return self.compiled_loss(
    File "C:\Users\Hp\anaconda3\lib\site-packages\keras\engine\compile_utils.py", line 265, in __call__
      loss_value = loss_obj(y_t, y_p, sample_weight=sw)
    File "C:\Users\Hp\AppData\Local\Temp\ipykernel_14624\1847146305.py", line 116, in __call__
      negative_prob = tf.reshape(negative_prob, (batch_size, -1))
Node: 'Reshape_1'
Input to reshape is a tensor with 58744 values, but the requested shape requires a multiple of 256
	 [[{{node Reshape_1}}]] [Op:__inference_train_function_100350]