In [1]:
import numpy as np
import os
os.environ["KERAS_BACKEND"] = "jax"
import keras
from quantum_model import Quantum_Strategy
from tqdm import tqdm
from tqdm.keras import TqdmCallback

In [6]:
def load_data(n, test_size=1000):
    """Load the training set for problem size ``n`` and build seq2seq tensors.

    Reads the arrays ``X`` and ``Y`` from ``data_{n}.npz`` in the working
    directory, one-hot encodes the inputs, wraps every target row in a start
    token (2) and an end token (3), and derives the teacher-forcing decoder
    input/target pair. The test inputs are not read from disk: they are
    freshly drawn random 0/1 rows with the same width as ``X``.

    Args:
        n: Problem size; selects the data file and is echoed in the summary.
        test_size: Number of random test inputs to generate (default 1000,
            the value that used to be hard-coded).

    Returns:
        A tuple ``(x_train, x_test, decoder_input_data, decoder_target_data,
        max_decoder_seq_length)`` where

        * ``x_train`` / ``x_test`` are the 2-class one-hot encodings of the
          training inputs and of the random test inputs,
        * ``decoder_input_data`` is the 4-class one-hot target sequence
          without its last step and ``decoder_target_data`` is the same
          sequence without its first step (i.e. shifted by one step),
        * ``max_decoder_seq_length`` is ``decoder_input_data.shape[1]``.
    """
    # np.load returns a lazily-read NpzFile for .npz archives; it must be
    # closed to release the file handle, so pull both arrays out inside a
    # context manager.
    with np.load(f"data_{n}.npz") as data:
        x_train = data['X']
        y_train = data['Y']
    data_size = len(x_train)

    # Uses the global NumPy RNG, so seeding NumPy beforehand makes it repeatable.
    x_test = np.random.randint(0, 2, (test_size, x_train.shape[1]))

    # one-hot
    x_train = keras.utils.to_categorical(x_train, 2)
    x_test = keras.utils.to_categorical(x_test, 2)

    # Token vocabulary: 0, 1, start=2, end=3.
    # assumes Y is 2-D with one row per training sample — required by hstack.
    start_column = np.ones((data_size, 1)) * 2
    end_column = np.ones((data_size, 1)) * 3
    y_train = np.hstack([start_column, y_train, end_column])
    y_train = keras.utils.to_categorical(y_train, 4)

    # Teacher forcing: the decoder sees steps [0, T-1) and predicts steps [1, T).
    decoder_input_data = y_train[:, :-1]
    decoder_target_data = y_train[:, 1:]

    print("n=", n)
    print("n_qubits=", 4 * n)
    print("x_train shape:", x_train.shape)
    print("y_train shape:", y_train.shape)
    print(x_train.shape[0], "train samples")
    print(x_test.shape[0], "test samples")

    max_decoder_seq_length = decoder_input_data.shape[1]
    return x_train, x_test, decoder_input_data, decoder_target_data, max_decoder_seq_length

In [15]:
def build_model_and_train(n, latent_dim, x_train, decoder_input_data, decoder_target_data, verbose=0,
                          batch_size=1000, epochs=10000, patience=500, learning_rate=3e-4):
    """Build a GRU encoder-decoder model and train it with teacher forcing.

    The encoder GRU reads the one-hot input sequence (2 features per step) and
    only its final hidden state is kept. That state initialises the decoder
    GRU, which reads the one-hot target tokens (4 features per step) and feeds
    a 4-way softmax at every step.

    During training a checkpoint is written to ``model_n{n}_l{latent_dim}.keras``
    with ``save_best_only=True``, training stops early when ``val_loss`` has
    not improved for ``patience`` epochs, and a tqdm progress bar is shown.

    Args:
        n: Problem size; only used in the checkpoint file name.
        latent_dim: Number of units in both GRU layers.
        x_train: Encoder input, one-hot with 2 classes.
        decoder_input_data: Decoder input, one-hot with 4 classes.
        decoder_target_data: Decoder target, one-hot with 4 classes.
        verbose: Verbosity passed to ``model.fit``.
        batch_size: Mini-batch size (default 1000, previously hard-coded).
        epochs: Upper bound on training epochs (default 10000).
        patience: Early-stopping patience in epochs (default 500).
        learning_rate: Adam learning rate (default 3e-4).

    Returns:
        The trained ``keras.Model`` in the state it had when training ended.
        The best checkpoint is the one stored on disk, not necessarily this
        returned object.
    """
    # NOTE: the order in which layers are created is kept deliberately;
    # sample_and_predict picks the layers back out of the saved model by index.

    # Encoder: keep only the final hidden state, discard the per-step output.
    encoder_inputs = keras.Input(shape=(None, 2))
    encoder = keras.layers.GRU(latent_dim, return_state=True)
    _, encoder_state = encoder(encoder_inputs)

    # Decoder: returns the full output sequence plus its final state. The
    # state is unused during training but is needed for step-wise inference.
    decoder_inputs = keras.Input(shape=(None, 4))
    decoder_gru = keras.layers.GRU(latent_dim, return_sequences=True, return_state=True)
    decoder_outputs, _ = decoder_gru(decoder_inputs, initial_state=encoder_state)
    decoder_dense = keras.layers.Dense(4, activation="softmax")
    decoder_outputs = decoder_dense(decoder_outputs)

    # Maps [encoder input, decoder input] to the next-token distribution.
    model = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)
    model.compile(
        loss="categorical_crossentropy",
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
    )

    callbacks = [
        keras.callbacks.ModelCheckpoint(filepath=f"model_n{n}_l{latent_dim}.keras", save_best_only=True),
        keras.callbacks.EarlyStopping(monitor="val_loss", patience=patience),
        TqdmCallback(verbose=0)
    ]

    model.fit(
        [x_train, decoder_input_data],
        decoder_target_data,
        batch_size=batch_size,
        epochs=epochs,
        validation_split=0.1,
        callbacks=callbacks,
        verbose=verbose,
    )
    return model

In [16]:
def sample_and_predict(n, latent_dim, test_size, max_decoder_seq_length, x_train, x_test):
    """Greedy-decode test inputs with the saved model and score the outputs.

    Loads ``model_n{n}_l{latent_dim}.keras``, splits it into a stand-alone
    encoder and a single-step decoder, decodes the first ``test_size`` rows of
    ``x_test`` one token at a time (arg-max at every step) and passes inputs
    and predictions to ``Quantum_Strategy(n).check_input_output``.

    Args:
        n: Problem size; selects the checkpoint and the ``Quantum_Strategy``.
        latent_dim: GRU width the checkpoint was trained with.
        test_size: Number of test rows to evaluate. Clamped (with a warning)
            to ``len(x_test)`` when it asks for more rows than exist.
        max_decoder_seq_length: Upper bound on decoding steps per sequence.
        x_train: Training inputs; only ``x_train.shape[1]`` (the sequence
            length) is used, to size the prediction rows.
        x_test: One-hot encoded test inputs.

    Returns:
        ``(mean, std)`` of the values returned by ``check_input_output``.
    """
    import warnings

    # Restore the checkpoint written during training.
    model = keras.models.load_model(f"model_n{n}_l{latent_dim}.keras")

    # Encoder sub-model: input sequence -> final hidden state of the encoder GRU.
    # Layers are addressed by position, which relies on the layer order
    # produced by build_model_and_train.
    encoder_inputs = model.input[0]
    _, state_h_enc = model.layers[2].output  # encoder GRU: (outputs, state)
    encoder_model = keras.Model(encoder_inputs, state_h_enc)

    # Decoder sub-model: (token input, previous state) -> (token probs, new state).
    decoder_inputs = model.input[1]
    decoder_state_input_h = keras.Input(shape=(latent_dim,))
    decoder_gru = model.layers[3]
    decoder_outputs, state_h_dec = decoder_gru(
        decoder_inputs, initial_state=decoder_state_input_h
    )
    decoder_dense = model.layers[4]
    decoder_outputs = decoder_dense(decoder_outputs)
    decoder_model = keras.Model(
        [decoder_inputs, decoder_state_input_h], [decoder_outputs, state_h_dec]
    )

    def decode_sequence(input_seq):
        """Greedy-decode one input (batch of size 1); the stop token is not returned."""
        # Encode the input into the decoder's initial state.
        states_value = encoder_model.predict(input_seq, verbose=0)

        # Length-1 target sequence holding the start token (2).
        target_seq = np.zeros((1, 1, 4))
        target_seq[0, 0, 2] = 1.0

        decoded_tokens = []
        # Hard cap on the number of steps so a model that never emits the
        # stop token cannot produce an over-long sequence.
        while len(decoded_tokens) < max_decoder_seq_length:
            output_tokens, states_value = decoder_model.predict(
                [target_seq, states_value], verbose=0
            )
            sampled_token = int(np.argmax(output_tokens[0, -1, :]))
            if sampled_token == 3:  # stop token
                break
            decoded_tokens.append(sampled_token)

            # Feed the sampled token back in as the next decoder input.
            target_seq = np.zeros((1, 1, 4))
            target_seq[0, 0, sampled_token] = 1.0
        return decoded_tokens

    # Asking for more rows than x_test holds would produce empty input slices
    # and a row-count mismatch in the final check, so clamp instead.
    n_available = len(x_test)
    if test_size > n_available:
        warnings.warn(
            f"test_size={test_size} exceeds the {n_available} available test "
            f"samples; evaluating on {n_available} samples instead.",
            stacklevel=2,
        )
        test_size = n_available

    seq_len = x_train.shape[1]
    pred = np.zeros((test_size, seq_len))
    n_malformed = 0
    for seq_index in tqdm(range(test_size)):
        input_seq = x_test[seq_index : seq_index + 1]
        tokens = decode_sequence(input_seq)
        if len(tokens) != seq_len:
            n_malformed += 1
        # Fit the decoded tokens into the fixed-width row: extra tokens are
        # dropped, missing positions keep the zero the row was initialised with.
        tokens = tokens[:seq_len]
        pred[seq_index, : len(tokens)] = tokens

    if n_malformed:
        warnings.warn(
            f"{n_malformed} of {test_size} decoded sequences did not have "
            f"length {seq_len}; they were truncated or zero-padded.",
            stacklevel=2,
        )

    qs = Quantum_Strategy(n)
    results = qs.check_input_output(np.argmax(x_test[:test_size], axis=-1), pred)
    return np.mean(results), np.std(results)

In [25]:
def write_result(n, latent_dim, result, path="results.csv"):
    """Record ``result`` for the configuration ``(n, latent_dim)`` in a CSV file.

    File layout: a header row ``n,latent_dim,result`` followed by one row per
    configuration of the form ``n,latent_dim,result_1,result_2,...`` — every
    call appends its result to the row of the matching configuration, or adds
    a new row when that configuration has not been seen yet. All rows already
    in the file are preserved.

    Args:
        n: Problem size of the run.
        latent_dim: Latent dimension of the run.
        result: Value to record; stored as ``str(result)``. Values whose text
            contains a comma (e.g. a ``(mean, std)`` tuple) are quoted by the
            CSV writer so they stay a single field.
        path: CSV file to update (default ``"results.csv"``).
    """
    import csv

    header = ["n", "latent_dim", "result"]
    key = [str(n), str(latent_dim)]

    # Read every existing row; blank lines are skipped.
    rows = []
    if os.path.exists(path):
        with open(path, "r", newline="") as f:
            rows = [row for row in csv.reader(f) if row]

    # A missing, empty or header-less file gets the header put in front.
    if not rows or rows[0][:2] != header[:2]:
        rows.insert(0, header)

    # Extend the row of this configuration, or start a new one.
    for row in rows[1:]:
        if row[:2] == key:
            row.append(str(result))
            break
    else:
        rows.append(key + [str(result)])

    with open(path, "w", newline="") as f:
        csv.writer(f, lineterminator="\n").writerows(rows)

In [19]:
# Experiment configuration.
n = 2
latent_dim = 64
SEED = 0

# Seed the Python, NumPy and backend RNGs so that the random test inputs and
# the weight initialisation are repeatable on Restart & Run All.
keras.utils.set_random_seed(SEED)

x_train, x_test, decoder_input_data, decoder_target_data, max_decoder_seq_length = load_data(n)

# Evaluate on exactly the test inputs that were generated. The previous
# hard-coded test_size = 5000 was larger than the 1000 inputs load_data
# creates, which makes the evaluation loop run past the end of x_test.
test_size = len(x_test)

model = build_model_and_train(n, latent_dim, x_train, decoder_input_data, decoder_target_data)
result = sample_and_predict(n, latent_dim, test_size, max_decoder_seq_length, x_train, x_test)
print(result)
write_result(n, latent_dim, result)

x_train shape: (10000, 8, 2)
y_train shape: (10000, 10, 4)
10000 train samples
1000 test samples


0epoch [00:00, ?epoch/s]

100%|██████████| 1000/1000 [00:04<00:00, 242.18it/s]

1.0





In [26]:
# NOTE(review): the experiment cell above already ends with this same
# write_result call, so a top-to-bottom run records the result twice —
# confirm that is intended.
write_result(n=n, latent_dim=latent_dim, result=result)