In [2]:

import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping, LearningRateScheduler, ReduceLROnPlateau
from tensorflow.keras.layers import BatchNormalization, GlobalAveragePooling2D, Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
import keras_tuner as kt
import gzip
import json
import math
import numpy as np
import matplotlib.pyplot as plt

def load_puzzle_data(file_path, repo_dir='buggle-training-data'):
    """Refresh the training-data checkout, then load a gzipped JSON puzzle file.

    Args:
        file_path: Path to a gzip-compressed JSON file.
        repo_dir: Git working copy to update with ``git pull`` before reading.
            The update is skipped when this directory does not exist.

    Returns:
        The decoded JSON document.
    """
    # Local imports keep this function self-contained.
    import os
    import subprocess

    # Plain-Python replacement for the IPython-only ``!cd ... && git pull``
    # shell escape, so the code also runs outside a notebook. The update stays
    # best effort: a failed or impossible pull must not prevent loading data
    # that is already on disk.
    if os.path.isdir(repo_dir):
        try:
            subprocess.run(['git', 'pull'], cwd=repo_dir, check=False)
        except OSError as err:
            print(f"Skipping 'git pull' in {repo_dir}: {err}")
    else:
        print(f"Skipping 'git pull': {repo_dir} is not a directory.")

    # JSON is UTF-8 by specification; do not depend on the locale default.
    with gzip.open(file_path, 'rt', encoding='utf-8') as f:
        return json.load(f)

# The 26 lowercase ASCII letters, in order, and the index of each letter
# within that list (used as the one-hot channel for the letter).
alphabet = list("abcdefghijklmnopqrstuvwxyz")
char_to_int = {letter: index for index, letter in enumerate(alphabet)}

def one_hot_encode(matrix):
    """One-hot encode a grid of letters.

    Args:
        matrix: Sequence of rows, each a sequence of characters that are keys
            of ``char_to_int``.

    Returns:
        Array of shape ``(len(matrix), len(matrix[0]), len(alphabet))`` holding
        1 in the channel of each cell's letter and 0 elsewhere.

    Raises:
        KeyError: If a cell holds a character missing from ``char_to_int``.
    """
    row_count = len(matrix)
    col_count = len(matrix[0])  # width is taken from the first row
    encoded = np.zeros((row_count, col_count, len(alphabet)))

    for row_idx, letters in enumerate(matrix):
        for col_idx, letter in enumerate(letters):
            channel = char_to_int[letter]
            encoded[row_idx, col_idx, channel] = 1

    return encoded

def step_decay(epoch):
    """Return the learning rate for ``epoch`` under a step-decay schedule.

    The rate starts at 0.001 and is multiplied by 0.5 once for every full
    20 epochs counted by ``1 + epoch``.
    """
    base_rate, decay_factor, epochs_per_drop = 0.001, 0.5, 20.0
    drops_so_far = math.floor((1 + epoch) / epochs_per_drop)
    return base_rate * math.pow(decay_factor, drops_so_far)

# Keras callback that drives the learning rate from step_decay; it is passed
# to model.fit() in main().
lr_scheduler = LearningRateScheduler(step_decay)

def extract_features(puzzles):
    """Build model inputs and regression targets from puzzle records.

    Args:
        puzzles: Sequence of mappings, each with a ``'matrix'`` entry (a 4x4
            grid of letters accepted by ``one_hot_encode``) and a
            ``'totalWords'`` entry (the numeric target).

    Returns:
        Tuple ``(matrix_features, outcomes)``: an array of shape
        ``(len(puzzles), 4, 4, len(alphabet))`` with the one-hot encoded
        boards, and an array of shape ``(len(puzzles),)`` with the matching
        ``totalWords`` values.
    """
    num_puzzles = len(puzzles)
    # The channel depth follows the alphabet so it always matches what
    # one_hot_encode produces.
    matrix_features = np.zeros((num_puzzles, 4, 4, len(alphabet)))
    outcomes = np.zeros(num_puzzles)

    for idx, puzzle in enumerate(puzzles):
        matrix_features[idx] = one_hot_encode(puzzle['matrix'])
        outcomes[idx] = puzzle['totalWords']

    # Both arrays were allocated above and are not shared with anyone, so they
    # are returned directly instead of being copied through np.array().
    return matrix_features, outcomes

def preprocess_data(matrix_features, outcomes):
    """Split features and targets into train, validation and test subsets.

    Two chained ``train_test_split`` calls with ``random_state=42`` are used:
    20% of the samples are held out for testing, then 25% of the remainder is
    held out for validation.

    Returns:
        ``(X_train, X_val, X_test, y_train, y_val, y_test)``
    """
    seed = 42

    # First cut: separate the test set from everything else.
    first_split = train_test_split(matrix_features, outcomes, test_size=0.2, random_state=seed)
    X_rest, X_test, y_rest, y_test = first_split

    # Second cut: carve the validation set out of what is left.
    second_split = train_test_split(X_rest, y_rest, test_size=0.25, random_state=seed)
    X_train, X_val, y_train, y_val = second_split

    return X_train, X_val, X_test, y_train, y_val, y_test

def augment_data(matrix_features, outcomes):
    """Expand the data set with rotated copies of every board.

    For each entry of ``matrix_features`` the original array and its
    ``np.rot90`` rotations with ``k=1`` and ``k=3`` are considered. Variants
    that are element-wise equal to one already kept for the same board are
    dropped, and every kept variant is paired with that board's outcome.
    The 180-degree rotation, the axis flips and the diagonal flips are
    currently not generated.

    Args:
        matrix_features: Iterable of board arrays.
        outcomes: Indexable of targets, aligned with ``matrix_features``.

    Returns:
        Tuple ``(augmented_matrices, augmented_outcomes)`` as NumPy arrays,
        grouped by source board in input order.
    """
    boards_out = []
    targets_out = []

    for position, board in enumerate(matrix_features):
        candidates = (
            board,
            np.rot90(board),
            np.rot90(board, k=3),
        )

        # Keep only the first occurrence of each distinct variant.
        distinct = []
        for candidate in candidates:
            for kept in distinct:
                if np.array_equal(candidate, kept):
                    break
            else:
                distinct.append(candidate)

        target = outcomes[position]
        for variant in distinct:
            boards_out.append(np.array(variant))
            targets_out.append(target)

    return np.array(boards_out), np.array(targets_out)

def build_model2(input_shape):
    """Build and compile the fixed-architecture convolutional regressor.

    The network is two same-padded ReLU convolutions (128 filters each, 3x3
    then 2x2 kernels), each followed by batch normalization, then global
    average pooling, a 128-unit ReLU layer with 30% dropout, a 64-unit ReLU
    layer and a single linear output. It is compiled with Adam (learning rate
    0.001), mean-squared-error loss and mean absolute error as a metric.

    Args:
        input_shape: Shape of one input sample, without the batch dimension.

    Returns:
        The compiled model. Its summary is printed as a side effect.
    """
    layer_stack = [
        Input(shape=input_shape),
        # Convolutional feature extractor.
        Conv2D(128, (3, 3), activation='relu', padding='same'),
        BatchNormalization(),
        Conv2D(128, (2, 2), activation='relu', padding='same'),
        BatchNormalization(),
        GlobalAveragePooling2D(),
        # Dense regression head.
        Dense(128, activation='relu'),
        Dropout(0.3),
        Dense(64, activation='relu'),
        Dense(1),
    ]
    model = Sequential(layer_stack)

    optimizer = Adam(learning_rate=0.001)
    model.compile(
        optimizer=optimizer,
        loss='mean_squared_error',
        metrics=['mean_absolute_error'],
    )
    model.summary()
    return model

def build_model(hp):
    """Build and compile a tunable convolutional regressor for keras_tuner.

    Args:
        hp: Hyperparameter container exposing ``Int``, ``Float`` and
            ``Choice``; used to pick the filter count, dense width, dropout
            rate and learning rate.

    Returns:
        The compiled model for 4x4x26 inputs with a single linear output.
    """
    # Both convolutions request the hyperparameter named 'conv_units', so they
    # presumably always share one tuned value; verify against keras_tuner if
    # independent filter counts are wanted.
    first_conv_filters = hp.Int('conv_units', min_value=32, max_value=128, step=32)
    second_conv_filters = hp.Int('conv_units', min_value=32, max_value=128, step=32)
    dense_width = hp.Int('dense_units', min_value=64, max_value=256, step=64)
    dropout_rate = hp.Float('dropout_rate', min_value=0.3, max_value=0.5, step=0.1)
    learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])

    layer_stack = [
        Input(shape=(4, 4, 26)),
        Conv2D(first_conv_filters, (3, 3), activation='relu', padding='same'),
        BatchNormalization(),
        Conv2D(second_conv_filters, (2, 2), activation='relu', padding='same'),
        BatchNormalization(),
        GlobalAveragePooling2D(),
        Dense(dense_width, activation='relu'),
        Dropout(dropout_rate),
        Dense(1),
    ]
    model = Sequential(layer_stack)

    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='mean_squared_error',
        metrics=['mean_absolute_error'],
    )
    return model

def main():
    """Train, evaluate, plot and save the puzzle word-count regressor.

    Steps: load the gzipped puzzle file, one-hot encode the boards, split into
    train/validation/test sets, augment the training set with rotations, fit
    the fixed-architecture model from ``build_model2``, report test loss and
    MAE, plot the MAE curves and save the model to disk.
    """
    puzzle_data = load_puzzle_data('buggle-training-data/training_data-100000.gz')
    print(f"Got {len(puzzle_data)} puzzles.")
    matrix_features, outcomes = extract_features(puzzle_data)
    print(f"Loaded {len(matrix_features)} matrix features.")
    print(f"Loaded {len(outcomes)} outcomes.")

    # Split BEFORE augmenting. Augmenting first lets rotated copies of the same
    # board end up in the training, validation and test sets at once, which
    # leaks information and makes the validation/test metrics look better than
    # they are. Only the training set is augmented.
    X_train, X_val, X_test, y_train, y_val, y_test = preprocess_data(matrix_features, outcomes)
    X_train, y_train = augment_data(X_train, y_train)
    print(f"Preprocessed {len(X_train)} training examples, {len(X_val)} validation examples, and {len(X_test)} test examples.")
    print(f"X_train shape: {X_train.shape}, y_train shape: {y_train.shape}")

    # Hyperparameter search with keras_tuner (uses build_model); currently
    # disabled.
    # tuner = kt.Hyperband(build_model,
    #                     objective='val_mean_absolute_error',
    #                     max_epochs=50,
    #                     factor=3,
    #                     directory='my_dir',
    #                     project_name='hyperparam_tuning')

    # stop_early = EarlyStopping(monitor='val_loss', patience=5)

    # tuner.search(X_train, y_train, epochs=50, validation_data=(X_val, y_val), callbacks=[stop_early])

    # best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
    # print(f"Best Hyperparameters: {best_hps}")

    # model = tuner.hypermodel.build(best_hps)
    # history = model.fit(X_train, y_train, epochs=50, validation_data=(X_val, y_val), callbacks=[stop_early])

    # test_results = model.evaluate(X_test, y_test, verbose=1)
    # print(f"Test Loss: {test_results[0]}, Test MAE: {test_results[1]}")

    # build_model() takes a keras_tuner hyperparameter object and would fail on
    # a shape tuple; build_model2() is the builder that accepts an input shape.
    model = build_model2(X_train.shape[1:])
    print("Model built.")

    early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, min_lr=0.0001, verbose=1)

    # NOTE(review): lr_scheduler sets the learning rate from step_decay for
    # every epoch, which presumably overwrites any reduction made by reduce_lr
    # after the previous epoch -- confirm which of the two schedules is
    # intended and drop the other.
    history = model.fit(
        X_train, y_train,
        epochs=50,
        batch_size=32,
        validation_data=(X_val, y_val),
        verbose=1,
        callbacks=[early_stopping, reduce_lr, lr_scheduler],
    )

    test_results = model.evaluate(X_test, y_test, verbose=1)
    print(f"Test Loss: {test_results[0]}, Test MAE: {test_results[1]}")

    # Training vs. validation MAE per epoch.
    plt.figure(figsize=(8, 4))
    plt.plot(history.history['mean_absolute_error'], label='MAE (training data)')
    plt.plot(history.history['val_mean_absolute_error'], label='MAE (validation data)')
    plt.title('MAE for Puzzle Prediction')
    plt.ylabel('MAE value')
    plt.xlabel('No. epoch')
    plt.legend(loc="upper right")
    plt.show()

    model.save('buggle-training-data/models/prediction_model-8.keras')
    print('Model saved.')




# Run the training pipeline only when this module is executed directly.
if __name__ == '__main__':
    main()

ModuleNotFoundError: No module named 'tensorflow'