# Lofi Gan V2

## About

This version is similar to the original, with one main difference: it "continues" a song. The generator takes a 5-second song snippet as input and tries to produce an appropriate 5-second continuation of it. The discriminator is fed 10-second snippets: an input snippet followed by the generator's continuation counts as fake, and an input snippet followed by the 5 seconds that actually came next in the song counts as real. The generator may not work with random noise, since its first layer is an LSTM that expects song-like input, but it should work with any 5-second lofi snippet.
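The pairing described above can be sketched with NumPy alone. This is a minimal illustration rather than the notebook's training code: `fake_generator` is a hypothetical stand-in for the real generator, and the 4-sample snippets stand in for 5 seconds of audio.

```python
import numpy as np

SNIPPET_LEN = 4  # assumption: tiny stand-in for 5 s of audio samples


def fake_generator(first):
    # Hypothetical placeholder for the real generator: returns a
    # "continuation" with the same shape as its input.
    return np.zeros_like(first)


# A batch of 3 examples, each holding (input snippet, true continuation).
batch = np.arange(3 * 2 * SNIPPET_LEN, dtype=np.float32).reshape(3, 2, SNIPPET_LEN)
first, true_next = batch[:, 0], batch[:, 1]

# Real example for the discriminator: the snippet plus what actually followed.
real_pairs = np.stack([first, true_next], axis=1)

# Fake example: the same snippet plus the generator's continuation.
fake_pairs = np.stack([first, fake_generator(first)], axis=1)

print(real_pairs.shape, fake_pairs.shape)
```

Both kinds of example share the same first half, so the discriminator can only tell them apart by judging whether the second half is a plausible continuation.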

## Setup

In [1]:
import dotenv
import os

dotenv.load_dotenv(dotenv_path="./config.env")
FILE = os.getenv("FILE")
BATCH_SIZE = int(os.getenv("BATCH_SIZE"))
START_POS = int(os.getenv("START_POS"))
EPOCH_SIZE = int(os.getenv("EPOCH_SIZE"))


In [2]:
FILE, BATCH_SIZE, START_POS, EPOCH_SIZE


('lofi-part1.wav', 100, 0, 300)

In [3]:
import scipy.io.wavfile as wavfile
import tensorflow as tf
import numpy as np
import keras
import keras.layers as layers
import keras.models as models
import keras.losses as losses
import keras.optimizers as optimizers
import keras.regularizers as reg

import os
import pickle
import random

import time
from IPython import display


In [4]:
DEVICE = "GPU"

physical_devices = tf.config.list_physical_devices(DEVICE)
print(physical_devices)

print(tf.config.list_physical_devices())


[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU'), PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


## Load Data

Here, we load the audio data stored in the WAV file. We then convert it from 16-bit integers in [-32768, 32767] to floats in [-1, 1]. The conversion below stores the samples as float16; they are cast to float32 later, when the dataset is built.
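As a small, self-contained sketch of this normalization (using a handful of made-up samples rather than the real file), the snippet below compares float32 and float16. It illustrates why a float16 conversion reports a maximum of 32768.0: float16 cannot represent 32767 exactly.

```python
import numpy as np

# A few signed 16-bit PCM samples, including both extremes.
pcm = np.array([-32768, -1, 0, 1, 32767], dtype=np.int16)

# float32 has enough precision to hold every int16 value exactly,
# so the normalized data stays strictly below 1.0.
as_float32 = pcm.astype(np.float32) / 32768.0

# float16 has only 11 significant bits, so 32767 rounds up to 32768
# before the division and the top of the range becomes exactly 1.0.
as_float16 = pcm.astype(np.float16) / np.float16(32768)

print(as_float32.min(), as_float32.max())
print(as_float16.min(), as_float16.max())
```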

In [5]:
DATA_PREFIX = "D:/LofiData/"
file = DATA_PREFIX + FILE
sample_rate, data = wavfile.read(file)


In [6]:
total_seconds = len(data) / sample_rate
minutes = total_seconds // 60
seconds = total_seconds % 60
print(
    f"wave file of length {minutes} minutes and {seconds} seconds, which is {total_seconds} total seconds at sample rate {sample_rate}"
)


wave file of length 105.0 minutes and 44.35983333333297 seconds, which is 6344.359833333333 total seconds at sample rate 48000


In [7]:
# the file is signed 16-bit PCM (ints), so normalize the data to [-1, 1]
float_data = np.array(data, dtype=np.float16)
print(
    f"max: {np.max(float_data)}, min: {np.min(float_data)}"
)  # still in the int16 range; float16 rounds 32767 up to 32768
float_data /= 32768
print(
    f"max: {np.max(float_data)}, min: {np.min(float_data)}"
)  # now it falls within [-1, 1]


max: 32768.0, min: -32768.0
max: 1.0, min: -1.0


## Prepare Dataset

Now that the data is normalized, we move on to the next step. We cut the recording into 5-second snippets and group consecutive snippets into (input, continuation) pairs for the dataset.
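The two steps can be sketched on a toy signal as follows. The helper names `split_into_snippets` and `make_pairs` are hypothetical and only illustrate the idea; unlike the loop in the cells below, this version also keeps a final snippet that ends exactly at the end of the signal.

```python
import numpy as np


def split_into_snippets(signal, snippet_len, offset=0):
    """Cut `signal` into consecutive, non-overlapping snippets."""
    usable = (len(signal) - offset) // snippet_len
    end = offset + usable * snippet_len
    return signal[offset:end].reshape(usable, snippet_len)


def make_pairs(snippets, num_pairs):
    """Group snippets 2k and 2k+1 into (input, continuation) pairs."""
    if 2 * num_pairs > len(snippets):
        raise ValueError("not enough snippets for the requested number of pairs")
    return snippets[: 2 * num_pairs].reshape(num_pairs, 2, -1)


demo_signal = np.arange(23, dtype=np.float32)
demo_snippets = split_into_snippets(demo_signal, snippet_len=5)
demo_pairs = make_pairs(demo_snippets, num_pairs=2)
print(demo_snippets.shape, demo_pairs.shape)
```

Because each pair is built from two adjacent snippets, the second element of a pair always starts where the first one ends.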

In [8]:
SHUFFLE_SIZE = 200
SONG_LEN_IN_SECONDS = 5


In [9]:
songs = []

# simulate diversity in the dataset by starting the song sections at a random offset,
# since that basically makes each section a "new" observation
# note: this offset is counted in samples (0 to 4), not in seconds
i = random.randint(0, SONG_LEN_IN_SECONDS - 1)

# add all songs to our list
while i < len(float_data) - SONG_LEN_IN_SECONDS * sample_rate:
    songs.append(float_data[i : i + SONG_LEN_IN_SECONDS * sample_rate])
    i += SONG_LEN_IN_SECONDS * sample_rate

# convert to numpy array and check the shape - should be (xxx, 240000)
songs = np.array(songs)
songs.shape


(1268, 240000)

In [10]:
# turn into training pairs
pairs = []
for i in range(EPOCH_SIZE):
    pairs.append(np.array([songs[i*2], songs[i*2+1]]))

pairs = np.array(pairs)
pairs.shape


(300, 2, 240000)

In [11]:
# check to make sure that it is still lofi and that they are pairs
song_to_play = random.choice(pairs)
display.Audio(list(song_to_play[0]) + list(song_to_play[1]), rate=sample_rate)


In [12]:
# check to make sure that tensor pairs can be extracted
batched_tensor = tf.constant(pairs)
print(batched_tensor.shape)

# see if we can extract just the first of each pair
batched_tensor[:, 0].shape

(300, 2, 240000)


<tf.Tensor: shape=(300, 240000), dtype=float16, numpy=
array([[ 0.08264 ,  0.0786  ,  0.0762  , ..., -0.1125  , -0.1094  ,
        -0.1074  ],
       [-0.03915 , -0.04025 , -0.04218 , ..., -0.0473  , -0.06223 ,
        -0.0761  ],
       [-0.0862  , -0.0853  , -0.0839  , ..., -0.2002  , -0.2013  ,
        -0.2014  ],
       ...,
       [ 0.0516  ,  0.0466  ,  0.0423  , ...,  0.01453 ,  0.01608 ,
         0.01746 ],
       [-0.1133  , -0.10925 , -0.1055  , ..., -0.2109  , -0.2098  ,
        -0.2087  ],
       [-0.007904, -0.00827 , -0.00894 , ...,  0.00836 ,  0.00818 ,
         0.00833 ]], dtype=float16)>

In [13]:
# make the dataset that the model will be trained on
dataset = (
    tf.data.Dataset.from_tensor_slices(tf.constant(pairs, dtype=tf.float32))
    .shuffle(SHUFFLE_SIZE)
    .batch(BATCH_SIZE)
)

# make sure that the shape is still (None, 2, 240000)
dataset


<BatchDataset element_spec=TensorSpec(shape=(None, 2, 240000), dtype=tf.float32, name=None)>

## Make Model

Here, we define the discriminator and generator models. The discriminator takes two 5-second snippets, reshapes them into a single sequence, passes that through an LSTM, and then through dense hidden layers. The generator also uses an LSTM to process the incoming 5-second snippet, but it must then turn the LSTM output back into 5 seconds of audio, which requires some form of upsampling. I use Conv1DTranspose layers for this, since an LSTM or Dense layer with as many units as there are samples in 5 seconds is infeasible on my GPU.
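The upsampling arithmetic can be checked without building the model. The sketch below assumes that each Conv1DTranspose layer with `padding="same"` multiplies the time axis by its stride, and uses the strides and channel counts from the generator defined later in this section.

```python
SAMPLE_RATE = 48000
SONG_LEN_IN_SECONDS = 5

# (stride, output channels) of each Conv1DTranspose layer in the generator.
layers_spec = [(2, 512), (2, 512), (5, 512), (5, 512), (5, 480)]

length = 1  # the LSTM output is reshaped to a single time step
for stride, channels in layers_spec:
    length *= stride
    print(f"time steps: {length:4d}, channels: {channels}")

# Flattening the last layer's output gives one value per audio sample.
flattened = length * channels
print(flattened, SAMPLE_RATE * SONG_LEN_IN_SECONDS)
```

The final layer's 500 time steps times 480 channels give exactly the 240000 samples of a 5-second snippet at 48 kHz.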

In [14]:
# the shape of the generator input: one 5-second snippet (or noise of the same shape)
GENERATOR_INPUT_SIZE = (sample_rate * SONG_LEN_IN_SECONDS,)

# fraction of a second that the LSTM should take as input
# ex: MULTIPLIER=2 means that the LSTM listens to sections 1/2 second long
MULTIPLIER = 8


In [15]:
def make_discriminator():
    model = models.Sequential()
    model.add(layers.Input((2, SONG_LEN_IN_SECONDS * sample_rate)))
    model.add(
        layers.Reshape(
            (2 * SONG_LEN_IN_SECONDS * MULTIPLIER, sample_rate // MULTIPLIER)
        )
    )

    model.add(layers.LSTM(512))

    model.add(layers.Dense(256))
    model.add(layers.Dropout(0.3))
    model.add(layers.ReLU())

    model.add(layers.Dense(128))
    model.add(layers.Dropout(0.2))
    model.add(layers.ReLU())

    model.add(layers.Dense(1))
    return model


In [16]:
discriminator = make_discriminator()


In [17]:
discriminator.summary()


Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 reshape (Reshape)           (None, 80, 6000)          0         
                                                                 
 lstm (LSTM)                 (None, 512)               13338624  
                                                                 
 dense (Dense)               (None, 256)               131328    
                                                                 
 dropout (Dropout)           (None, 256)               0         
                                                                 
 re_lu (ReLU)                (None, 256)               0         
                                                                 
 dense_1 (Dense)             (None, 128)               32896     
                                                                 
 dropout_1 (Dropout)         (None, 128)               0
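The parameter counts in the summary above can be verified by hand. This sketch assumes the default Keras layouts: an LSTM with four gates, each having an input kernel, a recurrent kernel, and a bias, and Dense layers with a bias term. The helper names are hypothetical.

```python
def lstm_params(input_dim, units):
    # Four gates, each with an input kernel, a recurrent kernel and a bias.
    return 4 * ((input_dim + units) * units + units)


def dense_params(input_dim, units):
    # One weight per input/unit combination plus one bias per unit.
    return input_dim * units + units


print(lstm_params(6000, 512))  # lstm: 6000 features per time step
print(dense_params(512, 256))  # dense
print(dense_params(256, 128))  # dense_1
```

Almost all of the discriminator's weights sit in the LSTM, because each time step carries 6000 features.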

In [18]:
discriminator(
    tf.random.normal((1, 2, SONG_LEN_IN_SECONDS * sample_rate)), training=False
)


<tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[-0.14422037]], dtype=float32)>

In [19]:
def make_generator():
    model = models.Sequential()
    model.add(layers.Input(GENERATOR_INPUT_SIZE))
    model.add(
        layers.Reshape((SONG_LEN_IN_SECONDS * MULTIPLIER, sample_rate // MULTIPLIER))
    )

    model.add(layers.LSTM(512, dropout=0.15, kernel_regularizer='l2'))

    print(model.output_shape)
    model.add(layers.Reshape((1, 512)))

    model.add(layers.Conv1DTranspose(512, 2, strides=2, padding="same", kernel_regularizer='l2'))
    model.add(layers.Dropout(0.3))
    model.add(layers.ELU())
    print(model.output_shape)

    model.add(layers.Conv1DTranspose(512, 2, strides=2, padding="same", kernel_regularizer='l2'))
    model.add(layers.Dropout(0.3))
    model.add(layers.ELU())
    print(model.output_shape)

    model.add(layers.Conv1DTranspose(512, 5, strides=5, padding="same", kernel_regularizer='l2'))
    model.add(layers.Dropout(0.3))
    model.add(layers.ELU())
    print(model.output_shape)

    model.add(layers.Conv1DTranspose(512, 5, strides=5, padding="same", kernel_regularizer='l2'))
    model.add(layers.Dropout(0.3))
    model.add(layers.ELU())
    print(model.output_shape)

    model.add(layers.BatchNormalization())

    model.add(layers.Conv1DTranspose(480, 8, strides=5, padding="same", kernel_regularizer='l2', activation='tanh'))
    print(model.output_shape)

    model.add(layers.Flatten())
    print(model.output_shape)
    return model


In [20]:
generator = make_generator()


(None, 512)
(None, 2, 512)
(None, 4, 512)
(None, 20, 512)
(None, 100, 512)
(None, 500, 480)
(None, 240000)


In [21]:
generator.summary()


Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 reshape_1 (Reshape)         (None, 40, 6000)          0         
                                                                 
 lstm_1 (LSTM)               (None, 512)               13338624  
                                                                 
 reshape_2 (Reshape)         (None, 1, 512)            0         
                                                                 
 conv1d_transpose (Conv1DTra  (None, 2, 512)           524800    
 nspose)                                                         
                                                                 
 dropout_2 (Dropout)         (None, 2, 512)            0         
                                                                 
 elu (ELU)                   (None, 2, 512)            0         
                                                      

In [22]:
# check to make sure that we generate valid audio
output = generator(tf.random.normal((1, *GENERATOR_INPUT_SIZE)), training=False).numpy()
output = np.array(output, dtype=np.float32)
assert np.max(output) <= 1 and np.min(output) >= -1
display.Audio(output, rate=sample_rate)


## Loss and optimizers

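These are the standard GAN losses: binary cross-entropy computed on the discriminator's raw (logit) outputs, with an Adam optimizer for each network.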
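To make the losses concrete, here is a NumPy-only sketch of binary cross-entropy on logits and the two GAN losses built from it. It is an illustration under the assumption of mean reduction over the batch, not the Keras implementation, and the `np_`-prefixed names are hypothetical.

```python
import numpy as np


def bce_from_logits(labels, logits):
    # Numerically stable form of
    # -[y * log(sigmoid(x)) + (1 - y) * log(1 - sigmoid(x))], averaged.
    logits = np.asarray(logits, dtype=np.float64)
    labels = np.asarray(labels, dtype=np.float64)
    per_example = (
        np.maximum(logits, 0) - logits * labels + np.log1p(np.exp(-np.abs(logits)))
    )
    return per_example.mean()


def np_generator_loss(fake_logits):
    # The generator wants its outputs labelled "real" (1).
    return bce_from_logits(np.ones_like(fake_logits), fake_logits)


def np_discriminator_loss(true_logits, fake_logits):
    # The discriminator wants real pairs labelled 1 and generated pairs 0.
    true_loss = bce_from_logits(np.ones_like(true_logits), true_logits)
    fake_loss = bce_from_logits(np.zeros_like(fake_logits), fake_logits)
    return true_loss + fake_loss


undecided = np.zeros((4, 1))  # logit 0 corresponds to a probability of 0.5
print(np_generator_loss(undecided), np_discriminator_loss(undecided, undecided))
```

An undecided discriminator gives each term a loss of log 2, while a confident and correct one drives its own loss toward zero and the generator's loss up.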

In [23]:
with tf.device(DEVICE):
    loss = losses.BinaryCrossentropy(from_logits=True)
    generator_optimizer = optimizers.Adam(2e-3)
    discriminator_optimizer = optimizers.Adam(2e-3)


In [24]:
# loss functions
@tf.function
def generator_loss(fake_pred):
    # the generator's goal is to have all of its outputs
    # be classified as "real" by the discriminator
    return loss(tf.ones_like(fake_pred), fake_pred)


@tf.function
def discriminator_loss(true_pred, fake_pred):
    # the discriminator's goal is to have all of the real
    # inputs be classified as real and all the generated
    # inputs be classified as fake
    true_loss = loss(tf.ones_like(true_pred), true_pred)
    fake_loss = loss(tf.zeros_like(fake_pred), fake_pred)
    return true_loss + fake_loss


@tf.function
def simple_discriminator_loss(fake_pred):
    return loss(tf.zeros_like(fake_pred), fake_pred)


## Training Loop

In [25]:
SAMPLES_TO_GENERATE = 1


def generate_and_save_audio(gen, epoch_num, display_output=True):
    song_input = []
    for i in range(SAMPLES_TO_GENERATE):
        song_input.append(random.choice(songs))

    song_input = np.array(song_input)
    print(song_input.shape)

    # use the model passed in as `gen` rather than the global generator
    inferences = gen(song_input, training=False)
    inferences = np.array(inferences.numpy(), dtype=np.float32)
    for i in range(len(inferences)):
        wavfile.write(
            f"./generatedAudiov2/epoch_{epoch_num}_v{i}.wav", sample_rate, inferences[i]
        )
        if display_output:
            display.display(
                display.Audio(f"./generatedAudiov2/epoch_{epoch_num}_v{i}.wav")
            )


In [27]:
#@tf.function  # with @tf.function performed slower than without. Maybe it's the tf.stack? Could also be the indexing
def train_generator_on_audio(audio):
    first_samples = audio[:, 0]
    second_samples = audio[:, 1]

    with tf.GradientTape() as gen_tape:
        generated_audio_1 = generator(first_samples, training=True)
        generated_audio_2 = generator(second_samples, training=True)

        fake_output_1 = discriminator(tf.stack([first_samples, generated_audio_1], axis=1), training=True)
        fake_output_2 = discriminator(tf.stack([second_samples, generated_audio_2], axis=1), training=True)

        gen_loss = generator_loss(fake_output_1) + generator_loss(fake_output_2)

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    generator_optimizer.apply_gradients(
        zip(gradients_of_generator, generator.trainable_variables)
    )

#@tf.function  # with @tf.function performed slower than without. Maybe it's the tf.stack? Could also be the indexing
def train_step_on_audio(audio):
    # take the first song in each pair in the batch
    first_samples = audio[:, 0]

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        # generator
        generated_audio = generator(first_samples, training=True)

        # train discriminator
        true_output = discriminator(audio, training=True)
        fake_output = discriminator(tf.stack([first_samples, generated_audio], axis=1), training=True)

        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(true_output, fake_output)

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(
        disc_loss, discriminator.trainable_variables
    )

    generator_optimizer.apply_gradients(
        zip(gradients_of_generator, generator.trainable_variables)
    )
    discriminator_optimizer.apply_gradients(
        zip(gradients_of_discriminator, discriminator.trainable_variables)
    )

    return gen_loss, disc_loss

# not used in training, but idea is there
def train_step_on_noise():
    noise = tf.random.normal((BATCH_SIZE, *GENERATOR_INPUT_SIZE))

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_audio_1 = generator(noise, training=True)
        generated_audio_2 = generator(generated_audio_1, training=True)

        fake_output = discriminator(tf.stack([generated_audio_1, generated_audio_2], axis=1), training=True)

        gen_loss = generator_loss(fake_output)
        disc_loss = simple_discriminator_loss(fake_output)

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(
        disc_loss, discriminator.trainable_variables
    )

    generator_optimizer.apply_gradients(
        zip(gradients_of_generator, generator.trainable_variables)
    )
    discriminator_optimizer.apply_gradients(
        zip(gradients_of_discriminator, discriminator.trainable_variables)
    )

    return gen_loss, disc_loss

In [28]:
SOLO_GEN_TIMES = 3
epoch_name = "epoch_v3.b"
category_name = "audio"
training_name = "regularized_loss_audio"

def train(
    dataset,
    epochs: int,
    save_model_every: int = 3,
    save_output_every: int = 1,
    display_output: bool = True,
    tf_logdir: str = "",
):
    # tensorboard logs
    summary_writer = None
    if tf_logdir != "":
        summary_writer = tf.summary.create_file_writer(tf_logdir)

    # get epoch
    if os.path.exists(epoch_name):
        with open(epoch_name, "rb") as file:
            epoch = pickle.load(file)["epoch"]
    else:
        epoch = 0
        with open(epoch_name, "wb") as file:
            pickle.dump({"epoch": epoch}, file)

    # train for the amount of epochs
    for i in range(epochs):
        epoch += 1
        start = time.time()

        gen_loss_audio, disc_loss_audio = 0, 0

        for audio_batch in dataset:
            # train generator a couple times so it doesn't fall behind
            for x in range(SOLO_GEN_TIMES):
                train_generator_on_audio(audio_batch)

            # train both
            # unpack directly instead of reusing the name of the imported `losses` module
            gen_loss, disc_loss = train_step_on_audio(audio_batch)
            gen_loss_audio += gen_loss
            disc_loss_audio += disc_loss
            
        # record the progress if a logdir was provided
        if summary_writer is not None:
            with summary_writer.as_default():
                tf.summary.scalar(f"{category_name}/generator_{training_name}", gen_loss_audio, epoch)
                tf.summary.scalar(f"{category_name}/discriminator_{training_name}", disc_loss_audio, epoch)

        display.clear_output(wait=True)

        # Produce audio every `save_output_every` epochs
        if (i + 1) % save_output_every == 0:
            generate_and_save_audio(generator, epoch, display_output)

        # Save the model every `save_model_every` epochs
        if (i + 1) % save_model_every == 0:
            generator.save("./generatorv2Continued")
            discriminator.save("./discriminatorv2Continued")

        print(
            "Time for epoch {} (# {} of this session) is {} sec".format(
                epoch, i + 1, time.time() - start
            )
        )

    # Generate after the final epoch
    display.clear_output(wait=True)
    generate_and_save_audio(generator, epoch)

    # save our epoch number
    with open(epoch_name, "wb") as file:
        pickle.dump({"epoch": epoch}, file)


## Save and Load

In [29]:
def save(generator_path, discriminator_path):
    generator.save(f"{generator_path}/generator")
    discriminator.save(f"{discriminator_path}/discriminator")
    #np.save(f"{generator_path}/optimizer_weights", generator_optimizer.get_weights(), allow_pickle=True)
    #np.save(f"{discriminator_path}/optimizer_weights", discriminator_optimizer.get_weights(), allow_pickle=True)

def load(generator_path, discriminator_path):
    generator.load_weights(f"{generator_path}/generator")
    discriminator.load_weights(f"{discriminator_path}/discriminator")
    #generator_optimizer.set_weights(np.load(f"{generator_path}/optimizer_weights.npy", allow_pickle=True))
    #discriminator_optimizer.set_weights(np.load(f"{discriminator_path}/optimizer_weights.npy", allow_pickle=True))
    #print(len(generator_optimizer.get_weights()))
    #print(len(discriminator_optimizer.get_weights()))

In [30]:
load("./generatorv2/", "./discriminatorv2/")

## Train

Finally, after all of that setup, we train. This is the time-consuming part. My rough guess is that training will take about a week.

In [31]:
with tf.device(DEVICE):
    train(dataset, 500, save_model_every=1000, save_output_every=1000, tf_logdir="./logs")


Time for epoch 471 (# 241 of this session) is 5.273499488830566 sec


In [None]:
save("./generatorv2/", "./discriminatorv2/")





INFO:tensorflow:Assets written to: ./generatorv2//generator\assets


INFO:tensorflow:Assets written to: ./discriminatorv2//discriminator\assets


