In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras import callbacks
import ipywidgets as widgets
from IPython.display import display
from tensorflow.keras.optimizers import Adam
import time
from keras import regularizers

In [59]:
DATA_INPUT_FILE = 'lichessParser/nnInput.npy'
BATCH_SIZE = 4096  # Define the batch size

In [60]:
def numStr(num):
    return '{:,}'.format(
        np.round(num, 6)
    ).replace(',', ' ')

def load_data_in_batches(file_path, batch_size):
    """
    Generator function to load large numpy data in batches.
    
    Args:
        file_path (str): Path to the .npy file containing the neural network input data.
        batch_size (int): Size of the batches to load.
    
    Yields:
        Numpy arrays of size (batch_size, feature_length) until the full dataset is processed.
    """
    # Load the memory-mapped file
    mmapped_array = np.load(file_path, mmap_mode='r')  # Read in memory-mapped mode
    
    total_samples = mmapped_array.shape[0]  # Total number of entries (rows)
    feature_length = mmapped_array.shape[1]  # Number of features (columns)
    print('tot batches:', numStr(total_samples/batch_size), 'tot samples: ', numStr(total_samples))
    # Iterate over the file in batches
    for start_idx in range(0, total_samples, batch_size):
        end_idx = min(start_idx + batch_size, total_samples)
        
        # Yield the next batch
        yield mmapped_array[start_idx:end_idx]

In [77]:
# Step 3: Define and Train the Encoder-Decoder model
my_regularizer = regularizers.l1(10e-8)

input_layer = Input(shape=(65,))
hidden_encoder = Dense(100, activation='tanh')(input_layer)
# hidden_encoder2 = Dense(100, activation='relu')(hidden_encoder)
latent_layer = Dense(30,
                     activation='tanh', 
                     activity_regularizer=my_regularizer
                    )(hidden_encoder)

In [78]:
# Decoder: Latent space -> Hidden layer -> Output (2 bits)
hidden_decoder = Dense(100, activation='relu')(latent_layer)
# hidden_decoder2 = Dense(2000, activation='tanh')(hidden_decoder)
output_layer = Dense(65, activation='tanh')(hidden_decoder)

In [79]:
# Full model: Encoder + Decoder
autoencoder = Model(inputs=input_layer, outputs=output_layer)
optimizer = Adam(
    learning_rate=0.005
)
autoencoder.compile(
    optimizer=optimizer,
   # loss='mse'
    loss=keras.losses.MeanAbsoluteError(),
)

In [80]:
ittr = 0
totLoss = 0
print_every = 250

for i in range(0, 20):
    batch_num = 0
    print('===== epoch', i+1, '======')
    for batch in load_data_in_batches(DATA_INPUT_FILE, BATCH_SIZE):
        # Example of feeding batch to a neural network (TensorFlow or PyTorch)
        # model.train_on_batch(batch)  # TensorFlow/Keras training example
        # OR
        # output = model(batch)        # PyTorch training example

        # You can also perform any data manipulation or processing here
        # autoencoder.fit(input_data, output_data, epochs=100, verbose=0)
        output_data = batch.copy()
        loss = autoencoder.train_on_batch(x=batch, y=output_data)
        totLoss = totLoss + loss

        if ittr % print_every == 0 and not ittr == 0:
            print(
                numStr(ittr)+' | '+numStr(batch_num),
                '] avg loss:',
                numStr(totLoss/print_every)
            )
            totLoss = 0

        ittr = ittr+1
        batch_num = batch_num+1

print('=========== D O N E ================')

tot batches: 2 447.951416 tot samples:  10 026 809
250 | 250 ] avg loss: 0.103748
500 | 500 ] avg loss: 0.070374
750 | 750 ] avg loss: 0.065749
1 000 | 1 000 ] avg loss: 0.062957
1 250 | 1 250 ] avg loss: 0.061457
1 500 | 1 500 ] avg loss: 0.059346
1 750 | 1 750 ] avg loss: 0.058086
2 000 | 2 000 ] avg loss: 0.057169
2 250 | 2 250 ] avg loss: 0.056327
tot batches: 2 447.951416 tot samples:  10 026 809
2 500 | 52 ] avg loss: 0.055935
2 750 | 302 ] avg loss: 0.055567
3 000 | 552 ] avg loss: 0.05542
3 250 | 802 ] avg loss: 0.055085
3 500 | 1 052 ] avg loss: 0.054572
3 750 | 1 302 ] avg loss: 0.054647
4 000 | 1 552 ] avg loss: 0.054123
4 250 | 1 802 ] avg loss: 0.053762
4 500 | 2 052 ] avg loss: 0.053466
4 750 | 2 302 ] avg loss: 0.053461
tot batches: 2 447.951416 tot samples:  10 026 809
5 000 | 104 ] avg loss: 0.053367
5 250 | 354 ] avg loss: 0.053041
5 500 | 604 ] avg loss: 0.052992
5 750 | 854 ] avg loss: 0.052976
6 000 | 1 104 ] avg loss: 0.05272
6 250 | 1 354 ] avg loss: 0.052697
6 5

In [53]:
# struct: 65 -> 1000 -> 100 -> 30 -> 100 -> 1000 -> 65 (with regulizer), relu
batch=4096, LR=0.005,  loss: 0.058 , epochs: 33  (reached 0.058 at epoch 20)

# struct: 65 -> 300 -> 100 -> 30 -> 100 -> 300 -> 65 (with regulizer), relu
batch=4096, LR=0.005,  loss: 0.045, epochs: 20  (reached 0.045 at epoch 11)

# struct: 65 -> 2000 -> 30 -> 2000 -> 65 (with regulizer), relu
batch=1024, LR=0.0003, loss: 0.025
batch=256,  LR=0.0003, loss: 0.025, epochs: 6 (reached 0.025 at epoch 5)
batch=4096, LR=0.0003, loss: 0.031, epochs: 30 (still decreasing)
batch=4096, LR=0.005,  loss: 0.025, epochs: 20  (reached 0.025 at epoch 9)



SyntaxError: invalid syntax. Maybe you meant '==' or ':=' instead of '='? (328716366.py, line 2)

In [None]:
# predictions = autoencoder.predict(input_data)
# print(predictions)
# for i, input_example in enumerate(input_data):
#     print(f"Input: {input_example}, Predicted output: {np.round(predictions[i])}")
    
for batch in load_data_in_batches(DATA_INPUT_FILE, 15):
    for i, nnInput in enumerate(batch):
        print(np.array([nnInput]))
        output = autoencoder.predict(np.array([nnInput]))
        # print(output)
        print('===============')
        print('====== set',i)
        for j, predicted in enumerate(output[0]):
            print(
                'ans',nnInput[j],
                'got', np.round(predicted, 3),
                'err', np.round(abs(nnInput[j]-predicted), 4)
            )

    break;

In [24]:
# SAVE THE MODEL
autoencoder.save('chess_autoencoder.h5')

In [None]:
autoencoder.predict(np.array([[-1., -0.667, 0., -0.5, -0.833, -1., -0.5, -0.333, -0.667, -0.167
, -0.167, 0., 0., -0.167, -0.167, -0.167, -0.167, 0., 0., -0.333
, 0., 0., 0., 0., 0., 0., 0., 0., -0.167, 0.
, 0., 0., 0., 0., 0., 0., 0.167, 0., 0.167, 0.
, 0., 0., 0., 0.167, 0., 0., 0., 0., 0., 0.167
, 0.167, 0., 0., 0., 0., 0.167, 0.167, 0.667, 0.333, 0.5
, 0.833, 1., 0.5, 0.333, 0.667]]))

encoder = Model(inputs=input_layer, outputs=latent_layer)


In [None]:
latent_input = Input(shape=(1,))
decoder_hidden = autoencoder.layers[-3](latent_input)  # The hidden decoder layer
decoder_hidden2 = autoencoder.layers[-2](decoder_hidden)  # The hidden decoder layer
decoder_output = autoencoder.layers[-1](decoder_hidden2)  # The output decoder layer
decoder = Model(inputs=latent_input, outputs=decoder_output)


In [None]:

print("Input -> Latent Space -> Reconstructed Output")
for i, input_example in enumerate(input_data):
    # Get latent space value
    latent_values = encoder.predict(np.array([input_example]))
    
    # Get reconstructed output
    reconstructed_output = decoder.predict(latent_values)
    
    # Print the result
    print(f"Input: {input_example}, Latent Space: {latent_values[0]}, Reconstructed Output: {np.round(reconstructed_output[0])}")

In [None]:
latent_slider1 = widgets.FloatSlider(value=0.0, min=-1, max=1, step=0.004, description='Latent 1', layout=widgets.Layout(width='800px'))
#latent_slider2 = widgets.FloatSlider(value=0.0, min=-2.0, max=2.0, step=0.01, description='Latent 2')


In [None]:
output_display = widgets.Output()


In [None]:
def update_output(change=None):
    #latent_values = np.array([[latent_slider1.value, latent_slider2.value]])
    latent_values = np.array([[latent_slider1.value]])
    predicted_output = decoder.predict(latent_values)
    with output_display:
        output_display.clear_output(wait=True)
        print(f"Latent space: {latent_values}")
        print(f"Decoded output: {np.round(predicted_output, 2)}")


In [None]:
latent_slider1.observe(update_output, names='value')
#latent_slider2.observe(update_output, names='value')

In [None]:
display(latent_slider1, output_display)
#display(latent_slider1, latent_slider2, output_display)


In [None]:
update_output()
