In [1]:
import tensorflow as tf

2024-11-03 19:51:27.775468: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:

class RhythmAutoencoder(tf.keras.Model):
    def __init__(self, input_dim, latent_dim):
        super(RhythmAutoencoder, self).__init__()
        # Encoder
        self.encoder = tf.keras.Sequential([
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(latent_dim)
        ])
        # Decoder
        self.decoder = tf.keras.Sequential([
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(input_dim, activation='sigmoid')
        ])
    
    def call(self, x):
        z = self.encoder(x)
        reconstructed_x = self.decoder(z)
        return reconstructed_x, z

# Initialize autoencoder
input_dim = 16  # Example rhythm vector size
latent_dim = 2  # Latent dimension
autoencoder = RhythmAutoencoder(input_dim, latent_dim)
autoencoder.compile(optimizer='adam', loss='mse')


In [3]:
import numpy as np

# Dummy dataset of rhythmic sequences
rhythm_data = np.random.rand(100, input_dim)  # Replace with actual rhythm dataset
rhythm_data = (rhythm_data > 0.5).astype(float)  # Convert to binary values for simplicity

# Train autoencoder
autoencoder.fit(rhythm_data, rhythm_data, epochs=100, batch_size=10)

autoencoder.summary()

Epoch 1/100
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 4ms/step - loss: 0.2522
Epoch 2/100
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.2503
Epoch 3/100
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.2488
Epoch 4/100
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - loss: 0.2472
Epoch 5/100
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.2460
Epoch 6/100
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.2440
Epoch 7/100
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 0.2405
Epoch 8/100
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 0.2373
Epoch 9/100
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 0.2334
Epoch 10/100
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.2263

In [4]:
def rhythm_density(rhythm):
    # Calculate density as the proportion of active beats within TensorFlow
    return tf.reduce_mean(tf.cast(rhythm > 0.5, tf.float32))

def syncopation(rhythm):
    # Calculate syncopation as the proportion of beats on off-beats (odd indices) that are active
    return tf.reduce_mean(tf.boolean_mask(rhythm, [i % 2 == 1 for i in range(len(rhythm))]))

# Target rhythm characteristics
target_density = 0.7
target_syncopation = 0.5
n_samples = 1000

def mcmc_sample(autoencoder, target_density, target_syncopation, n_samples=1000):
    samples = []
    current_latent = tf.random.normal([latent_dim])  # Start with random latent vector
    for i in range(n_samples):
        # Propose a new latent vector by a small random step
        proposal = current_latent + tf.random.normal([latent_dim]) * 0.1
        reconstructed_rhythm = autoencoder.decoder(proposal[None, :]).numpy().flatten()
        
        # Calculate the attributes of the proposed rhythm
        density = rhythm_density(reconstructed_rhythm)
        sync = syncopation(reconstructed_rhythm)
        
        # Acceptance probability based on how close we are to the target attributes
        acceptance_ratio = np.exp(-(abs(density - target_density) + abs(sync - target_syncopation)))
        if np.random.rand() < acceptance_ratio:
            current_latent = proposal  # Accept the proposed latent vector
            samples.append(reconstructed_rhythm)
    
    return samples


In [5]:
def em_update(autoencoder, samples, target_density, target_syncopation, epochs=10):
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    
    for epoch in range(epochs):
        epoch_loss = 0  # Track loss for each epoch

        for sample in samples:
            sample = tf.convert_to_tensor(sample, dtype=tf.float32)[None, :]  # Ensure it's a TensorFlow tensor

            with tf.GradientTape() as tape:
                # Encode sample to latent representation `z`
                _, z = autoencoder(sample)
                
                # Decode the latent vector to reconstruct the rhythm
                reconstructed_rhythm = autoencoder.decoder(z)
                reconstructed_rhythm = tf.squeeze(reconstructed_rhythm)  # Flatten to 1D for analysis functions
                
                # Calculate attribute losses: density and syncopation difference from target
                density_loss = tf.abs(rhythm_density(reconstructed_rhythm) - target_density)
                syncopation_loss = tf.abs(syncopation(reconstructed_rhythm) - target_syncopation)
                
                # Total loss is a combination of density and syncopation losses
                total_loss = density_loss + syncopation_loss
            
            # Calculate gradients and apply them to the decoder weights only
            gradients = tape.gradient(total_loss, autoencoder.decoder.trainable_variables)
            optimizer.apply_gradients(zip(gradients, autoencoder.decoder.trainable_variables))

            # Accumulate loss over samples for the epoch
            epoch_loss += total_loss.numpy()
        
        # Print average loss per sample for the epoch
        avg_loss = epoch_loss / len(samples)
        print(f"EM Epoch [{epoch + 1}/{epochs}], Average Loss: {avg_loss:.4f}")

In [6]:
# Generate initial samples with MCMC sampling
generated_samples = mcmc_sample(autoencoder, target_density, target_syncopation, n_samples=100)

# Use EM refinement to adjust the decoder based on the target attributes
em_update(autoencoder, generated_samples, target_density, target_syncopation, epochs=10)


EM Epoch [1/10], Average Loss: 0.1864
EM Epoch [2/10], Average Loss: 0.1884
EM Epoch [3/10], Average Loss: 0.1901
EM Epoch [4/10], Average Loss: 0.1936
EM Epoch [5/10], Average Loss: 0.1930
EM Epoch [6/10], Average Loss: 0.1941
EM Epoch [7/10], Average Loss: 0.1982
EM Epoch [8/10], Average Loss: 0.1961
EM Epoch [9/10], Average Loss: 0.2002
EM Epoch [10/10], Average Loss: 0.2002


In [7]:


def generate_new_rhythm(autoencoder, num_rhythms=5, latent_dim=4):
    """
    Generate new rhythm patterns by sampling from the latent space.
    
    Parameters:
    - autoencoder: The trained MCMC-EM autoencoder model.
    - num_rhythms: Number of rhythms to generate.
    - latent_dim: Dimensionality of the latent space.
    
    Returns:
    - List of generated rhythms as numpy arrays.
    """
    generated_rhythms = []
    
    for _ in range(num_rhythms):
        # Sample from a normal distribution in the latent space
        z_sample = tf.random.normal([1, latent_dim])
        
        # Decode the latent vector to generate a rhythm pattern
        generated_rhythm = autoencoder.decoder(z_sample)
        
        # Binarize the rhythm output based on a threshold to create binary rhythm patterns
        generated_rhythm = tf.cast(generated_rhythm > 0.5, tf.float32)
        
        generated_rhythms.append(generated_rhythm.numpy().squeeze())
    
    return generated_rhythms

# Example usage
new_rhythms = generate_new_rhythm(autoencoder, num_rhythms=5)
for i, rhythm in enumerate(new_rhythms, start=1):
    print(f"Generated Rhythm {i}: {rhythm}")


ValueError: Exception encountered when calling Sequential.call().

[1mInput 0 of layer "dense_2" is incompatible with the layer: expected axis -1 of input shape to have value 2, but received input with shape (1, 4)[0m

Arguments received by Sequential.call():
  • inputs=tf.Tensor(shape=(1, 4), dtype=float32)
  • training=None
  • mask=None