In [None]:
from ossapi import Ossapi, Scope, Beatmap, User, Replay, UserCompact,BeatmapUserScore
import os
from typing import List, Dict
from tqdm import tqdm
import time
from config import config

# OAuth application credentials come from the local config helper rather than
# being written into the notebook.
client_id = config.get_api_key('osu_id')
client_secret = config.get_api_key('osu_secret')

# Redirect URI handed to Ossapi together with the credentials.
callback_url = "http://localhost:3914/"

# Scopes requested for the API token.
scopes = [
    Scope.PUBLIC,
    Scope.FRIENDS_READ,
]

# API client used by the cells below.
api = Ossapi(
    client_id=client_id,
    client_secret=client_secret,
    redirect_uri=callback_url,
    scopes=scopes,
)



In [None]:
# Fetch ranked beatmapsets ordered by 'ranked_desc'.
# NOTE(review): mode=0 is presumably osu!standard and only the first page of
# results is requested here - confirm against the ossapi documentation.
search = api.search_beatmapsets(
    mode=0,
    category='ranked',
    force_featured_artists=False,
    force_recommended_difficulty=False,
    sort='ranked_desc',
)

In [None]:
import time
from datetime import datetime, timezone
import random
import requests

def get_maps_in_star_range(sets, min, max, min_playcount=50, max_age_seconds=7 * 24 * 60 * 60):
    """Collect recently ranked difficulties whose star rating lies in [min, max].

    Args:
        sets: Iterable of beatmapset objects exposing ``beatmaps`` and
            ``ranked_date``.
        min: Lowest accepted ``difficulty_rating`` (inclusive).
        max: Highest accepted ``difficulty_rating`` (inclusive).
        min_playcount: Only maps with strictly more plays than this are kept.
        max_age_seconds: Only sets ranked less than this many seconds ago are
            kept (default: seven days).

    Returns:
        A list of the matching beatmap objects, in iteration order.
    """
    # Computed once so every set is compared against the same cutoff.
    cutoff = time.time() - max_age_seconds

    out_maps = []

    for beatmapset in sets:
        ranked_date = beatmapset.ranked_date
        if ranked_date is None:
            # A set without a ranked date cannot satisfy the recency filter.
            continue
        if ranked_date.tzinfo is None:
            # Naive datetimes are interpreted as UTC; aware ones already
            # identify an exact instant and must not be re-labelled.
            ranked_date = ranked_date.replace(tzinfo=timezone.utc)
        if ranked_date.timestamp() <= cutoff:
            continue

        for beatmap in beatmapset.beatmaps or []:
            if min <= beatmap.difficulty_rating <= max and beatmap.playcount > min_playcount:
                out_maps.append(beatmap)

    return out_maps

def get_random_replay_from_scores(scores:List[BeatmapUserScore]):
    """Download the replay of one randomly chosen score and save it to disk.

    Scores whose rank name is one of the S/SS variants (or ``PF``/``PFS``) are
    discarded first; the remaining scores are tried in random order until one
    that advertises a replay is found.

    Args:
        scores: Score objects exposing ``rank``, ``replay`` and ``id``.

    Returns:
        ``(replay, path)`` where ``path`` is ``replays/<score id>.osr``, or
        ``(None, None)`` when no remaining score has a replay. Returning a
        pair in both cases keeps tuple-unpacking callers working and matches
        ``get_random_valid_map``.
    """
    # Note that this drops S ranks as well as SS ranks, not only "ss scores".
    excluded_ranks = {'X', 'XH', 'SS', 'SSH', 'SH', 'S', 'PFS', 'PF'}
    candidates = [score for score in scores if score.rank.name not in excluded_ranks]

    # `candidates` is a fresh list, so shuffling does not touch the caller's data.
    random.shuffle(candidates)
    for score in candidates:
        # NOTE(review): the osu! API documents `replay` as a boolean
        # availability flag, so a truthiness test is used; `is not None` would
        # also accept False. Confirm against the installed ossapi version.
        if not score.replay:
            continue

        replay: Replay = api.download_score_mode(mode='osu', score_id=score.id)
        os.makedirs('replays', exist_ok=True)
        replay_path = f'replays/{score.id}.osr'
        with open(replay_path, 'wb') as f:
            f.write(replay.pack())
        return replay, replay_path

    return None, None

def download_beatmap(beatmapset_id, no_hitsound=False, no_storyboard=False, no_bg=False, no_video=True, timeout=30):
    """Download a beatmapset archive to ``maps/<beatmapset_id>.osz``.

    Args:
        beatmapset_id: Id of the beatmapset to fetch.
        no_hitsound: Sent to the mirror as the ``nh`` query parameter.
        no_storyboard: Sent to the mirror as the ``nsb`` query parameter.
        no_bg: Sent to the mirror as the ``nb`` query parameter.
        no_video: Sent to the mirror as the ``nv`` query parameter.
        timeout: Timeout in seconds passed to ``requests.get`` so a stalled
            connection cannot hang the notebook indefinitely.

    Returns:
        The path of the ``.osz`` file, or ``None`` when the request fails or
        the server answers with a status other than 200.
    """
    target = f'maps/{beatmapset_id}.osz'

    # Check for the archive this function actually writes; testing the path
    # without the ".osz" suffix never matched a previous download.
    if os.path.exists(target):
        return target

    base_url = "https://api.nerinyan.moe/d"
    params = {
        "nh": str(no_hitsound).lower(),
        "nsb": str(no_storyboard).lower(),
        "nb": str(no_bg).lower(),
        "nv": str(no_video).lower()
    }
    url = f"{base_url}/{beatmapset_id}"

    # Download into a temporary name so an interrupted transfer is never
    # mistaken for a complete archive by the cache check above.
    partial = f'{target}.part'
    try:
        with requests.get(url, params=params, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                return None

            os.makedirs('maps', exist_ok=True)
            with open(partial, "wb") as f:
                # Stream in chunks instead of buffering the whole archive.
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)

        os.replace(partial, target)
        return target

    except requests.exceptions.RequestException as e:
        print(f"An error occurred while downloading the beatmap: {e}")
        return None
    finally:
        # After a successful os.replace the partial file no longer exists.
        if os.path.exists(partial):
            os.remove(partial)


def get_random_valid_map(maps):
    """Pick a random beatmap whose beatmapset can be downloaded.

    Args:
        maps: Beatmap objects exposing ``beatmapset_id``.

    Returns:
        ``(path, beatmap)`` for the first randomly ordered candidate for which
        ``download_beatmap`` returns a path, or ``(None, None)`` if none does.
    """
    # Shuffle a copy: shuffling `maps` itself would silently reorder the
    # caller's list on every call.
    candidates = list(maps)
    random.shuffle(candidates)

    for beatmap in candidates:
        path = download_beatmap(beatmap.beatmapset_id)
        if path is not None:
            return path, beatmap

    return None, None

import subprocess
import json
import time

def run_command(command, exit_timeout=2):
    """Run ``command``, discard its stdout, and return its exit code.

    Args:
        command: Program and arguments in any form ``subprocess.Popen`` accepts.
        exit_timeout: Seconds to wait for the process to exit once its stdout
            has reached end-of-file.

    Returns:
        The process return code, or ``None`` if the process has not exited
        within ``exit_timeout`` seconds after closing its stdout.
    """
    process = subprocess.Popen(command, stdout=subprocess.PIPE)

    # The pipe is binary, so end-of-file is b'' (never the str ''). Iterating
    # the stream stops at EOF and keeps the pipe drained so the child cannot
    # block on a full buffer.
    with process.stdout:
        for _line in process.stdout:
            pass

    # Wait (bounded) for the exit status instead of busy-polling.
    try:
        return process.wait(timeout=exit_timeout)
    except subprocess.TimeoutExpired:
        return None



# Candidate difficulties: star rating between 4.5 and 10 from the search results.
maps = get_maps_in_star_range(search.beatmapsets, min=4.5, max=10)


In [None]:
import json
replay_time = 30  # clip length added to `start` to obtain `end` (presumably seconds)
attempts_per_map = 6  # the loop runs len(maps) * attempts_per_map times

# Each iteration picks a random downloadable map and a random replay on it,
# writes a small metadata JSON file and asks danser to record a clip.
for i in tqdm(range(len(maps) * attempts_per_map)):
    try:
        path, map = get_random_valid_map(maps)
        if map is None:
            print("Skipping: no beatmap could be downloaded")
            continue

        scores = api.beatmap_scores(map.id)
        picked = get_random_replay_from_scores(scores.scores)
        # "Nothing found" may be signalled by None or by a pair of Nones;
        # check before unpacking instead of relying on a caught TypeError.
        if picked is None or picked[0] is None:
            print(f"Skipping: no downloadable replay for beatmap {map.id}")
            continue
        good_replays, osr = picked
        player_rank = good_replays.user.expand().rank_history.data[-1]

        map_len = good_replays.beatmap.total_length

        start = random.randint(0, max(map_len - replay_time, 5))
        end = start + replay_time

        # Playback speed/pitch derived from the replay's mods. A single
        # if/elif chain is required: with separate `if`s the final `else`
        # reset HT and DT replays back to speed 1.
        # NOTE(review): assumes danser does not already apply the replay's
        # mod speed on top of -speed - confirm against danser's CLI docs.
        mods = str(good_replays.mods)
        speed, pitch = 1, 1
        if 'NC' in mods:
            speed, pitch = 1.5, 1.25
        elif 'DT' in mods:
            speed = 1.5
        elif 'HT' in mods:
            speed = 0.75

        # Argument list instead of one string, so values containing spaces
        # cannot be split into separate arguments.
        cmd = [
            'danser/danser-cli.exe',
            '-quickstart',
            '-settings=Replay_watcher',
            '-record',
            f'-r={osr}',
            f'-out={player_rank}',
            '-skip',
            f'-start={start}',
            f'-end={end}',
            f'-speed={speed}',
            f'-pitch={pitch}',
        ]

        metadata = {
            "map_id": good_replays.beatmap.id,
            "player_id": good_replays.user.id,
        }

        # NOTE(review): the executable is launched from 'danser/' while the
        # metadata is written under 'danser-0.9.1-win/videos/' - verify that
        # both refer to the same installation.
        with open(f'danser-0.9.1-win/videos/{player_rank}.json', 'w') as f:
            json.dump(metadata, f, indent=4)

        run_command(cmd)
    except Exception as e:
        # Best-effort batch job: report the failure and move on to the next attempt.
        print(f"An error occurred: {e}")
    