# Imports

In [173]:
import os
import pandas as pd
import ast
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras import layers
from sklearn.decomposition import PCA

In [174]:
dir = os.getcwd()
df_features = pd.read_csv(dir + r'\data\df_features.csv')
df_labels = pd.read_csv(dir + r'\data\df_labels.csv')

# USER INPUT

In [220]:
input_ingredients = ['salt', 'beef stock', 'yellow onion', 'fingerling potatoes', 'paprika', 'boneless beef chuck roast']
input_source_category = 'classic' # possible values: vegan, vegetarian or classic
input_target_category = 'vegan' # possible values: vegan, vegetarian or classic

# Constants and hyperparameters

In [176]:
batch_size = 4
num_channels = 1
num_classes = 3

latent_dim = 128
feature_size = 128
# if one or both of these dimensionalities are changed, generator layers have to be adjusted

prob_contained = 0.5 # Threshold value of the probability that an ingredient is included in the fake recipe, from when it is taken into account

In [177]:
# labels 
label_encodings = {
    'vegan': 0,
    'vegetarian': 1,
    'classic': 2
}

# Preprocessing

## Calculating numbers

In [178]:
generator_in_channels = latent_dim + num_classes
discriminator_in_channels = num_channels + num_classes
print(generator_in_channels, discriminator_in_channels)

131 4


In [180]:
# preprare list of attributes for transforming values of fake recipes to real ingredient names
attributes = list(df_features.columns)

## User input 

In [181]:
# one hot encode real recipe into dataframe with same structure as training dataframe
real_recipe = []
for ingredient in attributes: 
    if ingredient in input_ingredients:
        real_recipe.append(1)
    else: 
        real_recipe.append(0)

In [182]:
# transform input labels to categorical 
for label in label_encodings: 
    if input_source_category == label:
        start_class = label_encodings[label]
    if input_target_category == label:
        end_class = label_encodings[label]

## PCA

In [183]:
# convert dataframes to numpy arrays
df_features = pd.DataFrame(df_features).to_numpy()
df_labels = df_labels.to_numpy()

# convert list of real recipe to numpy array
real_recipe = np.array(real_recipe).reshape(1,len(attributes))

In [184]:
df_features.shape

(1000, 1850)

In [185]:
# dimensionality reduction of training data 
# check if n_samples >= n_features (only then mle can be used)
if df_features.shape[0] >= df_features.shape[1]:
    pca = PCA(n_components = 'mle') # mle: automatic choice of target dimensionality
    # fit the model and perform dimensionality reduction
    df_features = pca.fit_transform(df_features)
    # adjust feature_size 
    feature_size = df_features.shape[1]
    
else: 
    pca = PCA(n_components = feature_size)
    # fit the model and perform dimensionality reduction
    df_features = pca.fit_transform(df_features)
    
# dimensionality reduction of real recipe
real_recipe = pca.transform(real_recipe)

In [186]:
# pca.get_covariance()

## Adjust formats

In [187]:
# merge tensors to one tensor and shuffle recipes
dataset = tf.data.Dataset.from_tensor_slices((df_features, df_labels))
dataset = dataset.shuffle(buffer_size = 1024).batch(batch_size)

# convert real_recipe to tensor
real_recipe = tf.cast(real_recipe, tf.float32)

# Conditional GAN model

## Discriminator

In [188]:
# Create the discriminator.
discriminator = keras.Sequential(
    [
        keras.layers.InputLayer((df_features.shape[1], discriminator_in_channels)),
        layers.Conv1D(df_features.shape[1]*2, 3, strides = 2, padding = 'same'),
        layers.LeakyReLU(alpha = 0.2),
        layers.Conv1D(df_features.shape[1]*4, 3, strides = 2, padding = 'same'),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling1D(),
        layers.Dense(1),
    ], 
    name = 'discriminator'
)

## Generator

In [189]:
# Create the generator.
generator = keras.Sequential(
    [
        keras.layers.InputLayer((generator_in_channels,)),
        # We want to generate 128 + num_classes coefficients to reshape into a
        # 7x7x(128 + num_classes) map.
        layers.Dense(7 * generator_in_channels),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, generator_in_channels)),
        layers.Conv1DTranspose(128, 13, strides=2, padding="valid"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv1DTranspose(128, 12, strides=2, padding="valid"),
        layers.LeakyReLU(alpha=0.2),
        # layers.Conv1DTranspose(128, 11, strides=2, padding="valid"),
        # layers.LeakyReLU(alpha=0.2),
        # layers.Conv1DTranspose(128, 9, strides=2, padding="valid"),
        # layers.LeakyReLU(alpha=0.2),
        # layers.Conv1DTranspose(128, 12, strides=2, padding="valid"),
        # layers.LeakyReLU(alpha=0.2),
        # layers.Conv1DTranspose(128, 10, strides=2, padding="valid"),
        # layers.LeakyReLU(alpha=0.2),
        layers.Conv1DTranspose(128, 10, strides=2, padding="valid"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv1D(1, 1, padding="valid", activation="sigmoid"),
    ],
    name="generator",
)

## Whole CGAN

In [190]:
class ConditionalGAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.gen_loss_tracker = keras.metrics.Mean(name="generator_loss")
        self.disc_loss_tracker = keras.metrics.Mean(name="discriminator_loss")

    @property
    def metrics(self):
        return [self.gen_loss_tracker, self.disc_loss_tracker]

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super().compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, data):
        # Unpack the data.
        real_recipes, one_hot_labels = data
        real_recipes = tf.cast(real_recipes, tf.float32)
        one_hot_labels = tf.cast(one_hot_labels, tf.float32)
        real_recipes = tf.reshape(
            real_recipes, [-1, feature_size, 1]
        )
        print('needed dimension for generator output: ',  feature_size)
        
        # Add dummy dimensions to the labels so that they can be concatenated with
        # the recipes. This is for the discriminator.
        recipe_one_hot_labels = one_hot_labels[:, :, None]
        recipe_one_hot_labels = tf.repeat(
            recipe_one_hot_labels, repeats=[feature_size]
        )
        recipe_one_hot_labels = tf.reshape(
            recipe_one_hot_labels, (-1, feature_size, num_classes)
        )

        # Sample random points in the latent space and concatenate the labels.
        # This is for the generator.
        batch_size = tf.shape(real_recipes)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        random_vector_labels = tf.concat(
            [random_latent_vectors, one_hot_labels], axis=1
        )
        
        # Decode the noise (guided by labels) to fake recipes.
        generated_recipes = self.generator(random_vector_labels)

        # Combine them with real recipes. Note that we are concatenating the labels
        # with these recipes here.
        print('current dimension at generator output: ', generated_recipes.shape[1])
        fake_recipe_and_labels = tf.concat([generated_recipes, recipe_one_hot_labels], -1)
        real_recipe_and_labels = tf.concat([real_recipes, recipe_one_hot_labels], -1)
        combined_recipes = tf.concat(
            [fake_recipe_and_labels, real_recipe_and_labels], axis=0
        )

        # Assemble labels discriminating real from fake recipes.
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        
        # Train the discriminator.
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_recipes)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample random points in the latent space.
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        random_vector_labels = tf.concat(
            [random_latent_vectors, one_hot_labels], axis=1
        )

        # Assemble labels that say "all real recipes".
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            fake_recipes = self.generator(random_vector_labels)
            fake_recipe_and_labels = tf.concat([fake_recipes, recipe_one_hot_labels], -1)
            predictions = self.discriminator(fake_recipe_and_labels)
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))

        # Monitor loss.
        self.gen_loss_tracker.update_state(g_loss)
        self.disc_loss_tracker.update_state(d_loss)
        return {
            "g_loss": self.gen_loss_tracker.result(),
            "d_loss": self.disc_loss_tracker.result(),
        }

## Training

In [191]:
cond_gan = ConditionalGAN(
    discriminator=discriminator, generator=generator, latent_dim=latent_dim
)
cond_gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
    # loss_fn=keras.losses.CategoricalCrossentropy(from_logits=True),
)

cond_gan.fit(dataset, epochs=20)

Epoch 1/20


needed dimension for generator output:  128
current dimension at generator output:  128
needed dimension for generator output:  128
current dimension at generator output:  128
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x2a1be7e58e0>

# Transformation of recipe in other class


In [212]:
# extract the trained generator from Conditional GAN.
trained_gen = cond_gan.generator

# Choose the number of intermediate recipes that would be generated in
# between the interpolation + 2 (first recipe and last recipe).
num_interpolation = 9  # @param {type:"integer"}

# Sample noise for the interpolation.
interpolation_noise = tf.random.normal(shape=(1, latent_dim))
# interpolation_noise = real_recipe
interpolation_noise = tf.repeat(interpolation_noise, repeats=num_interpolation-1)
interpolation_noise = tf.reshape(interpolation_noise, (num_interpolation-1, latent_dim))
interpolation_noise = tf.concat([real_recipe, interpolation_noise], 0)

def interpolate_class(first_number, second_number):
    # Convert the start and end labels to one-hot encoded vectors.
    first_label = keras.utils.to_categorical([first_number], num_classes)
    second_label = keras.utils.to_categorical([second_number], num_classes)
    first_label = tf.cast(first_label, tf.float32)
    second_label = tf.cast(second_label, tf.float32)

    # Calculate the interpolation vector between the two labels.
    percent_second_label = tf.linspace(0, 1, num_interpolation)[:, None]
    percent_second_label = tf.cast(percent_second_label, tf.float32)
    interpolation_labels = (
        first_label * (1 - percent_second_label) + second_label * percent_second_label
    )

    # Combine the noise and the labels and run inference with the generator.
    noise_and_labels = tf.concat([interpolation_noise, interpolation_labels], 1)
    fake = trained_gen.predict(noise_and_labels)
    
    return fake

fake_recipes = interpolate_class(start_class, end_class)




In [213]:
test = tf.concat([real_recipe, real_recipe], 0)

In [214]:
fake_recipes.shape

(9, 128, 1)

In [215]:
# transform data back to original space (pca inverse) and put it into dataframe with original attributes
fake_recipes = fake_recipes.reshape(num_interpolation, latent_dim)
fake_recipes = pca.inverse_transform(fake_recipes)
fake_recipes = pd.DataFrame(fake_recipes, columns = attributes)

In [216]:
# extract all ingredients that are contained in fake recipe (approach: every ingredient contained with value >= 0.5)
recipes = []
for row in fake_recipes.values:
    recipe = []
    for column_num, value in enumerate(row):
        if value >= prob_contained:
            recipe.append(attributes[column_num])
    recipes.append(recipe)

# USER OUTPUT 

In [217]:
recipes

[[],
 ['tomatoes'],
 ['tomatoes'],
 [],
 ['tomatoes'],
 ['tomatoes'],
 ['salt and pepper'],
 ['tomatoes'],
 ['tomatoes']]

In [218]:
print('Your original recipe:', input_ingredients, '\nthat you have labelled', input_source_category, '\nis now transformed into a', input_target_category, 'recipe. \nThe following recipe is your transformed recipe:', recipes[-1])

Your original recipe: ['salt', 'beef stock', 'yellow onion', 'fingerling potatoes', 'paprika'] 
that you have labelled classic 
is now transformed into a vegan recipe. 
The following recipe is your transformed recipe: ['tomatoes']


In [219]:
recipes

[[],
 ['tomatoes'],
 ['tomatoes'],
 [],
 ['tomatoes'],
 ['tomatoes'],
 ['salt and pepper'],
 ['tomatoes'],
 ['tomatoes']]

# sources 

description of convoluation and transposed convolution layers: https://towardsdatascience.com/understand-transposed-convolutions-and-build-your-own-transposed-convolution-layer-from-scratch-4f5d97b2967

example code: https://keras.io/examples/generative/conditional_gan/ (changed from 2d use case to 1d use case)