In [19]:
# Storage toggle: True upserts each retrieved run into Azure Cosmos DB,
# False keeps the runs in memory so they can be written to a CSV file.
cosmos = False

import asyncio
import requests
import csv
from tqdm.notebook import tqdm_notebook

if cosmos:
    print("Using Cosmos to Store Data")
    import os
    from azure.cosmos import CosmosClient, PartitionKey
    # Read the account endpoint and key from the environment so secrets are
    # never saved inside the notebook. Falls back to the previous empty
    # placeholders when the variables are not set.
    ENDPOINT = os.environ.get("COSMOS_ENDPOINT", "")
    KEY = os.environ.get("COSMOS_KEY", "")
    DATABASE_NAME = "mythic_data"
    CONTAINER_NAME = "runs"
else:
    print("Storing data to CSV")

Storing data to CSV


In [17]:
# Set up helper functions for retrieving data
async def retrieve_data(pages_to_sample, store_to_cosmos=False, cosmos_container=None):
    """Sample raider.io leaderboard pages and build one flattened record per run.

    For each leaderboard page (``page`` values ``0 .. pages_to_sample - 1``)
    every entry under ``"rankings"`` is looked up on the run-details endpoint
    and merged with its summary by ``flatten_data``.

    Args:
        pages_to_sample: Number of leaderboard pages to request.
        store_to_cosmos: When true, each record is upserted into
            ``cosmos_container`` instead of being collected in memory.
        cosmos_container: Object exposing ``upsert_item(record)``. Required
            when ``store_to_cosmos`` is true. Both synchronous and awaitable
            ``upsert_item`` implementations are accepted.

    Returns:
        The list of flattened records. The list is empty when
        ``store_to_cosmos`` is true, because records go to the container.

    Raises:
        ValueError: ``store_to_cosmos`` is true but no container was given.
        requests.HTTPError: An API call returned an error status.
    """
    import inspect

    if store_to_cosmos and cosmos_container is None:
        raise ValueError("cosmos_container is required when store_to_cosmos is True")

    leaderboard_base_uri = "https://raider.io/api/v1/mythic-plus/runs"
    run_details_base_uri = "https://raider.io/api/v1/mythic-plus/run-details"
    season = "season-df-2"
    region = "world"
    dungeon = "all"
    affixes = "all"
    # Without a timeout a stalled connection would hang the notebook forever.
    request_timeout_s = 30

    rankings = []
    for i in tqdm_notebook(range(pages_to_sample)):
        leaderboard_params = {
            "season": season,
            "region": region,
            "dungeon": dungeon,
            "affixes": affixes,
            "page": i
        }
        leaderboard_response = requests.get(
            leaderboard_base_uri, params=leaderboard_params, timeout=request_timeout_s
        )
        # Fail with the HTTP error itself rather than a later KeyError on "rankings".
        leaderboard_response.raise_for_status()
        leaderboard_data = leaderboard_response.json()
        for run_summary in leaderboard_data["rankings"]:
            run_id = run_summary["run"]["keystone_run_id"]
            run_params = {
                "id": run_id,
                "season": season
            }
            run_response = requests.get(
                run_details_base_uri, params=run_params, timeout=request_timeout_s
            )
            run_response.raise_for_status()
            run_details = run_response.json()
            flattened_data = flatten_data(run_summary, run_details)
            if store_to_cosmos:
                # Cosmos DB requires the item "id" to be a string; the API
                # supplies a numeric run id.
                cosmos_item = {**flattened_data, "id": str(flattened_data["id"])}
                upsert_result = cosmos_container.upsert_item(cosmos_item)
                # The synchronous azure.cosmos container returns the stored
                # item directly; only await when an async container is used.
                if inspect.isawaitable(upsert_result):
                    await upsert_result
            else:
                rankings.append(flattened_data)
        # Requests are limited to 300 per minute, so delay each loop slightly.
        await asyncio.sleep(1)
    return rankings

# Flatten Data
def flatten_data(run_summary, run_details):
    """Merge a leaderboard entry and its run details into one flat dict.

    Run-level fields come from ``run_summary``. Each roster member in
    ``run_details["roster"]`` contributes five columns (class, race, level,
    item_level, score) prefixed with ``tank_``, ``healer_`` or ``dps_<n>_``,
    where ``<n>`` counts DPS members from 1 in roster order. The tank entry
    additionally supplies the ``region`` column. Members with any other role
    add nothing.
    """
    run = run_summary["run"]
    flat = {
        "id": run["keystone_run_id"],
        "score": run_summary["score"],
        "dungeon_name": run["dungeon"]["name"],
        "dungeon_num_bosses": run["dungeon"]["num_bosses"],
        "mythic_level": run["mythic_level"],
        "clear_time_ms": run["clear_time_ms"],
        "keystone_time_ms": run["keystone_time_ms"],
        "weekly_modifiers": [modifier["name"] for modifier in run["weekly_modifiers"]],
    }

    next_dps_slot = 1
    for member in run_details["roster"]:
        # Read every per-member field up front, whatever the role turns out to be.
        role = member["role"]
        character = member["character"]
        class_spec = f"{character['class']['name']}-{character['spec']['name']}"
        race = character["race"]["name"]
        level = character["level"]
        item_level = member["items"]["item_level_equipped"]
        score = member["ranks"]["score"]

        if role == "dps":
            prefix = f"dps_{next_dps_slot}"
            next_dps_slot += 1
        elif role in ("tank", "healer"):
            prefix = role
        else:
            continue

        member_columns = {
            f"{prefix}_class": class_spec,
            f"{prefix}_race": race,
            f"{prefix}_level": level,
            f"{prefix}_item_level": item_level,
            f"{prefix}_score": score,
        }
        if role == "tank":
            # Only the tank entry carries the region column.
            member_columns["region"] = character["region"]["name"]
        flat.update(member_columns)
    return flat

def write_data(data, filename):
    """Write a list of flat record dicts to ``filename`` as CSV.

    The header is the union of keys across all records, in first-seen order,
    so records that carry extra columns (for example a different number of
    DPS slots) no longer make ``csv.DictWriter`` raise ``ValueError``. Columns
    a record lacks are left blank.

    Args:
        data: Iterable of dicts, one per CSV row.
        filename: Destination path; an existing file is overwritten.

    When ``data`` is empty the file is created empty and no header is written.
    """
    rows = list(data)
    # dict.fromkeys de-duplicates while preserving first-seen column order.
    headers = list(dict.fromkeys(key for row in rows for key in row))
    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        if not rows:
            return
        writer = csv.DictWriter(csvfile, fieldnames=headers)
        writer.writeheader()
        writer.writerows(rows)

In [None]:
# Cosmos helpers and setup
def create_cosmos_database_and_container(client):
    """Ensure the Cosmos database and container exist and return the container.

    Uses the module-level ``DATABASE_NAME`` and ``CONTAINER_NAME`` and prints
    the ids of the database and container that were found or created.

    Args:
        client: Cosmos client exposing ``create_database_if_not_exists``.

    Returns:
        The container object returned by ``create_container_if_not_exists``.
    """
    database = client.create_database_if_not_exists(id=DATABASE_NAME)
    container = database.create_container_if_not_exists(
        id=CONTAINER_NAME,
        # partition_key is a required argument; every stored record has an
        # "id" field, so partition on it.
        partition_key=PartitionKey(path="/id"),
        offer_throughput=400,
    )
    print("Database:\t", database.id)
    print("Container:\t", container.id)
    return container

def translate_cosmos_to_csv(container, filename):
    """Dump every item returned by ``container.read_all_items`` to a CSV file.

    The header is the union of keys across all items, in first-seen order, so
    items with differing columns do not make ``csv.DictWriter`` raise
    ``ValueError``. Columns an item lacks are left blank.

    Args:
        container: Object exposing ``read_all_items(max_item_count=...)`` that
            yields dict-like items.
        filename: Destination path; an existing file is overwritten.

    When the container yields no items the file is created empty and no
    header is written.
    """
    items = list(container.read_all_items(max_item_count=100))
    print(f"Found {len(items)} runs to translate to CSV")
    # dict.fromkeys de-duplicates while preserving first-seen column order.
    headers = list(dict.fromkeys(key for item in items for key in item))
    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        if not items:
            return
        writer = csv.DictWriter(csvfile, fieldnames=headers)
        writer.writeheader()
        writer.writerows(items)

# CosmosClient, ENDPOINT and KEY only exist when the Cosmos option is enabled,
# so connect only in that case; otherwise this cell breaks "Run All".
if cosmos:
    cosmos_client = CosmosClient(url=ENDPOINT, credential=KEY)
    container = create_cosmos_database_and_container(cosmos_client)
else:
    cosmos_client = None
    container = None

In [18]:
# Retrieve all information about a sample of completed mythic runs.
if cosmos:
    pages = 3
    await retrieve_data(pages, store_to_cosmos=cosmos, cosmos_container=container)
else:
    pages = 3
    cosmos_client = None
    rankings = await retrieve_data(pages)
    write_data(rankings, filename)

  0%|          | 0/5 [00:00<?, ?it/s]

delay
delay
delay
delay
delay


  rankings = await retrieve_data(pages, store_to_cosmos=cosmos)


In [None]:
if cosmos:
    filename = f"data/rankings{pages}.csv"
    # translate_cosmos_to_csv calls read_all_items, which is a method of the
    # container, not of the Cosmos client.
    translate_cosmos_to_csv(container, filename)
else:
    filename = "data/rankings_sample.csv"
    write_data(rankings, filename)