In [None]:
# def get_weights_in_window(hist_df_list_seg: List[pd.DataFrame], date_list_seg: List[pd.Timestamp], top_tickers: List[str]) -> pd.DataFrame:
#     assert len(hist_df_list_seg) == len(date_list_seg)
#     # init
#     df_weights = pd.DataFrame(
#         {
#             "date": pd.Series(dtype='datetime64[ns]'),
#             **{f"ticker_{i}": pd.Series(dtype='str') for i in range(len(top_tickers))},
#             **{f"weight_{i}": pd.Series(dtype='float') for i in range(len(top_tickers))},
#         }
#     )
#     # iterate all time in a window 
#     for df, date in zip(hist_df_list_seg, date_list_seg):
#         len_df_weights = len(df_weights)
#         df_weights.at[len_df_weights, "date"] = date
#         # iterate all tickers
#         cap_sum = df["Market Cap"].sum()
#         df["Weight"] = df["Market Cap"] / cap_sum
#         tick_cnt = 0
#         for _, row in df.iterrows():
#             if row["Ticker"] in top_tickers:
#                 df_weights.at[len_df_weights, f"ticker_{tick_cnt}"] = row["Ticker"]
#                 df_weights.at[len_df_weights, f"weight_{tick_cnt}"] = row["Weight"]
#                 tick_cnt += 1
#     return df_weights


In [None]:
class TickerMapping:
    """Translate ticker symbols between the two naming schemes in the dataset.

    Only symbols whose names differ between the schemes are listed; any
    other ticker is returned unchanged by both lookups.
    """

    # "hist" name -> "price" name, and the reverse table.
    hist_to_price_dict = {"SHIB": "1000SHIB"}
    price_to_hist_dict = {"1000SHIB": "SHIB"}

    def hist_to_price(self, ticker: str) -> str:
        """Return the price-side name of ``ticker`` (itself when unmapped)."""
        table = self.hist_to_price_dict
        if ticker in table:
            return table[ticker]
        return ticker

    def price_to_hist(self, ticker: str) -> str:
        """Return the hist-side name of ``ticker`` (itself when unmapped)."""
        table = self.price_to_hist_dict
        if ticker in table:
            return table[ticker]
        return ticker


def load_all_price_data(glob_path: str) -> Dict[str, pd.DataFrame]:
    """Load every ``<TICKER>USDT.xlsx`` workbook matched by ``glob_path``.

    Args:
        glob_path: Glob pattern selecting the Excel files to read.

    Returns:
        A dict mapping each ticker (the file-name prefix before
        ``USDT.xlsx``, passed through ``TickerMapping.price_to_hist``) to the
        DataFrame read from that file. Entries are inserted in sorted path
        order. Files whose name does not follow the ``<TICKER>USDT.xlsx``
        pattern are skipped.
    """
    ticker_mapping = TickerMapping()
    # Raw, escaped pattern matched against the whole file name, so the "."
    # is a literal dot and stray files cannot yield a bogus ticker.
    name_pattern = re.compile(r"(.*)USDT\.xlsx")
    price_data_dict = {}
    # glob.glob returns paths in arbitrary order; sorting keeps the resulting
    # dict order reproducible across runs and machines.
    for xlsx_path in sorted(glob.glob(glob_path)):
        match = name_pattern.fullmatch(os.path.basename(xlsx_path))
        if match is None:
            # Not a price workbook; skip it instead of failing with an
            # IndexError on an empty findall() result.
            continue
        ticker = ticker_mapping.price_to_hist(match.group(1))
        price_data_dict[ticker] = pd.read_excel(xlsx_path)
    return price_data_dict


def get_and_save_extend_describe(df: pd.DataFrame, save_path: str) -> pd.DataFrame:
    """Compute min/max statistics plus their natural logs and save them as CSV.

    Args:
        df: Data to summarise with ``DataFrame.describe``.
        save_path: Destination CSV path. Missing parent directories are
            created; a bare file name (no directory part) is also accepted.

    Returns:
        A DataFrame indexed by ``min``, ``max``, ``log_min`` and ``log_max``
        with one column per described column. The log rows hold ``None`` for
        entries that are not real numbers (for example timestamps).
    """

    def _log_or_none(value):
        # Accept Python and NumPy real numbers alike: Series.apply may hand
        # over plain Python floats, which are not instances of np.float64.
        is_real = isinstance(value, (int, float, np.integer, np.floating))
        if is_real and not isinstance(value, bool):
            return np.log(value)
        return None

    # Copy so the two rows added below extend an independent frame rather
    # than a slice of the describe() result.
    df_describe = df.describe().loc[["min", "max"]].copy()
    df_describe.loc["log_min"] = df_describe.loc["min"].apply(_log_or_none)
    df_describe.loc["log_max"] = df_describe.loc["max"].apply(_log_or_none)

    # os.makedirs("") raises FileNotFoundError, so only create a directory
    # when the path actually contains one.
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    df_describe.to_csv(save_path)
    return df_describe


@dataclass
class NormalizationMapper:
    """Describe how one DataFrame column is transformed by ``Normalizer.run``."""

    # Column of the describe table whose statistics drive the transform.
    describe_col: str
    # Transform name; ``Normalizer.run`` dispatches to ``Normalizer._<method>``.
    method: str


# Forward mapping: the four price columns use plain min-max scaling, while
# volume is scaled in log space.
map_dict_norm = {
    **{
        price_col: NormalizationMapper(describe_col=price_col, method="norm")
        for price_col in ("open", "high", "low", "close")
    },
    "volume": NormalizationMapper(describe_col="volume", method="log_norm"),
}
# Inverse mapping: same columns, each paired with the matching inverse transform.
map_dict_denorm = {
    **{
        price_col: NormalizationMapper(describe_col=price_col, method="denorm")
        for price_col in ("open", "high", "low", "close")
    },
    "volume": NormalizationMapper(describe_col="volume", method="log_denorm"),
}


class Normalizer:
    """Min-max scaling (plain or in log space) driven by a describe table.

    The describe table is indexed column-first, then by statistic:
    ``describe[col]["min"]``, ``["max"]``, ``["log_min"]`` and ``["log_max"]``.
    """

    def __init__(self, describe: pd.DataFrame = None):
        """Store the describe table used by every transform."""
        self.des = describe

    @staticmethod
    def _safe_span(low, high):
        """Return ``high - low``, or 1.0 when the range is exactly zero.

        A constant column has max == min; dividing by that zero range would
        fill the normalized column with NaN/inf. Treating the range as 1
        maps such a column to ``value - min`` instead.
        """
        span = high - low
        return 1.0 if span == 0 else span

    def _norm(self, col: str, ser: pd.Series) -> pd.Series:
        """Scale ``ser`` using the min/max recorded for ``col``."""
        low = self.des[col]["min"]
        high = self.des[col]["max"]
        return (ser - low) / self._safe_span(low, high)

    def _denorm(self, col: str, ser: pd.Series) -> pd.Series:
        """Invert ``_norm``: map scaled values back to the original range."""
        low = self.des[col]["min"]
        high = self.des[col]["max"]
        return ser * (high - low) + low

    def _log_norm(self, col: str, ser: pd.Series) -> pd.Series:
        """Scale ``log(ser)`` using the log_min/log_max recorded for ``col``."""
        low = self.des[col]["log_min"]
        high = self.des[col]["log_max"]
        return (np.log(ser) - low) / self._safe_span(low, high)

    def _log_denorm(self, col: str, ser: pd.Series) -> pd.Series:
        """Invert ``_log_norm``: undo the scaling, then exponentiate."""
        low = self.des[col]["log_min"]
        high = self.des[col]["log_max"]
        return np.exp(ser * (high - low) + low)

    def run(self, df: pd.DataFrame, map_dict: Dict[str, "NormalizationMapper"]) -> pd.DataFrame:
        """Apply the configured transform to each mapped column.

        Args:
            df: Input frame; it is copied, never modified.
            map_dict: Maps a column name in ``df`` to a mapper whose
                ``method`` selects the ``_<method>`` transform and whose
                ``describe_col`` selects the statistics column.

        Returns:
            A copy of ``df`` with every column listed in ``map_dict`` replaced
            by its transformed values; other columns are left as they are.

        Raises:
            AttributeError: If a mapper names a transform that does not exist.
        """
        new_df = df.copy()
        for col, mapper in map_dict.items():
            transform = getattr(self, f"_{mapper.method}")
            new_df[col] = transform(mapper.describe_col, new_df[col])
        return new_df


def merge_price_and_weights(df_price: pd.DataFrame, df_weights: pd.DataFrame, ticker: str) -> pd.DataFrame:
    """Attach one ticker's weight series to its price rows.

    The ``ticker`` column of ``df_weights`` is left-joined onto ``df_price``
    by ``date`` under the name ``weights``. Rows with no matching date take
    the previous known weight, and leading gaps take the next known weight.
    The ``date`` column is moved to the last position of the result.
    """
    ticker_weights = df_weights[["date", ticker]]
    ticker_weights = ticker_weights.rename(columns={ticker: "weights"})
    merged = pd.merge(df_price, ticker_weights, how="left", on="date")
    # Forward-fill first, then back-fill whatever is still missing at the top.
    filled = merged["weights"].ffill()
    merged["weights"] = filled.bfill()
    # Pop-and-reassign moves "date" to the end of the column order.
    merged["date"] = merged.pop("date")
    return merged


# Number of consecutive rows taken from df_weights for each sliding window.
window_size = 13  # 13 weeks (about three months)

In [None]:
# Read every price workbook in the raw dataset folder into a dict keyed by ticker.
price_data_dict = load_all_price_data("/app/dataset/raw/20250928_现货价格数据和历史排名数据/price_data/*.xlsx")

In [None]:
# Debug cell: slide a window of `window_size` rows over df_weights, print the
# first window, then abort.
# NOTE(review): df_weights is not defined in the visible cells; presumably it
# is built earlier in the notebook — confirm before running this cell.
for i in range(len(df_weights) - window_size + 1):
    df_weights_window = df_weights[i : i + window_size]
    print(df_weights_window)
    # A bare `raise` with no active exception raises RuntimeError, so the loop
    # stops after the first window (presumably an intentional debug stop).
    raise

In [None]:
# ticker_mapping = TickerMapping()
# for xlsx_path in sorted(glob.glob("/app/dataset/raw/20250928_现货价格数据和历史排名数据/price_data/*.xlsx")):
#     ticker = re.findall("(.*)USDT.xlsx", os.path.basename(xlsx_path))[0]
#     ticker = ticker_mapping.price_to_hist(ticker)

#     # normalization
#     df = pd.read_excel(xlsx_path)
#     df_describe = get_and_save_extend_describe(df, f"{output_dir}/describe/{ticker}.csv")
#     normalizer = Normalizer(df_describe)
#     df_norm = normalizer.run(df, map_dict_norm)
    
#     # merge weights
#     df = merge_price_and_weights(df_norm, df_weights, ticker)
#     print(df)
#     raise

In [None]:
# Flip the sign of every row whose sum is negative.
# NOTE(review): this fragment is indented as if pasted from inside a function;
# model_output is not defined here and is presumably a 2-D torch tensor — confirm.
        row_sums = model_output.sum(dim=1, keepdim=True)  # sum over dim 1; keepdim keeps that axis
        mask = (row_sums < 0).float()  # 1.0 where the sum is negative, else 0.0
        multiplier = 1 - 2 * mask  # -1 where mask is 1, +1 where mask is 0
        model_output_new = model_output * multiplier  # rows with sum >= 0 are unchanged