In [1]:
import json
from datetime import datetime, timedelta, timezone

import pandas as pd
import requests

from common import data_folder, stats_folder


In [2]:
# Fetch the Penguin Statistics data.

# Endpoints of the Penguin Statistics v2 API read by this notebook.
zone_url = "https://penguin-stats.io/PenguinStats/api/v2/zones"
stage_url = "https://penguin-stats.io/PenguinStats/api/v2/stages"
item_url = "https://penguin-stats.io/PenguinStats/api/v2/items"
# The query string asks for closed zones to be included in the matrix.
matrix_url = "https://penguin-stats.io/PenguinStats/api/v2/result/matrix?show_closed_zones=true"


def get_data(url, path, timeout=30):
    """Return the JSON document for ``url``, using ``path`` as an on-disk cache.

    The cache file is returned when it can be opened and parsed. Otherwise
    the document is downloaded, written to ``path`` and returned.

    Args:
        url: Address of the JSON resource fetched on a cache miss.
        path: Location of the cache file (anything ``open`` accepts).
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON value.

    Raises:
        requests.RequestException: If the download fails or times out, or
            the server answers with an HTTP error status.
    """
    try:
        with open(path, "r", encoding="utf-8") as fp:
            return json.load(fp)
    except (OSError, ValueError):
        # Missing, unreadable or corrupt cache file (JSONDecodeError and
        # UnicodeDecodeError are ValueError subclasses): download instead.
        # Other exception types indicate a programming error and propagate.
        pass

    # A request without a timeout can block the notebook forever.
    with requests.get(url, timeout=timeout) as response:
        # Refuse to cache an HTTP error body as if it were real data.
        response.raise_for_status()
        data = response.json()
    with open(path, "w", encoding="utf-8") as fp:
        json.dump(data, fp, ensure_ascii=False)
    return data

# Load each resource, from its cache file in stats_folder when one is usable.
items_cache_path = stats_folder / "items.json"
item_data = get_data(item_url, items_cache_path)

matrix_cache_path = stats_folder / "matrix.json"
matrix_data = get_data(matrix_url, matrix_cache_path)

stages_cache_path = stats_folder / "stages.json"
stage_data = get_data(stage_url, stages_cache_path)

zones_cache_path = stats_folder / "zones.json"
zone_data = get_data(zone_url, zones_cache_path)


In [3]:
# Index every downloaded record by its identifier for direct lookup.
item_dict = dict((record["itemId"], record) for record in item_data)
stage_dict = dict((record["stageId"], record) for record in stage_data)
zone_dict = dict((record["zoneId"], record) for record in zone_data)


In [4]:
def get_item_type(item_id):
    """Return the ``itemType`` field of the item stored under ``item_id``."""
    item_info = item_dict[item_id]
    return item_info["itemType"]


def get_item_name(item_id):
    """Return the ``name`` field of the item stored under ``item_id``."""
    item_info = item_dict[item_id]
    return item_info["name"]


def get_item_rarity(item_id):
    """Return the ``rarity`` field of the item stored under ``item_id``."""
    item_info = item_dict[item_id]
    return item_info["rarity"]


def get_item_id_by_name(item_name):
    """Return the id of the first item in ``item_dict`` named ``item_name``.

    Raises:
        ValueError: If no item carries that name.
    """
    # A private sentinel distinguishes "no match" from any real item id.
    not_found = object()
    matching_ids = (
        candidate_id
        for candidate_id, candidate_info in item_dict.items()
        if candidate_info["name"] == item_name
    )
    found = next(matching_ids, not_found)
    if found is not_found:
        raise ValueError(f"Item name {item_name} not found.")
    return found


def get_stage_name(stage_id):
    """Return the ``code`` field of the stage stored under ``stage_id``."""
    stage_info = stage_dict[stage_id]
    return stage_info["code"]


def get_stage_open_timestamp(stage_id, server):
    """Return the raw ``openTime`` value recorded for ``stage_id`` on ``server``.

    ``server`` is used as-is as a key of the stage's ``existence`` mapping.
    """
    existence_by_server = stage_dict[stage_id]["existence"]
    return existence_by_server[server]["openTime"]


# UTC offset, in whole hours, applied to each server code by get_timezone().
# The offsets are fixed; no daylight-saving adjustment is made.
TIMEZONE_OFFSET_HOURS_DICT = dict(
    CN=+8,
    US=-5,
    JP=+9,
    KR=+9,
)


def get_timezone(server):
    """Return the fixed-offset ``timezone`` for ``server``.

    The server code is upper-cased before the lookup, so it is matched
    case-insensitively. An unknown code raises ``KeyError``.
    """
    offset_hours = TIMEZONE_OFFSET_HOURS_DICT[server.upper()]
    utc_offset = timedelta(hours=offset_hours)
    return timezone(utc_offset)


def get_stage_type(stage_id):
    """Return the ``stageType`` field of the stage stored under ``stage_id``."""
    stage_info = stage_dict[stage_id]
    return stage_info["stageType"]


def get_stage_zone_id(stage_id):
    """Return the ``zoneId`` field of the stage stored under ``stage_id``."""
    stage_info = stage_dict[stage_id]
    return stage_info["zoneId"]


def get_stage_open_time(stage_id, server):
    """Return the opening time of ``stage_id`` on ``server`` as an aware datetime.

    The stored ``openTime`` is divided by 1000 before being interpreted as
    POSIX seconds, and the result carries the server's fixed-offset timezone.

    Args:
        stage_id: Key of the stage in ``stage_dict``.
        server: Server code, passed to both ``get_stage_open_timestamp``
            and ``get_timezone``.

    Returns:
        A timezone-aware ``datetime`` in the server's timezone.
    """
    epoch_seconds = get_stage_open_timestamp(stage_id, server) / 1000
    # Build the aware datetime directly in the target zone instead of
    # creating a naive local-time value and converting it afterwards, so the
    # result never depends on the machine's local timezone settings.
    return datetime.fromtimestamp(epoch_seconds, tz=get_timezone(server))


def get_stage_ap_cost(stage_id):
    """Return the ``apCost`` field of the stage stored under ``stage_id``."""
    stage_info = stage_dict[stage_id]
    return stage_info["apCost"]


def get_zone_name(zone_id):
    """Return the ``zoneName`` field of the zone stored under ``zone_id``."""
    zone_info = zone_dict[zone_id]
    return zone_info["zoneName"]


In [5]:
# Regroup the sparse drop matrix so that it is stored per stage.

server = "CN"

# Every known stage starts with an empty {item_id: (quantity, times)} mapping.
stage_drop_info = dict((known_stage_id, {}) for known_stage_id in stage_dict)
for element in matrix_data["matrix"]:
    stage_id, item_id = element["stageId"], element["itemId"]
    times, quantity = element["times"], element["quantity"]
    drops_of_stage = stage_drop_info[stage_id]
    # Each (stage, item) pair is expected to appear only once in the matrix.
    assert item_id not in drops_of_stage
    drops_of_stage[item_id] = (quantity, times)


In [6]:
# Activity stages that drop a single material, of rarity 2 ("blue" material).

records = []
for stage_id, drop_info in stage_drop_info.items():
    if get_stage_type(stage_id) != "ACTIVITY":
        continue

    drop_info_filtered = {
        item_id: (quantity, times)
        for item_id, (quantity, times) in drop_info.items()
        if get_item_type(item_id) == "MATERIAL"}
    if len(drop_info_filtered) != 1:
        continue

    (item_id, (quantity, times)), = drop_info_filtered.items()
    # Finish filtering before any other lookup, so that a stage which is
    # skipped anyway cannot abort the run because of missing metadata.
    if get_item_rarity(item_id) != 2:
        continue

    stage_name = get_stage_name(stage_id)
    zone_name = get_zone_name(get_stage_zone_id(stage_id))
    stage_open_time = get_stage_open_time(stage_id, server)
    ap_cost = get_stage_ap_cost(stage_id)
    item_name = get_item_name(item_id)

    # NaN marks a ratio that cannot be computed (zero samples, zero sanity
    # cost or zero recorded drops) instead of raising ZeroDivisionError.
    作战掉落物品数量 = quantity / times if times else float("nan")
    单位理智掉落物品数量 = 作战掉落物品数量 / ap_cost if ap_cost else float("nan")
    单件期望理智 = 1 / 单位理智掉落物品数量 if 单位理智掉落物品数量 else float("nan")

    records.append({
        "作战名称": stage_name,
        "活动名称": zone_name,
        "作战开放时间": stage_open_time,
        "作战理智消耗": ap_cost,
        "作战掉落物品名称": item_name,
        "掉落数": quantity,
        "样本数": times,
        "作战掉落物品数量": 作战掉落物品数量,
        "单位理智掉落物品数量": 单位理智掉落物品数量,
        "单件期望理智": 单件期望理智,
    })

df_T3 = pd.DataFrame.from_records(records)
df_T3.to_csv(data_folder / "掉单一蓝材料的活动.csv")


In [7]:
# Activity stages that drop two materials, both of rarity 1 ("green" materials).

def sort_key(drop_info_item):
    """Return the weighted drop quantity of one ``(item_id, (quantity, times))`` pair.

    The quantity is divided by a per-item weight; items without an entry in
    the weight table use a weight of 1. ``times`` does not affect the result.
    """
    item_id, (quantity, _times) = drop_info_item
    weight_by_item_id = dict([
        ("30012", 15),
        ("30022", 10),
        ("30032", 10),
        ("30042", 8),
        ("30052", 8),
        ("30062", 6),
    ])
    divisor = weight_by_item_id.get(item_id, 1)
    return quantity / divisor


records = []
for stage_id, drop_info in stage_drop_info.items():
    if get_stage_type(stage_id) != "ACTIVITY":
        continue

    drop_info_filtered = {
        item_id: (quantity, times)
        for item_id, (quantity, times) in drop_info.items()
        if get_item_type(item_id) == "MATERIAL"}
    if len(drop_info_filtered) != 2:
        continue

    # Both materials must have rarity 1.
    if any(get_item_rarity(item_id) != 1 for item_id in drop_info_filtered):
        continue

    stage_name = get_stage_name(stage_id)
    zone_name = get_zone_name(get_stage_zone_id(stage_id))
    stage_open_time = get_stage_open_time(stage_id, server)
    ap_cost = get_stage_ap_cost(stage_id)
    # The item with the larger weighted quantity is treated as the main drop
    # (suffix _0), the other one as the by-product (suffix _1).
    (item_id_0, (quantity_0, times_0)), (item_id_1, (quantity_1, times_1)) = sorted(
        drop_info_filtered.items(), key=sort_key, reverse=True)
    item_name_0 = get_item_name(item_id_0)
    item_name_1 = get_item_name(item_id_1)
    assert times_0 == times_1, (
        f"sample counts differ for stage {stage_id}: {times_0} != {times_1}")

    # NaN marks a ratio that cannot be computed (zero samples, zero sanity
    # cost or zero recorded drops) instead of raising ZeroDivisionError.
    单次作战主掉落数量 = quantity_0 / times_0 if times_0 else float("nan")
    单位理智主掉落数量 = 单次作战主掉落数量 / ap_cost if ap_cost else float("nan")
    主掉落单件期望理智 = 1 / 单位理智主掉落数量 if 单位理智主掉落数量 else float("nan")
    单次作战副掉落数量 = quantity_1 / times_1 if times_1 else float("nan")
    单位理智副掉落数量 = 单次作战副掉落数量 / ap_cost if ap_cost else float("nan")
    副掉落单件期望理智 = 1 / 单位理智副掉落数量 if 单位理智副掉落数量 else float("nan")

    records.append({
        "作战名称": stage_name,
        "活动名称": zone_name,
        "作战开放时间": stage_open_time,
        "作战理智消耗": ap_cost,
        "样本数": times_0,
        "主掉落物品名称": item_name_0,
        "主掉落数": quantity_0,
        "单次作战主掉落数量": 单次作战主掉落数量,
        "单位理智主掉落数量": 单位理智主掉落数量,
        "主掉落单件期望理智": 主掉落单件期望理智,
        "副掉落物品名称": item_name_1,
        "副掉落数": quantity_1,
        "单次作战副掉落数量": 单次作战副掉落数量,
        "单位理智副掉落数量": 单位理智副掉落数量,
        "副掉落单件期望理智": 副掉落单件期望理智,
    })

df_T2 = pd.DataFrame.from_records(records)
df_T2.to_csv(data_folder / "掉两种绿材料的活动.csv")


In [8]:
# Activity stages whose material drops are exactly the six "white" materials
# listed below.

# Item id -> label used as the prefix of that item's two output columns.
white_material_labels = {
    "30011": "源岩",
    "30021": "代糖",
    "30031": "酯原料",
    "30041": "异铁碎片",
    "30051": "双酮",
    "30061": "破损装置",
}

records = []
for stage_id, drop_info in stage_drop_info.items():
    if get_stage_type(stage_id) != "ACTIVITY":
        continue

    drop_info_filtered = {
        item_id: (quantity, times)
        for item_id, (quantity, times) in drop_info.items()
        if get_item_type(item_id) == "MATERIAL"}
    # Key-set equality already implies that there are exactly six materials.
    # Filtering happens before any other lookup, so that a stage which is
    # skipped anyway cannot abort the run because of missing metadata.
    if set(drop_info_filtered) != set(white_material_labels):
        continue

    times = drop_info_filtered["30011"][1]
    assert all(times == v[1] for v in drop_info_filtered.values())

    stage_name = get_stage_name(stage_id)
    zone_name = get_zone_name(get_stage_zone_id(stage_id))
    stage_open_time = get_stage_open_time(stage_id, server)
    ap_cost = get_stage_ap_cost(stage_id)

    record = {
        "作战名称": stage_name,
        "活动名称": zone_name,
        "作战开放时间": stage_open_time,
        "作战理智消耗": ap_cost,
        "样本数": times,
    }
    # Dict insertion order keeps the columns grouped per material.
    for item_id, label in white_material_labels.items():
        quantity = drop_info_filtered[item_id][0]
        record[f"{label}掉落数"] = quantity
        # NaN (instead of ZeroDivisionError) when no drop was recorded.
        record[f"{label}单件期望理智"] = (
            (times * ap_cost) / quantity if quantity else float("nan"))
    records.append(record)

df_T1 = pd.DataFrame.from_records(records)
df_T1.to_csv(data_folder / "掉全部白材料的活动.csv")
