### 0. Create a Trackmania server account

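Use [this page on trackmania.com](https://www.trackmania.com/player/dedicated-servers) to generate a Trackmania dedicated server account; its login and password are what this notebook uses to authenticate against the Trackmania API. Then create a `.env` file in the project root containing: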

```
TRACKMANIA_APP_ID=
TRACKMANIA_LOGIN=
TRACKMANIA_PASSWORD=
```

You will also need to run `pip install -r requirements.txt` before proceeding.

In [None]:
from dotenv import load_dotenv
load_dotenv()

import os
class env:
    """Trackmania settings read from the process environment.

    The values are looked up eagerly with ``os.environ[...]`` while the class
    body runs, so a missing variable raises ``KeyError`` right here.
    """
    TRACKMANIA_APP_ID, TRACKMANIA_LOGIN, TRACKMANIA_PASSWORD = (
        os.environ[variable]
        for variable in ('TRACKMANIA_APP_ID', 'TRACKMANIA_LOGIN', 'TRACKMANIA_PASSWORD')
    )

### 1. Set up local database

In [None]:
from sqlalchemy import create_engine, Engine, ForeignKey

# One Engine per ``echo`` value. An Engine owns a connection pool and is meant
# to be created once and shared, not rebuilt for every Session.
_ENGINE_CACHE = {}


def get_engine(echo: bool = True) -> Engine:
    """Return the SQLAlchemy engine for the local ``sql.db`` SQLite file.

    Args:
        echo: Passed to ``create_engine``; when true SQLAlchemy logs the SQL
            it emits.

    Returns:
        The engine for ``sqlite:///sql.db``. Repeated calls with the same
        ``echo`` value return the same Engine object instead of building a
        new engine (and connection pool) each time.
    """
    if echo not in _ENGINE_CACHE:
        _ENGINE_CACHE[echo] = create_engine('sqlite:///sql.db', echo=echo)
    return _ENGINE_CACHE[echo]


from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    """Declarative base class that every ORM model in this notebook inherits from."""

class Track(Base):
    """ORM model for one row of the ``tracks`` table."""

    __tablename__ = 'tracks'

    # String primary key.
    id: Mapped[str] = mapped_column(primary_key=True)
    # Track name; may not be NULL.
    name: Mapped[str] = mapped_column(nullable=False)

    def __repr__(self) -> str:
        """Return a short debugging label containing the name and id."""
        return f"<<Track {self.name} ({self.id})>>"

class Player(Base):
    """ORM model for one row of the ``players`` table."""

    __tablename__ = 'players'

    # String primary key.
    id: Mapped[str] = mapped_column(primary_key=True)
    # Player name; may not be NULL.
    name: Mapped[str] = mapped_column(nullable=False)
    # Name of the player's team: a plain string column referencing teams.name,
    # not a relationship to a Team object.
    team: Mapped[str] = mapped_column(ForeignKey('teams.name'))

    def __repr__(self) -> str:
        """Return a short debugging label with the name, id and team name."""
        # ``team`` already holds the team's name (a str), so it is used
        # directly; ``self.team.name`` would raise AttributeError.
        return f"<<Player {self.name} ({self.id}) from {self.team}>>"
    
class Record(Base):
    """ORM model for one row of the ``records`` table."""

    __tablename__ = 'records'

    # Auto-incrementing integer primary key.
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    # Recorded time; may not be NULL.
    time: Mapped[float] = mapped_column(nullable=False)
    # Foreign keys, stored as the referenced rows' string ids.
    track: Mapped[str] = mapped_column(ForeignKey('tracks.id'))
    player: Mapped[str] = mapped_column(ForeignKey('players.id'))

    def __repr__(self) -> str:
        """Return a short debugging label with the id, time, player and track."""
        return f"<<Record {self.id}: {self.time} by {self.player} on {self.track}>>"

class Team(Base):
    """ORM model for one row of the ``teams`` table.

    Two ``Team`` objects compare equal, and hash alike, when they have the
    same ``name``, so they can be de-duplicated in a ``set``.
    """

    __tablename__ = 'teams'

    # The team name is the primary key.
    name: Mapped[str] = mapped_column(primary_key=True)
    ranking: Mapped[int] = mapped_column(default=0)

    def __repr__(self) -> str:
        """Return a short debugging label with the name and ranking."""
        return f"<<Team {self.name} ({self.ranking})>>"

    def __hash__(self) -> int:
        # Hash only what __eq__ compares. Hashing the repr also mixed in the
        # mutable ``ranking``, so equal teams could hash differently.
        return hash(self.name)

    def __eq__(self, other: object) -> bool:
        # Let Python fall back to its default handling for non-Team operands
        # instead of failing with AttributeError on ``other.name``.
        if not isinstance(other, Team):
            return NotImplemented
        return self.name == other.name


# Create every table declared on Base in the local database.
engine = get_engine(echo=False)
# Uncomment the next line to drop all tables first.
# Base.metadata.drop_all(engine)
Base.metadata.create_all(bind=engine)

### 2. Load maps and players

In [None]:
from sqlalchemy.orm.session import Session

# Track display name -> value stored as Track.id.
PIL_MAP_LIST = {
    'S-Tier': '4mZXDJgJSg2iSvx7BqyfgWhgYWg',
    'Telebanco': 'cXLNXd1sw_at9WZAse4I2jl3cAe',
    'Glacial Lure': 'dwzn5CGhBeSHCIiWFgLQjeoBrYf',
}

# ``track_id`` / ``track_name`` avoid shadowing the builtin ``id``.
tracks = [
    Track(id=track_id, name=track_name)
    for track_name, track_id in PIL_MAP_LIST.items()
]
with Session(get_engine(echo=False)) as session:
    # merge() inserts a row when the primary key is new and updates it
    # otherwise, so this cell can be re-run without failing on duplicate
    # primary keys the way add_all() does.
    for track in tracks:
        session.merge(track)
    session.commit()

from sqlalchemy import text as RAW_SQL

# Sanity check: print how many rows the tracks table holds.
stmt = RAW_SQL('SELECT COUNT(*) FROM tracks;')
with Session(get_engine(echo=False)) as session:
    res = session.execute(stmt).scalar_one()
print(res)

In [None]:
import csv

players = []
teams = set()

# Every non-empty row is treated as data in the order team, username,
# account id (no header row is skipped).
# newline='' is how the csv module expects its input file to be opened.
with open('teams_cleaned.csv', 'r', newline='') as f:
    for row in csv.reader(f):
        if not row:
            # A blank line yields an empty list, which cannot be unpacked.
            continue
        team, username, account_id = row
        teams.add(Team(name=team))  # the set collapses repeated team names
        players.append(Player(id=account_id, name=username, team=team))

with Session(get_engine(echo=False)) as session:
    # Teams go first, matching the original insertion order. merge() upserts
    # by primary key, so re-running the cell does not fail on rows that are
    # already present.
    for entity in [*teams, *players]:
        session.merge(entity)
    session.commit()

from sqlalchemy import text as RAW_SQL

# Sanity check: print how many rows the players table holds.
stmt = RAW_SQL('SELECT COUNT(*) FROM players;')
with Session(get_engine(echo=False)) as session:
    res = session.execute(stmt).scalar_one()
print(res)

### 3. Load records from Trackmania API

In [None]:
import requests
from requests.auth import HTTPBasicAuth

AUTH_BASE_URL = 'https://prod.trackmania.core.nadeo.online/v2/authentication/token'

ACCESS_TOKEN, REFRESH_TOKEN = None, None
# Audience name -> refresh token last issued for it. A refresh token is only
# reused for the audience it was requested for.
_REFRESH_TOKENS = {}


def nadeo_authenticate(audience: str = '') -> None:
    """Obtain Nadeo API tokens and store them in the module-level globals.

    The first call for an audience logs in with HTTP basic auth using
    ``env.TRACKMANIA_LOGIN`` / ``env.TRACKMANIA_PASSWORD``. Later calls for
    the same audience exchange the stored refresh token instead.

    Args:
        audience: ``''``, ``'club'`` or ``'live'``; it is capitalised and
            inserted into ``Nadeo<Audience>Services``.

    Raises:
        requests.HTTPError: If the authentication endpoint returns an error
            status.

    Side effects:
        Sets ``ACCESS_TOKEN`` and ``REFRESH_TOKEN`` to the tokens received.
    """
    global ACCESS_TOKEN, REFRESH_TOKEN
    audience_name = f"Nadeo{audience.capitalize()}Services"
    refresh_token = _REFRESH_TOKENS.get(audience_name)
    if refresh_token is None:
        # The audience is sent as a JSON body (json= also sets the
        # Content-Type header); data= would form-encode it instead.
        resp = requests.post(
            url=f"{AUTH_BASE_URL}/basic",
            json={'audience': audience_name},
            auth=HTTPBasicAuth(env.TRACKMANIA_LOGIN, env.TRACKMANIA_PASSWORD),
            timeout=30,  # do not hang forever on a stalled connection
        )
    else:
        resp = requests.post(
            url=f"{AUTH_BASE_URL}/refresh",
            headers={'Authorization': f"nadeo_v1 t={refresh_token}"},
            timeout=30,
        )
    resp.raise_for_status()
    tokens = resp.json()
    ACCESS_TOKEN, REFRESH_TOKEN = tokens['accessToken'], tokens['refreshToken']
    _REFRESH_TOKENS[audience_name] = REFRESH_TOKEN

In [None]:
from sqlalchemy import text as RAW_SQL
with Session(get_engine(echo=False)) as session:
    stmt = RAW_SQL('SELECT id FROM tracks;')
    # scalars() returns a lazy, single-pass result bound to the session's
    # connection. Materialise it into a list while the session is still open
    # so the ids remain usable (and re-iterable) after the block exits.
    track_ids = session.execute(stmt).scalars().all()


import requests
nadeo_authenticate(audience='live')
headers = {'Authorization':f"nadeo_v1 t={ACCESS_TOKEN}"}
GROUP_UID = 'Personal_Best'

# Download each track's leaderboard page by page and turn every entry into a
# Record (not yet saved to the database).
track_records = []
length = 100  # leaderboard entries requested per page
for track_uid in track_ids:
    offset = 0
    while True:
        url = f"https://live-services.trackmania.nadeo.live/api/token/leaderboard/group/{GROUP_UID}/map/{track_uid}/top"
        resp = requests.get(
            url=url,
            headers=headers,
            params={'onlyWorld': 'true', 'length': length, 'offset': offset},
            timeout=30,  # do not hang forever on a stalled connection
        )
        # Fail with the HTTP error itself rather than a KeyError on an
        # unexpected error payload.
        resp.raise_for_status()
        tops = resp.json()['tops']
        entries = tops[0]['top'] if tops else []
        if not entries:
            # An empty page marks the end of this track's leaderboard.
            break
        # score / 1000 assumes the API reports milliseconds - TODO confirm.
        track_records.extend(
            Record(time=entry['score'] / 1000, track=track_uid, player=entry['accountId'])
            for entry in entries
        )
        # NOTE(review): the endpoint may cap how deep ``offset`` can go;
        # confirm what it returns past that point.
        offset += length


# Save every downloaded record; session.begin() commits when the block exits
# without an error.
with Session(get_engine(echo=False)) as session, session.begin():
    session.add_all(track_records)

from sqlalchemy import text as RAW_SQL

# Sanity check: print how many rows the records table holds.
stmt = RAW_SQL('SELECT COUNT(*) FROM records;')
with Session(get_engine(echo=False)) as session:
    res = session.execute(stmt).scalar_one()
print(res)

### 4. Determine team rankings based on players' ranks on each map

In [None]:
# Team ranking query.
#   times          - every record whose player belongs to a team, with the
#                    player's rank inside the team and overall, per track.
#   reduced_times  - only each team's two best-ranked players per track.
#   team_averages  - average time per team and track.
# The final SELECT lists the reduced rows with the team average and the
# team's rank on that track (ordered by average time).
#
# The within-team rank breaks ties on the player name. The previous tie-break,
# t.name, is constant inside a partition that is itself keyed on t.name, so
# players with equal times shared a rank and a team could contribute more
# than two rows.
#
# NOTE(review): team_averages averages over ``times`` (all of a team's
# records), not over ``reduced_times`` (its top two). Confirm which is intended.
sql = """
WITH
times AS (
    SELECT *
    FROM (
        SELECT
            p.name AS player_name
            ,t.name AS team_name
            ,k.name AS track_name
            ,r.time
            ,DENSE_RANK() OVER (PARTITION BY t.name, k.name ORDER BY r.time, p.name) AS player_track_rank_within_team
            ,DENSE_RANK() OVER (PARTITION BY k.name ORDER BY r.time, p.name) AS player_track_rank_overall
        FROM records AS r
        INNER JOIN tracks AS k
            ON k.id = r.track
        LEFT JOIN players AS p
            ON r.player = p.id
        LEFT JOIN teams AS t
            ON p.team = t.name
        ORDER BY
            k.name
            ,t.name
            ,player_track_rank_within_team
    )
    WHERE team_name IS NOT NULL
)
,reduced_times AS (
    SELECT
        track_name
        ,team_name
        ,player_name
        ,time
        ,player_track_rank_overall
    FROM times
    WHERE player_track_rank_within_team <= 2
)
,team_averages AS (
    SELECT
        track_name
        ,team_name
        ,AVG(time) AS avg_time
    FROM times
    GROUP BY
        track_name
        ,team_name
)
SELECT
    r.track_name
    ,r.team_name
    ,r.player_name
    ,r.time
    ,a.avg_time
    ,DENSE_RANK() OVER (PARTITION BY r.track_name ORDER BY a.avg_time, r.team_name) AS team_track_rank
    ,r.player_track_rank_overall
FROM reduced_times AS r
INNER JOIN team_averages AS a
    ON r.track_name = a.track_name
    AND r.team_name = a.team_name
ORDER BY
    r.track_name
    ,team_track_rank
    ,r.player_track_rank_overall
"""

from sqlalchemy import text as RAW_SQL

# Run the ranking query, keep all rows in ``res`` and print them one per line.
stmt = RAW_SQL(sql)
with Session(get_engine(echo=False)) as session:
    res = session.execute(stmt).all()
for row in res:
    print(row)

### 5. Save results to local CSV for further analysis

In [None]:
import csv
# Write the query rows in ``res`` to results.csv, one CSV line per row and no
# header row.
# newline='' is how the csv module expects its output file to be opened, so
# the writer's lineterminator is written as-is on every platform.
# encoding='utf-8' keeps non-ASCII text from failing on platforms whose
# default encoding cannot represent it.
with open('results.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f, lineterminator='\n')
    writer.writerows(res)