# Osu taiko playstyle analysis

In [None]:
import os
# Look up the OSUAPIv2_APP_CALLBACK environment variable (raises KeyError when
# it is not set); as the last expression of the cell its value is echoed.
os.environ['OSUAPIv2_APP_CALLBACK']

In [None]:
# Osu API
from ossapi import *
import os
# Build the API client from credentials kept in environment variables: the app
# id is converted to int, the secret and the callback are passed as strings.
api = Ossapi(int(os.environ['OSUAPIv2_APP_ID']), os.environ['OSUAPIv2_APP_SECRET'], os.environ['OSUAPIv2_APP_CALLBACK'])
# apiv1 = Ossapi(os.environ['OSUAPIv1_KEY'])

In [None]:
# Fetch beatmap 1921771 through the API client and print the returned object.
liemap = api.beatmap(1921771)
print(liemap)

In [None]:
# lie = api.beatmap_scores(beatmap_id=1921771, mode="taiko")
# print(lie)

# liev1 = apiv1.get_scores(1921771, mode=1, mods=0, limit=100)
# print(liev1[0].replay_id)
# apiv1.get_replay(beatmap_id = 1921771, user = liev1[0].user_id)

def download_rep(replayID, name = None):
    """Download the replay of a taiko score and save it as ``replays/<name>.osr``.

    Args:
        replayID: Score id inserted into the ``/scores/taiko/{id}/download`` URL.
        name: File name without extension; defaults to ``replayID``.

    Nothing is written when the server answers with a non-success status, so an
    error payload never ends up on disk as a corrupt ``.osr`` file. A message
    is printed instead and the function returns, which lets batch downloads
    carry on with the next score.
    """
    if name is None:
        name = replayID
    rep = api.session.get(f"{api.BASE_URL}/scores/taiko/{replayID}/download")
    # NOTE(review): assumes a requests-style response exposing ``ok`` and
    # ``status_code`` (``content`` was already relied upon) - confirm.
    if not rep.ok:
        print(f"Replay {replayID} could not be downloaded (HTTP {rep.status_code}); skipped")
        return
    # The target folder may not exist yet on a fresh checkout.
    os.makedirs("replays", exist_ok = True)
    with open(f"replays/{name}.osr", "wb") as f:
        f.write(rep.content)
        
# Download a single replay by score id; no name is given, so the id itself is
# used as the file name.
download_rep(168691149)

In [None]:
import numpy as np
from osrparse import Replay, KeyTaiko
from matplotlib import pyplot as plt

# Replay analysed in this cell (alternative replays kept below for reference).
rep = Replay.from_path("Gra - fhana - Niji o Ametara (TV Size) [Chromoxx' Inner Oni] (2023-11-17) Taiko.osr")
# rep = parse_replay_file("replays/Edward_Tsui - Hatsune Miku - Talent Shredder [Firce Force] (2021-12-08) Taiko.osr")
# rep = parse_replay_file("replays/Lubei_01 - antiPLUR - Speed of Link [Special] (2021-10-09) Taiko.osr")
BPM = 200

# Milliseconds between consecutive 1/4 notes at this BPM; also used as the
# fast/slow threshold. The second print is the 1/2 note interval.
slowThreshold = 60000 / (BPM * 4)
print(slowThreshold)
print(60000 / (BPM * 2))

# Show which key flag each of the four bit positions stands for.
for i in range(4):
    key = KeyTaiko(1 << i)
    print(f"{i} = {key!s}")
    
def swapLdk(arr):
    """Exchange index 0 and index 1 along both axes of ``arr``.

    The array is modified in place (rows first, then columns) and the very
    same object is returned.
    """
    # Fancy indexing on the right-hand side yields a copy, so the slice
    # assignment cannot read values it has already overwritten.
    arr[:2] = arr[[1, 0]]
    arr[:, :2] = arr[:, [1, 0]]
    return arr

def normKD_old(arr):
    """Divide each column of ``arr`` by the total of its column pair.

    Columns 1 and 2 share one total (dons), columns 0 and 3 the other (kats).
    If exactly one of the two totals is zero, the other total is used for both.
    When either total is still not positive, ``arr`` itself is returned
    untouched; otherwise a scaled copy is returned.
    """
    col_totals = arr.sum(axis = 0)
    don_total = col_totals[1] + col_totals[2]
    kat_total = col_totals[0] + col_totals[3]

    # Borrow the other total when one side has no hits at all.
    if don_total == 0:
        don_total = kat_total
    elif kat_total == 0:
        kat_total = don_total

    if not don_total > 0 or not kat_total > 0:
        return arr

    scaled = np.copy(arr)
    for col, total in ((0, kat_total), (1, don_total), (2, don_total), (3, kat_total)):
        scaled[:, col] /= total
    return scaled

# (row, col) index pairs into a 4x4 matrix, four pairs per group; together the
# four groups cover all 16 cells exactly once.
# Judging by the names and by normKD_old (columns 1/2 summed as dons, 0/3 as
# kats) the groups are presumably don->don, don->kat, kat->kat and kat->don
# transitions - TODO confirm.
dds = np.array([[1,1], [2,1], [1,2], [2,2]])
dks = np.array([[1,0], [2,0], [1,3], [2,3]])
kks = np.array([[0,0], [0,3], [3,0], [3,3]])
kds = np.array([[0,1], [0,2], [3,1], [3,2]])
# Shape (4, 4, 2): group, pair within the group, (row, col).
entries = np.stack([dds, dks, kks, kds], axis = 0)

def take_2d_entries(arr, ix):
    """Pick ``arr[row, col]`` for every (row, col) pair stored along the last axis of ``ix``.

    The result has the shape of ``ix`` without its last axis.
    """
    rows = ix[..., 0]
    cols = ix[..., 1]
    return arr[rows, cols]

def normKD(arr):
    """Scale each index group listed in ``entries`` by that group's own total.

    A group whose total is zero is divided by the smallest positive group
    total instead, which avoids a division by zero. When all totals add up to
    zero, ``arr`` itself is returned unchanged; otherwise the result is a new
    array and the input is left alone.
    """
    rows, cols = entries[..., 0], entries[..., 1]
    # One total per group, kept as a column so it broadcasts over the group.
    group_totals = arr[rows, cols].sum(axis = -1, keepdims = True)

    if np.sum(group_totals) == 0:
        return arr

    smallest_positive = np.min(group_totals[group_totals > 0])
    group_totals = np.where(group_totals == 0, smallest_positive, group_totals)

    scaled = np.copy(arr)
    scaled[rows, cols] /= group_totals
    return scaled

def get_features(rep, slowThreshold):
    """Build key-transition features from a replay.

    Every frame of ``rep.replay_data`` is scanned for keys that were not held
    in the previous frame. For each such hit, one count is added for every
    (key of the previous hit, newly pressed key) combination: into the "fast"
    matrix when the time since the previous hit is at most ``slowThreshold``,
    into the "slow" matrix otherwise. Rows index the previous hit's keys and
    columns the new keys.

    Returns:
        ``(feat, raw)`` where ``raw`` is ``[fast, slow]``, the two 4x4 count
        matrices after ``swapLdk``, and ``feat`` is a float64 array of shape
        (2, 4, 4) holding both matrices after ``normKD`` and division by
        ``max(1, total)``.
    """

    def key_indices(mask):
        # Bit positions (0..3) that are set in the key mask.
        return np.array([bit for bit in range(4) if ((1 << bit) & mask) > 0], np.int8)

    held = KeyTaiko(0)
    elapsed = 0 # Time since previous hit
    last_keys = KeyTaiko(0)

    fast_counts = np.zeros((4,4))
    slow_counts = np.zeros((4,4))

    for frame in rep.replay_data:
        fresh = frame.keys & ~held
        held = frame.keys
        elapsed += frame.time_delta

        # Frames without a new key press only accumulate time.
        if fresh == 0:
            continue

        from_idx, to_idx = np.meshgrid(key_indices(last_keys), key_indices(fresh))
        target = fast_counts if elapsed <= slowThreshold else slow_counts
        target[from_idx.ravel(), to_idx.ravel()] += 1

        last_keys = fresh
        elapsed = 0

    fast_counts = swapLdk(fast_counts)
    slow_counts = swapLdk(slow_counts)
    raw = [fast_counts, slow_counts]

    fast_scaled = normKD(fast_counts)
    slow_scaled = normKD(slow_counts)

    layers = [
        fast_scaled / max(1, np.sum(fast_scaled)),
        slow_scaled / max(1, np.sum(slow_scaled)),
    ]
    feat = np.stack(layers, axis = 0).astype(np.float64)

    return feat, raw

# download_rep(152963984, "152963984")
# rep = parse_replay_file("replays/152963984.osr")
# Extract the features of the replay loaded earlier in this cell and show them.
feat, [fast, slow] = get_features(rep, slowThreshold)
print(feat)

# For each speed class: print the raw count matrix, then plot the matching
# layer of the feature array.
for label, raw_mat, feat_layer in (("fast", fast, feat[0]), ("slow", slow, feat[1])):
    print(f"\n{label}:")
    print(raw_mat)
    plt.matshow(feat_layer)
    plt.show()

In [None]:
from osrparse import parse_replay_data

In [None]:
def GetRanking(count = 100, mode = "taiko", country = None):
    """Collect the users of the first ``count`` performance-ranking entries.

    Pages are requested one after another; the cursor of each response is fed
    into the next request.

    Args:
        count: Number of users wanted.
        mode: Game mode passed to ``api.ranking``.
        country: Optional country filter passed to ``api.ranking``.

    Returns:
        A list of at most ``count`` ``entry.user`` objects in the order the API
        returned them. The list is shorter when the ranking runs out first.
    """
    result = []
    cursor = None

    while len(result) < count:
        r = api.ranking(mode, RankingType.PERFORMANCE, country = country, cursor = cursor)
        result.extend(entry.user for entry in r.ranking)
        cursor = r.cursor
        # An empty page or a missing cursor means there is nothing further to
        # fetch (assumption: the API returns no cursor on the last page - TODO
        # confirm). Without this check the loop would either never finish or
        # start over from the first page and collect duplicates.
        if not r.ranking or cursor is None:
            break

    # A page may overshoot the requested amount; hand back exactly ``count``.
    return result[:count]

In [None]:
# Top 1500 without a country filter plus the top 100 of country "CN", reduced
# to the distinct user ids.
top1K5 = GetRanking(1500)
CN100 = GetRanking(100, country = "CN")
uniqueIDs = list({u.id for u in [*top1K5, *CN100]})

In [None]:
# Echo the collected ids as the cell output.
uniqueIDs

In [None]:
import time

# Collect replays
# User ids of the players to collect replays for. The trailing comment on each
# line is a label for that id (presumably the player's name). Ids that are
# commented out are not part of the list.
TargetPlayers = [
#     1558839, # betairylia
#     11692528,# Lubei
#     6764344, # Nepoch
#     10324309,# Flandre sca
#     8609627, # Edward
#     10507557,# Hynix
#     3383404, # Rin
#     2221895, # Wonberman
#     12208924,# OvO
#     6443264, # acst
#     9603470, # Yusyou
#     4314222, # Kafuu Cirno
#     609536,  # lzx
    
#     2349769, # kk
#     4315477, # sSSS
#     5310623, # Blastix
#     6294200, # te
#     2073644, # Risona
#     810813,  # HM
    6844521, # szh134
    9383908, # shoucan
    4112195, # 7j
    12480076,# Michael
    11341131,# NaNa
    8263525, # smf
    81972,   # Mic
    15917084,# Apricot L
    12703319,# Minato
    12749779,# Lilia Angel
    16903694,# Pinkiemane
    
    8741695, # syaron
    983349,  # applerss
    6170507, # yu68
    5321719, # uone
    165027,  # Peaceful
    1152851, # Bamgoe
    9912966, # Zeth
    9503098, # hoku
    9864847, # Botched
    7955738, # CL
    13302996,# Boaz
    2865172, # Bries
    11117835,# Quass
    9856910, # Gamelan
    8772103, # nuku
    7740442, # Spartric
    6632605, # Megafan
    14005209,# aquachan
    12611862,# diamond
    933630,  # fanhoho
    6498810, # Kerasi
    3867109, # Maou
    5054081, # rai
    12248285,# Storm
    10530606,# Neon
    10694200,# KD
    15541593,# Dau
    1751879, # shiroino
    11936599,# Quibby
    8770622, # ITGT
    1100246, # DefiantJ
    
    6177263, # mp kick Kasumi-sama
]

def GrabFromBP(uid, amount):
    """Download the replays of a user's first ``amount`` "best" taiko scores.

    Prints the user, the selected score list and each score as it goes. Every
    replay is stored under the name "<username> - <score id>" via
    ``download_rep``, followed by a 15 second pause.
    """
    owner_name = api.user(uid).username
    print("Grab BP maps from uid %d (%s)" % (uid, owner_name))

    top_scores = api.user_scores(uid, "best", mode = "taiko")[:amount]
    print(top_scores)

    for score in top_scores:
        print(score)
        target_name = f"{score.user().username} - {score.id}"
        download_rep(score.id, target_name)
        # Pause between downloads.
        time.sleep(15)

In [None]:
# GrabFromBP(4669728, 5)

In [None]:
from tqdm.notebook import tqdm
# GrabFromBP(TargetPlayers[0], 1)

# for uid in tqdm(uniqueIDs):
#     GrabFromBP(uid, 2)

In [None]:
from osrparse import Replay
# import multiprocessing

# mpN = 4
# pool = multiprocessing.Pool(mpN)

# Collect features for the replay files in the "replays/" folder.
# Each element of `features` is (feature array, metadata dict).
features = []
cnt = 0

# def to_feats(file):
#     if ".osr" in file:
#         rep = Replay.from_path(os.path.join("replays/", file))
#         feat, [raw_fast, raw_slow] = get_features(rep, slowThreshold)
#         return (feat, {'Player': rep.username, 'fast': raw_fast, 'slow': raw_slow, 'file': file})
#     return None

# all_files = list(os.listdir("replays/"))
# all_files = all_files[:10]
# features = pool.map(to_feats, all_files)

for file in tqdm(os.listdir("replays/")):
    # Stop once more than 1000 replays have been processed.
    if cnt > 1000:
        break
    # Match on the extension: a plain substring test would also pick up names
    # such as "x.osr.bak", which are not replay files.
    if file.endswith(".osr"):
        cnt += 1
        rep = Replay.from_path(os.path.join("replays/", file))
        feat, [raw_fast, raw_slow] = get_features(rep, slowThreshold)
        features.append((feat, {'Player': rep.username, 'fast': raw_fast, 'slow': raw_slow, 'file': file}))
        
# print(features)

In [None]:
import umap
import sklearn

# Dimensionality reducer used for the embedding: UMAP with its default
# parameters. The PCA line is kept as a commented-out alternative.
reducer = umap.UMAP()
# reducer = sklearn.decomposition.PCA(n_components = 2)

In [None]:
# import pickle
# pickle.dump(features, open("test-features.pkl", "wb"))

In [None]:
# Split the (feature, metadata) tuples: the flattened feature arrays are fed to
# the reducer, the metadata dicts are kept alongside in the same order.
Xs = [feat_arr.flatten() for feat_arr, _ in features]
Ys = [meta for _, meta in features]
emb = reducer.fit_transform(Xs)

In [None]:
import scipy

# https://stackoverflow.com/questions/63812970/scipy-gaussian-kde-matrix-is-not-positive-definite
class GaussianKde(scipy.stats.gaussian_kde):
    """
    Drop-in replacement for gaussian_kde that adds the class attribute EPSILON
    to the covmat eigenvalues, to prevent exceptions due to numerical error.

    Concretely, EPSILON times the identity matrix is added to the data
    covariance before it is inverted and Cholesky-factorised, which shifts
    every eigenvalue up by EPSILON.
    """

    EPSILON = 1e-10  # adjust this at will

    def _compute_covariance(self):
        """Computes the covariance matrix for each Gaussian kernel using
        covariance_factor().

        NOTE(review): this overrides a private method of the scipy base class,
        so the attribute names assigned here have to match what the installed
        scipy version reads - verify after upgrading scipy.
        """
        self.factor = self.covariance_factor()
        # Cache covariance and inverse covariance of the data
        # (computed only on the first call, while '_data_inv_cov' is missing)
        if not hasattr(self, '_data_inv_cov'):
            self._data_covariance = np.atleast_2d(np.cov(self.dataset, rowvar=1,
                                                         bias=False,
                                                         aweights=self.weights))
            # we're going the easy way here
            # (regularise the diagonal so the inverse and the Cholesky factor exist)
            self._data_covariance += self.EPSILON * np.eye(
                len(self._data_covariance))
            self._data_inv_cov = np.linalg.inv(self._data_covariance)

        # Scale the cached data covariance by the squared bandwidth factor.
        self.covariance = self._data_covariance * self.factor**2
        self.inv_cov = self._data_inv_cov / self.factor**2
        # 2 * sum(log(diag(L))) is the log-determinant of 2*pi*covariance.
        L = np.linalg.cholesky(self.covariance * 2 * np.pi)
        # The same value is stored under both attribute names.
        self._norm_factor = 2*np.log(np.diag(L)).sum()  # needed for scipy 1.5.2
        self.log_det = 2*np.log(np.diag(L)).sum()  # changed var name on 1.6.2

# KDE input: the feature vectors transposed, so each replay becomes one column.
# Duplicate columns are removed before the estimator is fitted.
Xs_KDE = np.array(Xs).T
print(Xs_KDE.shape)
distinct_columns = np.unique(Xs_KDE, axis = 1)
kernel = GaussianKde(distinct_columns)
# kernel = scipy.stats.gaussian_kde(np.unique(Xs_KDE, axis = 1))

In [None]:
import matplotlib.pyplot as plt

# Evaluate the fitted kernel at every column of Xs_KDE (duplicates included,
# so there is one value per replay).
KDEresult = kernel(Xs_KDE)

# Scatter the first two embedding coordinates, coloured by the KDE value.
cb = plt.scatter(emb[:, 0], emb[:, 1], c = KDEresult)
plt.colorbar(cb)

# Numerical issues ?

In [None]:
import matplotlib, random

# Colour lookup tables built from matplotlib's named colours:
#   hex_colors_dic  - name -> hex string
#   rgb_colors_dic  - name -> RGB tuple (via matplotlib.colors.to_rgb)
#   hex_colors_only - the hex strings alone, in the same order
# Comprehensions are used so that no loop variable is left behind in the
# notebook namespace (a loop variable called `hex` would shadow the built-in
# for every later cell).
hex_colors_dic = dict(matplotlib.colors.cnames)
hex_colors_only = list(hex_colors_dic.values())
rgb_colors_dic = {
    color_name: matplotlib.colors.to_rgb(hex_code)
    for color_name, hex_code in hex_colors_dic.items()
}

print(hex_colors_only)

In [None]:
from PIL import Image
from matplotlib import cm
import base64
from io import BytesIO

cmap = plt.get_cmap("viridis")

def getBase64(arr):
    """Render ``arr`` through the module-level ``cmap`` and return the PNG as base64 text.

    The values are min-max scaled to [0, 1] before colouring. A matrix with no
    value range (all cells equal, e.g. all zeros) maps to 0 everywhere instead
    of triggering a division by zero.

    Args:
        arr: Numeric array; one pixel per cell - presumably 2-D, as the callers
            pass count matrices (TODO confirm).

    Returns:
        The base64-encoded PNG bytes as a ``str``.
    """
    values = np.asarray(arr, dtype = float)
    low = np.min(values)
    span = np.max(values) - low
    if span > 0:
        # Subtract the minimum as well, so the result really spans 0..1 even
        # when the smallest value is not zero.
        scaled = (values - low) / span
    else:
        scaled = np.zeros_like(values)

    colored = cmap(scaled)
    im = Image.fromarray(np.uint8(colored * 255))

    buffered = BytesIO()
    im.save(buffered, format="PNG")
    return base64.b64encode(buffered.getvalue()).decode("ascii")

In [None]:
# plotly
import plotly.express as px
import pandas as pd

# zlib.crc32 yields the same number in every interpreter session, whereas the
# built-in hash() of a str is salted per process. With hash() a player's colour
# would change each time the notebook kernel restarts.
import zlib

# One row per replay: embedding coordinates, player info, a per-player colour,
# the source file, the KDE value and the two raw matrices as base64 PNGs.
data = {
    'UMAP-x': emb[:, 0], 
    'UMAP-y': emb[:, 1], 
    'Player': [Y['Player'] for Y in Ys],
    'Player-short': [Y['Player'][:5] for Y in Ys],
    'Color': [
        hex_colors_only[zlib.crc32(str(Y['Player']).encode("utf-8")) % len(hex_colors_only)]
        for Y in Ys
    ],
    'file': [Y['file'] for Y in Ys],
    'KDE': list(KDEresult[:len(Ys)]),
#     'fast': [f"\n{Y['fast']}" for Y in Ys],
#     'slow': [f"\n{Y['slow']}" for Y in Ys],
    'fast': [getBase64(Y['fast']) for Y in Ys],
    'slow': [getBase64(Y['slow']) for Y in Ys],
}
df = pd.DataFrame(data)

# fig = px.scatter(df, x = 'UMAP-x', y = 'UMAP-y', hover_name = 'Player', text = 'Player-short', color = 'Player', hover_data = ['fast', 'slow', 'file'])
# fig = px.scatter(df, x = 'UMAP-x', y = 'UMAP-y', hover_name = 'Player', color = 'Player', hover_data = ['fast', 'slow', 'file'])
# fig.update_traces(textposition='top center')
# fig.show()

In [None]:
# Write the table to a CSV file in the working directory (DataFrame.to_csv
# also writes the index column by default).
df.to_csv("test_new_dkTnorm.csv")