In [1]:
import os
import csv
import math
import json
import numpy as np
import pandas
import joblib

import sklearn
from sklearn.metrics import mean_squared_error, max_error
from sklearn.svm import SVR
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline, Pipeline
from sklearn.model_selection import train_test_split

from sklearn_pandas import DataFrameMapper

from sklearn2pmml.pipeline import PMMLPipeline
from sklearn2pmml.postprocessing import BusinessDecisionTransformer
from sklearn2pmml import sklearn2pmml

In [2]:
MAP_FOLDER = "data/Ranked"

def getMapData(folder):
    mapDataName = ""
    if "info.dat" in os.listdir(f"{MAP_FOLDER}/{folder}"):
        mapDataName = "info.dat"
    if "Info.dat" in os.listdir(f"{MAP_FOLDER}/{folder}"):
        mapDataName = "Info.dat"
    with open(f"{MAP_FOLDER}/{folder}/{mapDataName}", "r") as f:
        mapData = f.read()
        mapJson = json.loads(mapData)
        return mapJson

def getDifficultyMaps(folder):
    difficultyMaps = []
    mapJson = getMapData(folder)
    for d in mapJson["_difficultyBeatmapSets"][0]["_difficultyBeatmaps"]:
        difficultyMaps.append((d["_beatmapFilename"], d["_difficultyRank"]))
    return difficultyMaps
                
def getDifficultyMapData(folder, file):
    path = f"{MAP_FOLDER}/{folder}/{file}"
    with open(f"{path}", "r") as f:
        mapData = f.read()
        mapJson = json.loads(mapData)
        return mapJson

def getMaps():
    maps = []
    for obj in os.listdir(f"{MAP_FOLDER}"):
        objPath = f"{MAP_FOLDER}/{obj}"
        if os.path.isdir(objPath) and ("info.dat" in os.listdir(objPath) or "Info.dat" in os.listdir(objPath)):
            maps.append(obj)
    return maps

def getNoteDensity(diffMapData, duration):
    notesList = diffMapData["_notes"]
    return len(notesList)/duration

def beatToSec(beat, bpm):
    return 60/bpm * beat

# heuristically, windowLength = 2.75, step = 0.25 is best
def getLocalNoteDensities(diffMapData, duration, bpm, windowLength=2.75, step=0.25):
    densities = []
    beatsPerWindow = bpm/60 * windowLength
    windowLower = 0
    windowUpper = windowLength
    while windowUpper < duration:
        numNotes = 0
        for n in diffMapData["_notes"]:
            noteTime = beatToSec(n["_time"], bpm)
            if windowLower <= noteTime and noteTime <= windowUpper:
                numNotes += 1
        densities.append(numNotes/windowLength)
        windowLower += step
        windowUpper += step
    return densities

def getLocalColumnVariety(diffMapData, duration, bpm, windowLength=2.75, step=0.25):
    variety = []
    beatsPerWindow = bpm/60 * windowLength
    windowLower = 0
    windowUpper = windowLength
    while windowUpper < duration:
        localVariety = np.array([0, 0, 0, 0])
        for n in diffMapData["_notes"]:
            noteTime = beatToSec(n["_time"], bpm)
            noteCol = n["_lineIndex"]
            if windowUpper <= noteTime:
                break
            if windowLower <= noteTime:
                localVariety[noteCol] += 1
            if np.linalg.norm(localVariety, 1) > 0:
                # L1-normalise or normalise for the amount of notes
                normLocalVariety = localVariety / np.linalg.norm(localVariety, 1)
                # maps with higher column variety will have a distribution closer to [.25, .25, .25, .25]
                score = np.linalg.norm(normLocalVariety - np.array([0.25, 0.25, 0.25, 0.25]), 2)
                # higher is better
                variety.append(-1 * score)
        
        windowLower += step
        windowUpper += step
    return variety

In [5]:
# pre-process maps to extract features

featureList = ["BPM", "PeakNoteDensity", "PeakColumnVariety"]
targetFeature = ["Difficulty"]

with open("data/features.csv", "w") as csvFile:
    csvW = csv.writer(csvFile)
    csvHeader = featureList + targetFeature
    csvW.writerow(csvHeader)

    for beatmap in getMaps():
        mapData = getMapData(beatmap)
        songName = mapData["_songName"]
        songDuration = mapData["_songApproximativeDuration"] 
        songBpm = mapData["_beatsPerMinute"]
        
        diffMaps = getDifficultyMaps(beatmap)
        for diffMapObj in diffMaps:
            diffMap = diffMapObj[0]
            diffRank = diffMapObj[1]
            
            diffMapData = getDifficultyMapData(beatmap, diffMap)
            diffMapNoteDensity = getNoteDensity(diffMapData, songDuration)
            maxND = np.max(getLocalNoteDensities(diffMapData, songDuration, songBpm))
            diffMapColVariety = getLocalColumnVariety(diffMapData, songDuration, songBpm)
            diffMapColVarietyTop = np.quantile(diffMapColVariety, 0.75)
            features = [songBpm, maxND, diffMapColVarietyTop]
            row = features + [diffRank]
            csvW.writerow(row)

In [25]:
# split into train and test samples

TEST_PERCENT = 0.2

data = pandas.read_csv("data/features.csv")

X_in = data[featureList]
Y_in = data["Difficulty"]
X_train, X_test, Y_train, Y_test = train_test_split(X_in, Y_in, test_size=TEST_PERCENT, random_state=42)

#Y_train = np.transpose(Y_train).tolist()[0]
#Y_test = np.transpose(Y_test).tolist()[0]

def trainSvrModel(C):
    column_preprocessor = DataFrameMapper([
        (featureList, [StandardScaler()])
    ])

    # since actual difficulty ranks are integer, we take epsilon = 0.5
    table_preprocessor = Pipeline([
        ("svr", SVR(C=C, epsilon=0.5))
    ])

    pmmlPipeline = PMMLPipeline([
        ("columns", column_preprocessor),
        ("table", table_preprocessor),
    ])
    pmmlPipeline.fit(X_train, Y_train)
    pmmlPipeline.verify(X_train.sample(n = 1))
    return pmmlPipeline

In [26]:
# find best C parameter

C = np.linspace(0.01, 50, num=1000)
bestC = C[0]
bestErrRms = float("inf")

for i in range(len(C)):
    pmmlPipeline = trainSvrModel(C[i])

    Y_pred = pmmlPipeline.predict(X_test)
    errRms = mean_squared_error(Y_test, Y_pred, squared=False)
    if errRms < bestErrRms:
        bestC = C[i]
        bestErrRms = errRms

print(f"C = {round(bestC, 2)}, Error: {round(bestErrRms, 3)} RMS")

C = 1.11, Error: 0.636 RMS


In [27]:
# display error of model

pmmlPipeline = trainSvrModel(bestC)

Y_pred = pmmlPipeline.predict(X_test)
errRms = mean_squared_error(Y_test, Y_pred, squared=False)
errMax = max_error(Y_test, Y_pred)
print(f"Total RMS Error: {round(errRms, 3)}, Max Error: {round(errMax, 3)}\n")
 
print("Guess      | Actual | Error")
print("-----------+--------+------")
for i in range(len(Y_pred)): 
    guess = round(Y_pred[i], 3)
    guessInt = int(round(Y_pred[i], 0))
    actual = Y_test.iloc[i]
    err = round(abs(Y_test.iloc[i] - Y_pred[i]), 2)
    print(f"({str(guess).ljust(5)}) {str(guessInt).rjust(2)} |   {str(actual).rjust(2)}   | {err}")

Total RMS Error: 0.636, Max Error: 1.215

Guess      | Actual | Error
-----------+--------+------
(9.15 )  9 |    9   | 0.15
(7.276)  7 |    7   | 0.28
(7.84 )  8 |    8   | 0.16
(8.458)  8 |    9   | 0.54
(8.371)  8 |    8   | 0.37
(5.264)  5 |    5   | 0.26
(5.928)  6 |    6   | 0.07
(7.961)  8 |    7   | 0.96
(7.262)  7 |    7   | 0.26
(8.798)  9 |   10   | 1.2
(7.117)  7 |    8   | 0.88
(7.97 )  8 |    9   | 1.03
(8.002)  8 |    8   | 0.0
(8.008)  8 |    8   | 0.01
(7.243)  7 |    8   | 0.76
(8.106)  8 |    8   | 0.11
(6.848)  7 |    7   | 0.15
(7.462)  7 |    7   | 0.46
(7.541)  8 |    8   | 0.46
(7.971)  8 |    8   | 0.03
(6.825)  7 |    6   | 0.82
(9.31 )  9 |   10   | 0.69
(8.045)  8 |    7   | 1.04
(6.2  )  6 |    6   | 0.2
(6.837)  7 |    6   | 0.84
(6.377)  6 |    6   | 0.38
(6.551)  7 |    6   | 0.55
(7.975)  8 |    8   | 0.02
(8.951)  9 |   10   | 1.05
(6.215)  6 |    5   | 1.22
(5.243)  5 |    6   | 0.76


In [48]:
# Export
sklearn2pmml(pmmlPipeline, "Edda-MLDP.pmml")