In [None]:
import numpy as np
import tensorflow as tf
import random
import glob
import csv
import datetime

# Seed TensorFlow, NumPy and Python's `random` module with the same value.
seed = 6969
tf.random.set_seed(seed)
np.random.seed(seed)
random.seed(seed)

import pathlib
# Root folder that is scanned for per-leaderboard replay sub-directories.
replays_dir = pathlib.Path("replays")

import os
# Output folder for the saved model files and the predictions CSV.
model_dir = "model"
# exist_ok=True replaces the separate exists() check: no check-then-create
# race, and the cell stays safe to re-run.
os.makedirs(model_dir, exist_ok=True)

In [None]:
# Number of context notes placed before / after the predicted note(s) in a segment.
pre_segment_size = 12
post_segment_size = 12
# Number of notes per segment whose score is used as the prediction target.
prediction_size = 1
# Total number of notes in one model input window.
segment_size = pre_segment_size + post_segment_size + prediction_size

# Features per note as built by preprocess_note, excluding the trailing score:
# 2 x (12 position + 10 direction one-hot slots) + 2 time deltas + njs = 47.
note_size = 47

input_shape = (segment_size, note_size)

# NOTE(review): the model.fit call in this notebook passes batch_size=128 and
# does not read this value -- confirm which one is intended.
batch_size = 256

In [None]:
def get_leaderboard_id_replays(leaderboard_id):
    """Collect the replay files stored for a single leaderboard.

    Args:
        leaderboard_id: Name of the sub-directory of ``replays_dir`` to scan.

    Returns:
        Tuple ``(leaderboard_id, replay_files)`` where ``replay_files`` is the
        sorted list of ``*.npy`` paths found (empty if there are none).
    """
    # glob returns matches in arbitrary, filesystem-dependent order. Downstream
    # preprocessing treats the first replay as the baseline, so sort to make
    # the result reproducible across runs and machines.
    replay_files = sorted(glob.glob(f"{replays_dir}/{leaderboard_id}/*.npy"))
    return leaderboard_id, replay_files


def get_leaderboard_replays(val_fraction=0.2):
    """Split every leaderboard directory under ``replays_dir`` into train/validation.

    Args:
        val_fraction: Share of leaderboards (after shuffling) assigned to the
            validation set. Defaults to 0.2.

    Returns:
        Tuple ``(train_data, val_data)``. Each is a list of
        ``(leaderboard_dir, replay_files)`` pairs, where ``leaderboard_dir`` is
        the directory path and ``replay_files`` the sorted ``*.npy`` paths in it.
    """
    # iterdir() yields entries in arbitrary order; sort first so that the
    # seeded shuffle below produces the same split on every run and machine.
    leaderboard_ids = sorted(d for d in replays_dir.iterdir() if d.is_dir())
    random.shuffle(leaderboard_ids)

    val_count = int(len(leaderboard_ids) * val_fraction)
    # A set gives O(1) membership tests instead of scanning a list per leaderboard.
    val_leaderboard_ids = set(leaderboard_ids[:val_count])

    train_data = []
    val_data = []
    for leaderboard_id in leaderboard_ids:
        # Sorted for the same reproducibility reason as the directory listing.
        replay_files = sorted(glob.glob(f"{leaderboard_id}/*.npy"))

        if leaderboard_id in val_leaderboard_ids:
            val_data.append((leaderboard_id, replay_files))
        else:
            train_data.append((leaderboard_id, replay_files))

    return train_data, val_data


def get_replay_notes(replay, njs):
    """Encode the red and blue notes of a replay as feature lists, in time order.

    Args:
        replay: Iterable of ``(note_info, score, note_time)`` triples, where
            ``note_info[-2]`` is the colour character ("0" red, "1" blue).
        njs: Value forwarded unchanged to ``preprocess_note`` for every note.

    Returns:
        List with one ``preprocess_note`` result per red/blue note. Notes with
        any other colour character are skipped.
    """
    # Time of the most recent note per colour character; starts at 0 for both.
    last_time = {"0": 0, "1": 0}
    encoded = []

    for note_info, score, note_time in sorted(replay, key=lambda item: item[2]):
        color = note_info[-2]

        # Both deltas are measured before the current note updates last_time.
        since_red = note_time - last_time["0"]
        since_blue = note_time - last_time["1"]

        if color == "0":
            same, opposite = since_red, since_blue
        elif color == "1":
            same, opposite = since_blue, since_red
        else:
            continue

        last_time[color] = note_time
        encoded.append(preprocess_note(score, same, opposite, note_info, njs))

    return encoded


def preprocess_note(score, delta_to_same_color, delta_to_opposite_color, note_info, njs):
    """Turn one note into a flat feature list with its score as the last element.

    ``note_info`` is indexed as: [1] column, [2] row, [3] colour, [4] direction.

    Layout of the result for colour 0 or 1 (48 values):
        12 position one-hot slots (index ``col * 3 + row``),
        10 direction one-hot slots,
        22 zeros,
        2 time deltas, njs, score.
    For colour 0 the deltas are ordered (same, opposite); for colour 1 they
    are ordered (opposite, same). Any other colour yields just ``[njs, score]``.

    NOTE(review): for both colours the populated one-hot block comes first and
    the all-zero block second, so the one-hot part does not distinguish red
    from blue -- confirm this is intended.
    """
    col_number = int(note_info[1])
    row_number = int(note_info[2])
    direction_number = int(note_info[4])
    color = int(note_info[3])

    if color not in (0, 1):
        return [njs, score]

    # One-hot encode the note's own grid cell and cut direction.
    own_position = [0] * (4 * 3)
    own_position[col_number * 3 + row_number] = 1
    own_direction = [0] * 10
    own_direction[direction_number] = 1

    # Slots reserved for the other colour; never populated for a single note.
    other_color_block = [0] * (4 * 3 + 10)

    if color == 0:
        deltas = [delta_to_same_color, delta_to_opposite_color]
    else:
        deltas = [delta_to_opposite_color, delta_to_same_color]

    return own_position + own_direction + other_color_block + deltas + [njs, score]


def create_segments(notes):
    """Build fixed-size context windows and their score targets from encoded notes.

    Each window covers ``pre_segment_size`` notes before, ``prediction_size``
    target notes, and ``post_segment_size`` notes after. Windows near the start
    or end of the map are padded with zero vectors of length ``note_size``.
    The last element of every note (its score) is stripped from the features
    and used as the target instead.

    Args:
        notes: List of per-note feature lists whose last element is the score.

    Returns:
        Tuple ``(segments, predictions)``; both are empty lists when there are
        fewer than ``prediction_size`` notes.
    """
    # NOTE: using relative score can be good to find relative difficulty of the notes more fairly
    # because good players will always get higher acc and worse players will do badly even on easy patterns

    if len(notes) < prediction_size:
        return ([], [])

    def features(chunk):
        # Drop the trailing score from each note.
        return [np.array(note[:-1]) for note in chunk]

    def padding(count):
        return [np.zeros(note_size, dtype=np.float32) for _ in range(count)]

    segments = []
    predictions = []

    # Step by prediction_size so the target notes of consecutive windows never overlap.
    for start in range(0, len(notes) - prediction_size + 1, prediction_size):
        stop = start + prediction_size

        before = features(notes[max(0, start - pre_segment_size) : start])
        targets = notes[start:stop]
        after = features(notes[stop : stop + post_segment_size])

        window = []
        window.extend(padding(pre_segment_size - len(before)))  # pad at the front
        window.extend(before)
        window.extend(features(targets))
        window.extend(after)
        window.extend(padding(post_segment_size - len(after)))  # pad at the back

        segments.append(window)
        predictions.append([note[-1] for note in targets])

    return segments, predictions


def to_id(value):
    """Return the note id as a decimal string, adding 30000 to ids below 30000.

    The value is truncated with ``int()`` first, so floats and numeric strings
    are accepted.
    """
    number = int(value)
    if number >= 30000:
        return str(number)
    return str(number + 30000)


def preprocess_leaderboard_replays(leaderboard_replays, min_replays_per_note=8):
    """Average all replays of one leaderboard per note and cut them into segments.

    Each replay file is a 2-D array whose columns are used as
    ``[note id, score, note time]``. The first replay that contains any rows
    defines the set of notes; later replays only contribute to notes that are
    already present (matched on id + time).

    Args:
        leaderboard_replays: List of ``.npy`` replay file paths (strings).
        min_replays_per_note: Minimum number of replays that must contain a
            note. If any note falls below it, the whole leaderboard is
            rejected. Defaults to 8.

    Returns:
        ``(segments, predictions)`` as produced by ``create_segments``, or
        ``([], [])`` when there are no replay files, a note has too few
        samples, or a note id is longer than 5 characters.
    """
    # Without any replay file there is nothing to aggregate and no file name
    # to read the NJS from; previously this raised IndexError further down.
    if len(leaderboard_replays) == 0:
        return ([], [])

    # Load and sort each replay by note time
    replays = []
    for leaderboard_replay in leaderboard_replays:
        arr = np.load(leaderboard_replay)
        replays.append(arr[np.argsort(arr[:, 2])])

    # Process each replay to aggregate note data.
    # note_data maps key -> [note id, list of scores, summed time, sample count].
    note_data = {}
    for replay in replays:
        # Decided once per replay: only a replay seen while note_data is still
        # empty may introduce new notes.
        is_baseline = len(note_data) == 0
        for values in replay:
            note_id = to_id(values[0])
            note_id_time_key = note_id + str(values[2] * 100)
            if is_baseline:
                note_data[note_id_time_key] = [note_id, [values[1]], values[2], 1]
            elif note_id_time_key in note_data:
                entry = note_data[note_id_time_key]
                entry[1].append(values[1])
                entry[2] += values[2]
                entry[3] += 1

    # Filter and average note data
    processed_notes = []
    for note_id, acc_list, time_sum, count in note_data.values():
        if count < min_replays_per_note or len(note_id) > 5:
            return ([], [])

        # Sorting makes the floating-point sum independent of replay load order.
        acc_list.sort()
        acc = sum(acc_list) / len(acc_list) if len(acc_list) > 0 else 0
        processed_notes.append([note_id, acc, time_sum / count])

    # Generate segments from the processed notes.
    # Assumes the NJS is the 4th dash-separated field of the replay path -- TODO confirm.
    njs = float(leaderboard_replays[0].split("-")[3].replace(".npy", ""))
    notes = get_replay_notes(processed_notes, njs)
    return create_segments(notes)


def generate_data(leaderboards_replays):
    """Preprocess several leaderboards and stack their segments into arrays.

    Args:
        leaderboards_replays: Iterable of ``(leaderboard_id, replay_files)`` pairs.

    Returns:
        Tuple ``(segments, scores)`` of NumPy arrays holding the concatenated
        output of ``preprocess_leaderboard_replays``. Leaderboards that yield
        no segments are left out.
    """
    collected_segments, collected_scores = [], []

    for _, replay_files in leaderboards_replays:
        segments, scores = preprocess_leaderboard_replays(replay_files)
        if len(segments) > 0:
            collected_segments += segments
            collected_scores += scores

    return np.array(collected_segments), np.array(collected_scores)


def clamp(num, lower, higher):
    """Limit ``num`` to the range ``[lower, higher]``.

    The lower bound is applied first and the upper bound second, so ``higher``
    wins if the bounds are inverted.
    """
    floored = lower if lower > num else num
    return higher if higher < floored else floored


# Split the leaderboards found on disk into training and validation sets.
train_data, val_data = get_leaderboard_replays()
# NOTE: test_data is the same list object as val_data, not a separate hold-out set.
test_data = val_data

# Build model inputs (segments) and targets (scores) for each split.
train_x, train_y = generate_data(train_data)
val_x, val_y = generate_data(val_data)

In [None]:
from keras import Input, Model, layers

# Input layer: one window of `segment_size` notes with `note_size` features each.
input_layer = Input(shape=input_shape)

# Two stacked bidirectional LSTMs; both return the full sequence so that every
# time step keeps an output for the cropping step below.
bidirectional_1 = layers.Bidirectional(layers.LSTM(units=12, return_sequences=True, activation="tanh"), backward_layer=layers.LSTM(units=12, return_sequences=True, go_backwards=True, activation="tanh"))(input_layer)
bidirectional_2 = layers.Bidirectional(layers.LSTM(units=10, return_sequences=True, activation="tanh"), backward_layer=layers.LSTM(units=10, return_sequences=True, go_backwards=True, activation="tanh"))(bidirectional_1)

# Concatenate the output of the second bidirectional LSTM layer with the input
concatenate = layers.Concatenate()([bidirectional_2, input_layer])

# Drop the leading/trailing context steps so only the `prediction_size` target
# steps remain. Derived from the config instead of a hard-coded (12, 12) so the
# model stays consistent if the window sizes are changed.
cropping1d = layers.Cropping1D(cropping=(pre_segment_size, post_segment_size))(concatenate)

# TimeDistributed layer with Dense layer inside
time_distributed_1 = layers.TimeDistributed(layers.Dense(units=16, activation="relu"))(cropping1d)

# Another TimeDistributed layer with Dense layer inside
time_distributed_2 = layers.TimeDistributed(layers.Dense(units=16, activation="relu"))(time_distributed_1)

# Final TimeDistributed layer with Dense layer for output (one value per target step)
time_distributed_3 = layers.TimeDistributed(layers.Dense(units=1, activation="linear"))(time_distributed_2)

# Create the model
model = Model(inputs=input_layer, outputs=time_distributed_3)

model.summary()

# Train on mean absolute error; mean squared error is tracked as an extra metric.
model.compile(
    optimizer=tf.keras.optimizers.SGD(learning_rate=0.01, momentum=0.9),
    loss=tf.keras.losses.MeanAbsoluteError(reduction="sum_over_batch_size"),
    metrics=['mse'],
)

In [None]:
# Each run of this cell logs to its own timestamped TensorBoard directory.
run_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = "logs/" + run_stamp
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

# Number of epochs completed so far; read by the training cell so that
# re-running it continues from this epoch. Re-running THIS cell resets it to 0.
initial_epoch = 0


class EpochCallback(tf.keras.callbacks.Callback):
    """Keras callback that counts finished epochs in the module-level ``initial_epoch``."""

    def on_epoch_end(self, epoch, logs=None):
        """Increment the global epoch counter by one; ``epoch`` and ``logs`` are unused."""
        global initial_epoch
        initial_epoch = initial_epoch + 1

In [None]:
# Number of additional epochs to train each time this cell is run.
train_epochs = 20

# EpochCallback advances the global `initial_epoch` after every finished epoch,
# so re-running this cell continues from where the previous run stopped.
fit_callbacks = [tensorboard_callback, EpochCallback()]

# NOTE(review): batch_size is hard-coded to 128 here while the config cell
# defines batch_size = 256 -- confirm which value is intended.
history = model.fit(
    x=train_x,
    y=train_y,
    batch_size=128,
    epochs=initial_epoch + train_epochs,
    initial_epoch=initial_epoch,
    validation_data=(val_x, val_y),
    callbacks=fit_callbacks,
    shuffle=True,
    verbose=1,
)

In [None]:
import tf2onnx

# Save the trained model in the native Keras format.
keras_model_path = f"{model_dir}/model.keras"
model.save(keras_model_path)

# Export the same model to ONNX. The input signature leaves the batch
# dimension open and fixes the window shape to (segment_size, note_size).
onnx_model_path = f"{model_dir}/model.onnx"
spec = (tf.TensorSpec((None, segment_size, note_size), tf.float32, name="input_1"),)
model_proto, _ = tf2onnx.convert.from_keras(
    model,
    input_signature=spec,
    opset=None,
    output_path=onnx_model_path,
)

In [None]:
import matplotlib.pyplot as plt

def acc_to_percent(acc):
	"""Rescale an accuracy value with ``(acc * 15 + 100) / 1.15``.

	Works on plain numbers and on anything else that supports ``*``, ``+``
	and ``/`` with scalars.
	"""
	scaled = acc * 15 + 100
	return scaled / 1.15

# Hand-picked maps (leaderboard id, display name) used for a visual sanity check.
test_maps = [
	["fc4c11", "Naughty little demon - Easy"],
	["d90a11", "Watch Me Dance - Easy"],
	["16abf91", "The Nights - Ex+"],
	["2450491", "The Girl - Ex+"],
	# ["2c803x91", "ANOMALY - Ex+"],
	["31fbd91", "Sequence Breaker - Ex+"],
	["3963ex92", "heaven - One Saber"],
	# ["3838axxx92", "Lace It - One Saber"],
	# ["35062x92", "Spin Eternally - One Saber"],
]

# Width (in notes) of the moving average applied to both curves before plotting.
smoothing_window = 15

for leaderboard_id, map_name in test_maps:
	_, replay_files = get_leaderboard_id_replays(leaderboard_id)
	if len(replay_files) == 0:
		# Missing replay folder: skip instead of failing inside preprocessing.
		print(f"{map_name} ({leaderboard_id}) - no replay files found, skipping")
		continue

	test_x, test_y = generate_data([(leaderboard_id, replay_files)])
	if len(test_x) == 0:
		# Preprocessing rejected the map; there is nothing to predict or average.
		print(f"{map_name} ({leaderboard_id}) - no usable replay data, skipping")
		continue

	# Convert the model output to NumPy so the averages below are plain floats
	# (formatting a tensor prints its full tf.Tensor(...) representation).
	test_pred = np.asarray(model(test_x))

	# Flatten (segment, note) into one accuracy value per note, in map order.
	real_acc = acc_to_percent(np.asarray(test_y, dtype=np.float64).reshape(-1))
	predicted_acc = acc_to_percent(test_pred[..., 0].astype(np.float64).reshape(-1))

	overall_real_acc = float(real_acc.mean())
	overall_predicted_acc = float(predicted_acc.mean())
	print(f"{map_name} - Real acc: {overall_real_acc} - Predicted acc: {overall_predicted_acc}")

	# Never use a window longer than the map, otherwise mode="valid" would
	# slide the data over the kernel instead of the other way round.
	window = min(smoothing_window, len(real_acc))
	kernel = np.ones(window) / window
	real_acc_smoothed = np.convolve(real_acc, kernel, mode="valid")
	predicted_acc_smoothed = np.convolve(predicted_acc, kernel, mode="valid")

	fig, ax = plt.subplots(figsize=(6, 4))
	ax.set_title(f"{map_name} ({leaderboard_id})")
	ax.plot(real_acc_smoothed, label="Real Acc")
	ax.plot(predicted_acc_smoothed, label="Predicted Acc")
	ax.set_xlabel("Note #")
	ax.set_ylabel("Acc")
	ax.set_xlim(0, max(len(real_acc_smoothed), len(predicted_acc_smoothed)))
	ax.legend()

	fig.tight_layout(pad=1.0, w_pad=0.5, h_pad=0.5)
	plt.show()

In [None]:
# Compare predicted and real average accuracy for every leaderboard in
# test_data and write one CSV row per leaderboard.
predictions = []
for leaderboard_id, leaderboard_replays in test_data:
    # test_data ids are directory paths that include the replay root folder;
    # only the last path component belongs in the leaderboard URL.
    leaderboard_name = pathlib.Path(leaderboard_id).name
    try:
        curr, score = preprocess_leaderboard_replays(leaderboard_replays)
        if len(curr) == 0:
            # Map was rejected during preprocessing; skip it explicitly instead
            # of relying on a division-by-zero further down.
            print(f"{leaderboard_name}: no usable replay data, skipping")
            continue

        _predictions = np.asarray(model.predict(np.array(curr)))

        # Average the first target of every segment, real and predicted alike.
        real_avg = float(np.mean([target[0] for target in score], dtype=np.float64))
        real_percentage_score = (100 + real_avg * 15) / 115

        predicted_avg = float(np.mean(_predictions[:, 0, 0], dtype=np.float64))
        percentage_score = (100 + predicted_avg * 15) / 115

        predictions.append(
            [
                f"https://beatleader.xyz/leaderboard/global/{leaderboard_name}",
                round(percentage_score, 5),
                round(real_percentage_score, 5),
                abs(round(real_percentage_score - percentage_score, 5)),
            ]
        )
    except Exception as e:
        # Best effort: one broken leaderboard must not abort the whole export.
        # KeyboardInterrupt is not an Exception subclass and still propagates.
        print(f"{leaderboard_name}: {e}")
        continue
with open(f"{model_dir}/predictions.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    header = ["LeaderboardId", "Prediction", "Expected", "Difference"]
    writer.writerow(header)
    writer.writerows(predictions)