https://www.kaggle.com/code/ikeppyo/jpx-lightgbm-demo をfork。

# コンテストの概要
* 日本取引所グループ（JPX）による、2000銘柄の証券を対象にした、**証券の値段（終値）の変化率**予測。
* **翌日から翌々日にかけての終値**の変化率が目的変数。
* 提出データは、目的変数の値そのものではなく、目的変数の値を降順に並べた際の順位。
* コンテストについて、より詳しく知りたい場合は[【日本語ver】Easy to understand the competition](https://www.kaggle.com/code/chumajin/ver-easy-to-understand-the-competition)が非常に役に立つと思います。

# ノートブックの概要
* このノートブックでは[データ読み込み]→[データ統合]→[特徴量エンジニアリング]→[学習]→[推論・評価]→[提出]を一気通貫で行います。
* 使用するモデルはLightGBMです。
* モデルを3つ生成し、結果をアンサンブルして最終的な推論結果を作成します。

推論・評価までの流れは以下の通りです。  
赤背景は推論・評価推時のみ使用している関数です。青背景は提出時にも使用している関数・データとなります。  
青背景の関数をカスタマイズすることで、様々な特徴量で精度検証ができ、また、そのままSubmitもできるようになっています。

In [None]:
# Image('../input/jpx-images/jpx_flow.drawio.png')

# 入力ファイルの概要
* `jpx-tokyo-stock-exchange-prediction/`
    * `stock_list.csv`:  主な項目は証券コードと証券に関する属性情報。Keyは**SecuritiesCode**。
    * `train_files/`:  学習用データ群。Dateの範囲は2017-01-04 ～ 2021-12-03。
        * `stock_prices.csv`:  主な項目は証券コードと日単位の証券価格、および、目的変数（Target）。Keyは**Date**と**SecuritiesCode**であり、これらを連結した**RowId**も用意されている。
        * `secondary_stock_prices.csv`:  項目は**stock_prices.csv**と同じだが、**stock_prices.csv**の対象とならなかった証券が入っている。Keyは**stock_prices.csv**と同じ。
        * `options.csv`:  主な項目はオプション（証券用語）コードと日単位のオプション価格。Keyは**Date**と**OptionsCode**であり、これらを連結した**DateCode**も用意されている。
        * `trades.csv`:  主な項目はマーケット毎の前営業週における取引サマリ。Keyは**Date**と**Section**。**Section**を加工することで**stock_list.csv**の**NewMarketSegment**と紐づけることができる。意味のあるレコードの発生は週次。
        * `financials.csv`:  主な項目は四半期決算報告の内容。Keyは**Date**と**SecuritiesCode**であり、これらを連結した**DateCode**も用意されている。レコードの発生は四半期毎。
    * `supplemental_files/`:  追加の学習用データ群。5月上旬、6月上旬、およびコンテスト終了直前に最新のデータが反映される。2022-04-05時点でのDateの範囲は2021-12-06 ～ 2022-02-28。
        * **train_files**配下と同じ形式のファイル群が格納されている
        * `example_test_files/`:  評価用データ群（のサンプル）
        * **train_files**配下とほぼ（※）同じ形式のファイル群が格納されている
        * ※ **stock_prices.csv**と**secondary_stock_prices.csv**から**Target**が削除されている点だけが**train_files**配下と異なる
    * `data_specifications/`:  上記ファイル群の項目説明書
        * `stock_list_spec.csv`:  **stock_list.csv**の項目説明書
        * `stock_prices_spec.csv`:  **stock_prices.csv**、**secondary_stock_prices.csv**の項目説明書
        * `options_spec.csv`:  **options.csv**の項目説明書
        * `trades_spec.csv`:  **trades.csv**の項目説明書
        * `stock_fin_spec.csv`:  **financials.csv**の項目説明書
    * `jpx_tokyo_market_prediction/`:時系列API
    
ファイル間の関係性は以下のようなイメージです

In [None]:
# Image('../input/jpx-images/jpx_files.drawio.png')

データの詳細は https://www.kaggle.com/competitions/jpx-tokyo-stock-exchange-prediction/data を参照  
時系列APIの使い方は https://www.kaggle.com/competitions/jpx-tokyo-stock-exchange-prediction/overview/evaluation を参照  
オプションコード体系の日本語版は https://www.jpx.co.jp/sicc/securities-code/nlsgeu0000032d48-att/(HP)sakimono20220208.pdf を参照  
証券コード関係は https://www.jpx.co.jp/sicc/securities-code/01.html を参照

# 準備
* 使用するライブラリをインポートします

In [None]:
import os
import gc
from pathlib import Path
from decimal import ROUND_HALF_UP, Decimal

import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error

import matplotlib.pyplot as plt
import seaborn as sns

import warnings
warnings.simplefilter('ignore')

pd.set_option('display.max_columns', 100)
pd.set_option('display.max_rows', 100)

In [None]:
def reduce_mem_usage(df, verbose=True):
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    start_mem = df.memory_usage().sum() / 1024**2    
    for col in df.columns: #columns毎に処理
        col_type = df[col].dtypes
        if col_type in numerics: #numericsのデータ型の範囲内のときに処理を実行. データの最大最小値を元にデータ型を効率的なものに変更
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if (c_min > np.iinfo(np.int8).min) & (c_max < np.iinfo(np.int8).max):
                    df[col] = df[col].astype(np.int8)
                elif (c_min > np.iinfo(np.int16).min) & (c_max < np.iinfo(np.int16).max):
                    df[col] = df[col].astype(np.int16)
                elif (c_min > np.iinfo(np.int32).min) & (c_max < np.iinfo(np.int32).max):
                    df[col] = df[col].astype(np.int32)
                elif (c_min > np.iinfo(np.int64).min) & (c_max < np.iinfo(np.int64).max):
                    df[col] = df[col].astype(np.int64)  
            else:
                if (c_min > np.finfo(np.float16).min) & (c_max < np.finfo(np.float16).max):
                    df[col] = df[col].astype(np.float16)
                elif (c_min > np.finfo(np.float32).min) & (c_max < np.finfo(np.float32).max):
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)    
    end_mem = df.memory_usage().sum() / 1024**2
    if verbose: print('Mem. usage decreased to {:5.2f} Mb ({:.1f}% reduction)'.format(end_mem, 100 * (start_mem - end_mem) / start_mem))
    return df

# データ読み込み

以下のファイル群を読み込みます。
* stock_list.csv
* train_files 配下のファイル（Dateの範囲は2017-01-04 ～ 2021-12-03）
* supplemental_files 配下のファイル（Dateの範囲は2021-12-06 ～ 2022-02-28 ※2022/04/05現在）

## Function

In [None]:
def read_files(dir_name: str = 'train_files'):
    base_path = Path(f'../input/jpx-tokyo-stock-exchange-prediction/{dir_name}')
    prices = pd.read_csv(base_path / 'stock_prices.csv')
    options = pd.read_csv(base_path / 'options.csv')
    financials = pd.read_csv(base_path / 'financials.csv')
    trades = pd.read_csv(base_path / 'trades.csv')
    secondary_prices = pd.read_csv(base_path / 'secondary_stock_prices.csv')
    
    prices = reduce_mem_usage(prices)
    options = reduce_mem_usage(options)
    financials = reduce_mem_usage(financials)
    trades = reduce_mem_usage(trades)
    secondary_prices = reduce_mem_usage(secondary_prices)
    
    return prices, options, financials, trades, secondary_prices

## Exec

In [None]:
%%time

stock_list = pd.read_csv('../input/jpx-tokyo-stock-exchange-prediction/stock_list.csv')
train_files = read_files('train_files')
supplemental_files = read_files('supplemental_files')

# データ統合

* 読み込んだデータを結合して、一つのファイルに纏めます。

## Function

`merge_data`関数では各ファイルを水平方向に結合します。現時点では`stock_prices`、`stock_list`しか使っていません。  
コメントアウトを解除すれば`trades`、`financials`とも結合は可能ですが、  
これらのデータは、有効なレコードが発生するタイミングが日次ではないため、学習データとして意味のあるものとするためには「直近に発生した有効なレコードの値を引き継ぐ」などの対処が必要となります。  
`options`に関しては、[OptionsCodeの附番規則](https://www.jpx.co.jp/sicc/securities-code/nlsgeu0000032d48-att/(HP)sakimono20220208.pdf)を見ていけば適切な使い方が見えそうです。

In [None]:
def merge_data(prices, options, financials, trades, secondary_prices, stock_list):
    # stock_prices がベース
    base_df = prices.copy()
    
    # stock_listと結合
    _stock_list = stock_list.copy()
    _stock_list.rename(columns={'Close': 'Close_x'}, inplace=True)
    base_df = base_df.merge(_stock_list, on='SecuritiesCode', how="left")

    # tradesと結合
    # stock_listのNewMarketSegmentと紐づくよう、tradesのSection項目を編集する
    # _trades = trades.copy()
    # _trades['NewMarketSegment'] = _trades['Section'].str.split(' \(', expand=True)[0]
    # base_df = base_df.merge(_trades, on=['Date', 'NewMarketSegment'], how="left")

    # financialsと結合
    _financials = financials.copy()
    _financials.rename(columns={'Date': 'Date_x', 'SecuritiesCode': 'SecuritiesCode_x'}, inplace=True)
    use_cols = ['DateCode',
               'NetSales',
               'OperatingProfit',
               'OrdinaryProfit',
               'Profit', 
               'EarningsPerShare', 
               'TotalAssets', 
               'Equity', 
               'EquityToAssetRatio', 
               'NumberOfIssuedAndOutstandingSharesAtTheEndOfFiscalYearIncludingTreasuryStock',
               'ForecastEarningsPerShare']
    # メモリがカツカツなのでカラムを使うものだけに絞る
    _financials = _financials[use_cols]
    _financials.replace(['-', '－'], np.nan, inplace=True)
    for col in use_cols[1:]:
        _financials[col] = _financials[col].astype(float)
    prev_column = set(_financials.columns)
    
    # 同じDateCodeで複数行あるのでまとめあげる
    _financials = _financials.groupby('DateCode').max().reset_index()
    assert prev_column == set(_financials.columns.to_list())
    _financials.drop_duplicates(subset='DateCode', inplace=True)
    assert prev_column == set(_financials.columns.to_list())
    base_df = base_df.merge(_financials, left_on='RowId', right_on='DateCode', how="left")
    
    return base_df

`adjust_price`関数は[Train Demo](https://www.kaggle.com/code/smeitoma/train-demo)で紹介されている関数をほぼそのまま使わせて頂いています。（Dateのインデックス化だけコメントアウトしています）  
項目追加が発生するため、統合の範囲を超えていますが、関数内でソートやインデックスの生成といった操作が行われるため、この段階で実行しています。  
  
関数では**AdjustedClose**という項目が生成されます。  
株式は、分割や併合によって株価が大きく変動することがありますが、**Close**の代わりに**AdjustedClose**を使うことで、この影響を減少させることができるとのことです。

In [None]:
def adjust_price(price):
    """
    Args:
        price (pd.DataFrame)  : pd.DataFrame include stock_price
    Returns:
        price DataFrame (pd.DataFrame): stock_price with generated AdjustedClose
    """
    # transform Date column into datetime
    price.loc[: ,"Date"] = pd.to_datetime(price.loc[: ,"Date"], format="%Y-%m-%d")

    def generate_adjusted_close(df):
        """
        Args:
            df (pd.DataFrame)  : stock_price for a single SecuritiesCode
        Returns:
            df (pd.DataFrame): stock_price with AdjustedClose for a single SecuritiesCode
        """
        # sort data to generate CumulativeAdjustmentFactor
        df = df.sort_values("Date", ascending=False)
        # generate CumulativeAdjustmentFactor
        df.loc[:, "CumulativeAdjustmentFactor"] = df["AdjustmentFactor"].cumprod()
        # generate AdjustedClose
        df.loc[:, "AdjustedClose"] = (
            df["CumulativeAdjustmentFactor"] * df["Close"]
        ).map(lambda x: float(
            Decimal(str(x)).quantize(Decimal('0.1'), rounding=ROUND_HALF_UP)
        ))
        # reverse order
        df = df.sort_values("Date")
        # to fill AdjustedClose, replace 0 into np.nan
        df.loc[df["AdjustedClose"] == 0, "AdjustedClose"] = np.nan
        # forward fill AdjustedClose
        df.loc[:, "AdjustedClose"] = df.loc[:, "AdjustedClose"].ffill()
        return df

    # generate AdjustedClose
    price = price.sort_values(["SecuritiesCode", "Date"])
    price = price.groupby("SecuritiesCode").apply(generate_adjusted_close).reset_index(drop=True)

    # price.set_index("Date", inplace=True)
    return price

In [None]:
def collector(prices, options, financials, trades, secondary_prices, stock_list):
    # 読み込んだデータを統合して一つのファイルに纏める
    base_df = merge_data(prices, options, financials, trades, secondary_prices, stock_list)
    
    # AdjustedClose項目の生成
    base_df = adjust_price(base_df)
    
    return base_df

## Exec

supplemental filesを使わない場合は4行目以降をコメントアウトします。

In [None]:
%%time

base_df = collector(*train_files, stock_list)
supplemental_df = collector(*supplemental_files, stock_list)
base_df = pd.concat([base_df, supplemental_df]).reset_index(drop=True)

base_df['Target'] = base_df['Target'] * 100

In [None]:
gc.collect()

# 特徴量エンジニアリング

特徴量を生成し、推論結果の精度向上に貢献するものだけを選びます。  

## Function

`calc_change_rate_base`関数と`calc_volatility_base`関数は、[Train Demo](https://www.kaggle.com/code/smeitoma/train-demo)で紹介されていた関数を参考にしています。

In [None]:
def add_date_features(df):
    df["Date"] = pd.to_datetime(df["Date"])
#     df["Year"] = df["Date"].dt.year.astype("int32")
    df['Month'] = df["Date"].dt.month.astype("int8")
    df['day_of_month'] = df["Date"].dt.day.astype("int8")
    df['day_of_year'] = df["Date"].dt.dayofyear.astype("int16")
    df['day_of_week'] = (df["Date"].dt.dayofweek + 1).astype("int8")
    return df

def calc_return(tmp):
    # 今回の処理では関数を入れ子にするときに引数がないと、実行するときに引数ないのに渡されてると怒られるので、使っていないけど引数を置いておく。
    def func(price):
        price.loc[:, 'return'] = price['AdjustedClose'].pct_change()
        return price
    return func
# 「column_name で指定された項目の、periodsで指定された期間（複数）での変化率を導出し、項目として追加する関数」を生成する関数
# 生成された関数は、特定証券コードだけを持つデータフレームが入力されることを前提としている。
def calc_change_rate_base(column_name, periods):
    def func(price):
        for period in periods:
            price.loc[:, f"{column_name}_change_rate_{period}"] = price[column_name].pct_change(period)
        return price
    return func


# 「column_name で指定された項目の、periodsで指定された期間（複数）での変動の度合いを導出し、項目として追加する関数」を生成する関数
# 生成された関数は、特定証券コードだけを持つデータフレームが入力されることを前提としている。
def calc_volatility_base(column_name, periods):
    def func(price):
        for period in periods:
            price.loc[:, f"{column_name}_volatility_{period}"] = np.log(price[column_name]).diff().rolling(window=period, min_periods=1).std()
        return price
    return func

# 「column_name で指定された項目の、periodsで指定された期間（複数）での移動平均値と現在値の比率を導出し、項目として追加する関数」を生成する関数
# 移動平均値そのものではなく、現在値に対する比率としているのは、今回のTargetが比率であるため。
# 生成された関数は、特定証券コードだけを持つデータフレームが入力されることを前提としている。
def calc_moving_average_rate_base(column_name, periods):
    def func(price):
        for period in periods:
            price.loc[:, f"{column_name}_average_rate_{period}"] = price[column_name].rolling(window=period, min_periods=1).mean()
#             price.loc[:, f"{column_name}_average_deviation_rate_{period}"] = (price[column_name] - price[column_name].rolling(window=period, min_periods=1).mean()) / (price[column_name].rolling(window=period, min_periods=1).mean() + 0.00001)
#         short_period = periods[0]
#         mid_period = periods[1]
#         s = price[f"{column_name}_average_rate_{short_period}"] - price[f"{column_name}_average_rate_{mid_period}"]
#         s = s / s.abs()
#         price.loc[:, 'moving_average_cross'] = s - s.shift(1)

        return price
    return func

def calc_rolling_features_base(column_name, periods):
    def func(price):
        for period in periods:
            price.loc[:, f"{column_name}_rolling_min_{period}"] = price[column_name].rolling(window=period, min_periods=1).min()
            price.loc[:, f"{column_name}_rolling_max_{period}"] = price[column_name].rolling(window=period, min_periods=1).max()
            price.loc[:, f"{column_name}_rolling_std_{period}"] = price[column_name].rolling(window=period, min_periods=1).std().astype(np.float32)
            price.loc[:, f"{column_name}_rolling_var_{period}"] = price[column_name].rolling(window=period, min_periods=1).var().astype(np.float32)
            price.loc[:, f"{column_name}_rolling_SR_{period}"] = price.loc[:, f"{column_name}_average_rate_{period}"] / price.loc[:, f"{column_name}_rolling_std_{period}"]
        price.loc[:, f'{column_name}_std_change_rate'] = price.loc[:, f"{column_name}_rolling_std_{period}"].pct_change()
        return price
    return func


# # 終値の変動率を生成し、項目として追加する関数。これをShift-2するとTargetになる。
# # この関数は、特定証券コードだけを持つデータフレームが入力されることを前提としている。
# def calc_target_shift2(price):
#     price.loc[:, 'Target_shift2'] = price['Close'].pct_change()
#     return price

def calc_target_shifts(price):
    lags = [1, 2, 3, 4, 5, 6, 7]
    for i in lags:
        price.loc[:, f'Target_shift{i}'] = price['Target'].shift(i)
    return price

def calc_33Sector_rolling_features(base_df, periods):
    # 33SectorCodeの平均リターンを使った特徴量
    base_df2 = base_df.copy()
    sector_df = base_df2.groupby(['33SectorCode', 'Date'])['return'].mean().reset_index()
    for period in periods:
        sector_df[f'33Sector_rolling_mean_{period}'] = sector_df.groupby(['33SectorCode'])['return'].rolling(window=period).mean().reset_index()['return']
        
    base_df = base_df.merge(sector_df[['33SectorCode', 'Date', '33Sector_rolling_mean_5', '33Sector_rolling_mean_20', '33Sector_rolling_mean_80']],
                            on=['33SectorCode', 'Date'],
                           how='left')
    # returnが0のときに発散するのを防ぐために下駄を履かせる

    for period in periods:
        constant = abs(base_df[f'33Sector_rolling_mean_{period}'].min()+0.0001)
        base_df[f'deviation_from_sector_MA_{period}'] = (base_df['return'] - base_df[f'33Sector_rolling_mean_{period}']) / (base_df[f'33Sector_rolling_mean_{period}'] + constant)
    
    return base_df

#financialsデータを使った特徴量
def add_fin_features(price):
#     def func(price):
    # ここで行うのは微妙だが良さそうな場所もないので、ここでfinancialsのffillをやる。
    f_columns = ['NetSales',
               'OperatingProfit',
               'OrdinaryProfit',
               'Profit', 
               'EarningsPerShare', 
               'TotalAssets', 
               'Equity', 
               'EquityToAssetRatio', 
               'NumberOfIssuedAndOutstandingSharesAtTheEndOfFiscalYearIncludingTreasuryStock',
                'ForecastEarningsPerShare']
    for column in f_columns:
        price[column].fillna(method='ffill', inplace=True)
    #自己資本(CapitalAdequacy)の計算
    price.loc[:, 'TotalAssets']=pd.to_numeric(price.loc[:, 'TotalAssets'], errors="coerce")
    price.loc[:, 'EquityToAssetRatio']=pd.to_numeric(price.loc[:, 'EquityToAssetRatio'], errors="coerce")
    price.loc[:, "CapitalAdequacy"]=price.loc[:,'EquityToAssetRatio']*price.loc[:,'TotalAssets']
#     base_df["CapitalAdequacy"].fillna(method='ffill', inplace=True)
    #時価総額(AMV)の計算
    price.loc[:, "NumberOfIssuedAndOutstandingSharesAtTheEndOfFiscalYearIncludingTreasuryStock"]=pd.to_numeric(price.loc[:, 'NumberOfIssuedAndOutstandingSharesAtTheEndOfFiscalYearIncludingTreasuryStock'], errors="coerce")
    price.loc[:, "AMV"]=price.loc[:,'NumberOfIssuedAndOutstandingSharesAtTheEndOfFiscalYearIncludingTreasuryStock']*price.loc[:,"Close"]
#     base_df["AMV"].fillna(method='ffill', inplace=True)
    #時価簿価比率:簿価(総資産)と時価の差分を表す．この差分は、投資家の企業の将来に対する期待と解釈される
    price.loc[:, "PBR"]=price.loc[:,"AMV"]/price.loc[:,'CapitalAdequacy']
#     base_df["PBR"].fillna(method='ffill', inplace=True)
    #負債時価総額比率:資金調達方法(負債と株式発行)の割合．値が大きいほど負債が多く、倒産リスクが大きくなる
    price.loc[:, 'Equity']=pd.to_numeric(price.loc[:, 'Equity'], errors="coerce")
    price.loc[:, "debt"]=price.loc[:,"TotalAssets"]-price.loc[:,"Equity"]
    price.loc[:, "RatioOfNetWorthToTotalDebt"]=price.loc[:, "debt"]/price.loc[:, 'AMV']
#     base_df["RatioOfNetWorthToTotalDebt"].fillna(method='ffill', inplace=True)
    price.drop("debt", axis=1, inplace=True)
    #営業マージン:どの程度効率よく付加価値をつけているか
    price.loc[:, "OperatingProfit"]=pd.to_numeric(price.loc[:, "OperatingProfit"], errors="coerce")
    price.loc[:, "NetSales"]=pd.to_numeric(price.loc[:, "NetSales"], errors="coerce")
    price.loc[:, "OperatingMargin"]=price.loc[:,"OperatingProfit"]/price.loc[:,"NetSales"]
#     base_df["OperatingMargin"].fillna(method='ffill', inplace=True)
    #純利益マージン；営業マージンとほとんど同じ．違いは負債などに対する支払いの有無．つまり、資金調達の効率性も加味される
    price.loc[:, "Profit"]=pd.to_numeric(price.loc[:, "Profit"], errors="coerce")
    price.loc[:, "ProfitMargin"]=price.loc[:,"Profit"]/price.loc[:,"NetSales"]
#     base_df["ProfitMargin"].fillna(method='ffill', inplace=True)
    #自己資本利益率(ROE)
    price.loc[:, "ROE"]=price.loc[:,"Profit"]/price.loc[:,"Equity"]
#     base_df["ROE"].fillna(method='ffill', inplace=True)
    #総資産回転率:保有資産の効率性を表す
    price.loc[:, "TotalAssetsTurnover"]=price.loc[:,"NetSales"]/price.loc[:,"TotalAssets"]
#     base_df["TotalAssetsTurnover"].fillna(method='ffill', inplace=True)
    #自己資本倍率
    price.loc[:, "CapitalAdequecyRatio"]=price.loc[:,"Profit"]/price.loc[:,"TotalAssets"]
#     base_df["CapitalAdequecyRatio"].fillna(method='ffill', inplace=True)
    #無リスク利子率
    r = {2017:-0.09, 2018:-0.137, 2019:-0.18, 2020:-0.147, 2021:-0.123, 2022:-0.072}
    price['r'] = price["Date"].dt.year.astype("int32")
    price['r'] = price['r'].map(r)

    #二年間の企業価値
    price['EarningsPerShare']=pd.to_numeric(price['EarningsPerShare'], errors="coerce")
    price['ForecastEarningsPerShare']=pd.to_numeric(price['ForecastEarningsPerShare'], errors="coerce")
    price["NPV"]=price.loc[:,'EarningsPerShare']+price.loc[:,'ForecastEarningsPerShare']/(1+price.loc[:,"r"])
    
#     #市場リスクプレミアム
    market_df = price.groupby('Date')['return'].mean().reset_index()
    market_df = market_df.rename(columns={'return':'MarketAverageReturn'})
    price = price.merge(market_df, on=['Date'], how='left')
    price.loc[:, 'RiskPremium'] = price['MarketAverageReturn'] - price['r']
    price.drop('r', axis = 1, inplace=True)
    return price
#     return func

# 入力データフレームを証券コード毎にグルーピングし、引数で渡された関数を適用する関数
# functionsには↑で定義したcalc_xxxの関数のリストが渡される想定。
def add_columns_per_code(price, functions):
    def func(df):
        for f in functions:
            df = f(df)
        return df
    price = price.sort_values(["SecuritiesCode", "Date"])
    price = price.groupby("SecuritiesCode").apply(func)
    price = price.reset_index(drop=True)
    return price

# 入力データフレームに特徴量を追加する関数
# 追加する項目は、基本的にレコード内の値だけを使う想定
def add_columns_per_day(base_df):
    base_df['diff_rate1'] = (base_df['Close'] - base_df['Open']) / base_df['Close']
    base_df['diff_rate2'] = (base_df['High'] - base_df['Low']) / base_df['Close']    
    return base_df

# 入力データフレームに特徴量を追加する関数
def generate_features(base_df):
    prev_column_names = base_df.columns
    
    periods = [5, 20, 80]
    functions = [
        calc_return(periods),
#         calc_change_rate_base("AdjustedClose", periods), 
#         calc_volatility_base("AdjustedClose", periods), 
#         calc_moving_average_rate_base("Volume", periods), 
        calc_moving_average_rate_base("return", periods), 
        calc_rolling_features_base("return", periods)
#         calc_target_shifts
    ]
    # 証券コード単位の特徴量（移動平均等、一定期間のレコードをインプットに生成する特徴量）を追加
    base_df = add_columns_per_code(base_df, functions)
    
    # 日付の特徴量
    base_df = add_date_features(base_df)
    
    # 33SectorCodeごとのリターンの移動平均
    base_df = calc_33Sector_rolling_features(base_df, periods)
    
    # 財務諸表からの特徴量
    base_df = base_df.sort_values(["SecuritiesCode", "Date"])
    base_df = base_df.groupby("SecuritiesCode").apply(add_fin_features)
    base_df = base_df.reset_index(drop=True)
    

    # 日単位の特徴量（レコード内の値で導出できる特徴量）を追加
    base_df = add_columns_per_day(base_df)
    
    # 後で特徴量を選択しやすくするため、追加した項目名のリストを生成
    add_column_names = list(set(base_df.columns) - set(prev_column_names))
    return base_df, add_column_names

In [None]:
# 特徴量選択
def select_features(feature_df, add_column_names, is_train):
    # 基本項目
    base_cols = ['RowId', 'Date', 'SecuritiesCode']
    # 数値系の特徴量
    numerical_cols = sorted(add_column_names) + ['AdjustedClose']
    # カテゴリ系の特徴量
    categorical_cols = ['33SectorCode']
    # 目的変数
    label_col = ['Target']
    
    # 特徴量
    feat_cols = numerical_cols + categorical_cols

    # データフレームの項目を選択された項目だけに絞込
    feature_df = feature_df[base_cols + feat_cols + label_col]
    # カテゴリ系項目はdtypeをcategoryに変更
    feature_df[categorical_cols] = feature_df[categorical_cols].astype('category')
    
    feature_df = feature_df.drop_duplicates(subset='RowId')
    
    if is_train:
        # 学習データの場合は、NA項目があるレコードを削除
#         feature_df[numerical_cols] = feature_df[numerical_cols].fillna(method='ffill')
        feature_df.dropna(inplace=True)
    else:
        # 推論データの場合は、NA項目を補完
        feature_df[numerical_cols] = feature_df[numerical_cols].fillna(0)
        feature_df[numerical_cols] = feature_df[numerical_cols].replace([np.inf, -np.inf], 0)
    
    return feature_df, feat_cols, label_col

In [None]:
def preprocessor(base_df, is_train=True):
    feature_df = base_df.copy()
    
    ## 特徴量生成
    feature_df, add_column_names = generate_features(feature_df)
    
    ## 特徴量選択
    feature_df, feat_cols, label_col = select_features(feature_df, add_column_names, is_train)
    
    feature_df = reduce_mem_usage(feature_df)

    return feature_df, feat_cols, label_col

## Exec

In [None]:
%%time

feature_df, feat_cols, label_col = preprocessor(base_df)

In [None]:
gc.collect()

In [None]:
feature_df

In [None]:
feat_cols

In [None]:
def add_rank_with_new_col_name(df, col_name, new_col_name, ascend=False):
    df[new_col_name] = df.groupby("Date")[col_name].rank(ascending=ascend, method="first") - 1 
    df[new_col_name] = df[new_col_name].astype("int")
    return df

def calc_spread_return_sharpe(df: pd.DataFrame, pred_name, portfolio_size: int = 200, toprank_weight_ratio: float = 2) -> float:
    """
    Args:
        df (pd.DataFrame): predicted results
        portfolio_size (int): # of equities to buy/sell
        toprank_weight_ratio (float): the relative weight of the most highly ranked stock compared to the least.
    Returns:
        (float): sharpe ratio
    """
    def _calc_spread_return_per_day(df, portfolio_size, toprank_weight_ratio):
        """
        Args:
            df (pd.DataFrame): predicted results
            portfolio_size (int): # of equities to buy/sell
            toprank_weight_ratio (float): the relative weight of the most highly ranked stock compared to the least.
        Returns:
            (float): spread return
        """
        assert df[pred_name].min() == 0
        assert df[pred_name].max() == len(df[pred_name]) - 1
        weights = np.linspace(start=toprank_weight_ratio, stop=1, num=portfolio_size)
        purchase = (df.sort_values(by=pred_name)['Target'][:portfolio_size] * weights).sum() / weights.mean()
        short = (df.sort_values(by=pred_name, ascending=False)['Target'][:portfolio_size] * weights).sum() / weights.mean()
        return purchase - short

    buf = df.groupby('Date').apply(_calc_spread_return_per_day, portfolio_size, toprank_weight_ratio)
    sharpe_ratio = buf.mean() / buf.std()
    return sharpe_ratio

# 学習

## Function

学習を行いモデルを生成します

In [None]:
# 予測値を降順に並べて順位番号を振る関数
# 言い換えると、目的変数から提出用項目を導出する関数
def add_rank(df, col_name="pred", ascend=False):
    df["Rank"] = df.groupby("Date")[col_name].rank(ascending=ascend, method="first") - 1 
    df["Rank"] = df["Rank"].astype("int")
    return df

def add_bottom_rank(df, col_name="pred"):
    df['BottomRank'] = df.groupby("Date")[col_name].rank(ascending=False, method="first") - 1
    df['BottomRank'] = df['BottomRank'].astype('int')
    return df

`calc_spread_return_sharpe`関数は[Train Demo](https://www.kaggle.com/code/smeitoma/train-demo)で紹介されている関数をそのまま使わせて頂いています。  
推論した**Rank**と、正解の**Target**を含むデータフレームを渡すと、コンテストの評価方法に沿ったスコアを計算してくれます。

In [None]:
def calc_spread_return_sharpe(df: pd.DataFrame, portfolio_size: int = 200, toprank_weight_ratio: float = 2) -> float:
    """
    Args:
        df (pd.DataFrame): predicted results
        portfolio_size (int): # of equities to buy/sell
        toprank_weight_ratio (float): the relative weight of the most highly ranked stock compared to the least.
    Returns:
        (float): sharpe ratio
    """
    def _calc_spread_return_per_day(df, portfolio_size, toprank_weight_ratio):
        """
        Args:
            df (pd.DataFrame): predicted results
            portfolio_size (int): # of equities to buy/sell
            toprank_weight_ratio (float): the relative weight of the most highly ranked stock compared to the least.
        Returns:
            (float): spread return
        """
        assert df['Rank'].min() == 0
        assert df['Rank'].max() == len(df['Rank']) - 1
        weights = np.linspace(start=toprank_weight_ratio, stop=1, num=portfolio_size)
        purchase = (df.sort_values(by='Rank')['Target'][:portfolio_size] * weights).sum() / weights.mean()
        short = (df.sort_values(by='Rank', ascending=False)['Target'][:portfolio_size] * weights).sum() / weights.mean()
        return purchase - short

    buf = df.groupby('Date').apply(_calc_spread_return_per_day, portfolio_size, toprank_weight_ratio)
    sharpe_ratio = buf.mean() / buf.std()
    return sharpe_ratio

In [None]:
# 予測用のデータフレームと、予測結果をもとに、スコアを計算する関数
def evaluator(df, pred):
    df["pred"] = pred
    df = add_rank(df)
    score = calc_spread_return_sharpe(df)
    return score

`lightgbm`ではなく`optuna.integration.lightgbm`をimportすることで、パイパーパラメータチューニングが実行されるようになります。

In [None]:
def add_rank_with_new_col_name(df, col_name, new_col_name, ascend=False):
    df[new_col_name] = df.groupby("Date")[col_name].rank(ascending=ascend, method="first") - 1 
    df[new_col_name] = df[new_col_name].astype("int")
    return df

In [None]:
def add_preliminary_rank(df):
    
    def func(row):
        THRESHOLD = 400
        MAX_COUNT = 3000  # 多めにしておく
        if row['TopPredRank'] < THRESHOLD and row['BottomPredRank'] >= THRESHOLD:
            return row['TopPredRank']
        elif row['TopPredRank'] < THRESHOLD and row['BottomPredRank'] >= THRESHOLD:
            return MAX_COUNT - row['BottomPredRank']
        elif row['TopPredRank'] >= THRESHOLD and row['BottomPredRank'] >= THRESHOLD:
            return row['TopPredRank']
        else:
            if row['TopPredRank'] <= row['BottomPredRank']:
                return row['TopPredRank']
            else:
                return MAX_COUNT - row['BottomPredRank']
            
    df.loc[:, 'PreliminaryRank'] = df.apply(lambda x: func(x), axis=1)
    return df

In [None]:
import lightgbm as lgb
import pickle
# 学習を実行する関数
def rank_trainer_dual(feature_df, feat_cols, label_col, fold_params, seed=2022):
    scores = []
    top_models = []
    bottom_models = []
    params = []

    for fold_id, param in enumerate(fold_params):
        ################################
        # データ準備
        ################################
        train = feature_df[(param[0] <= feature_df['Date']) & (feature_df['Date'] < param[1])]
        valid = feature_df[(param[1] <= feature_df['Date']) & (feature_df['Date'] < param[2])]
        train = add_rank(train, col_name='Target', ascend=True)
        train = add_bottom_rank(train, col_name='Target')
        valid = add_rank(valid, col_name='Target', ascend=True)
        valid = add_bottom_rank(valid, col_name='Target')
        train = train.sort_values(['Date', 'SecuritiesCode']).reset_index(drop=True)
        train_group = train.groupby('Date')['SecuritiesCode'].count().to_list()
        valid = valid.sort_values(['Date', 'SecuritiesCode']).reset_index(drop=True)
        valid_group = valid.groupby('Date')['SecuritiesCode'].count().to_list()
        
        

        X_train = train[feat_cols]
        y_train = train['Rank']
        X_valid = valid[feat_cols]
        y_valid = valid['Rank']
        
        lgb_train = lgb.Dataset(X_train, y_train, group=train_group)
        lgb_valid = lgb.Dataset(X_valid, y_valid, reference=lgb_train, group=valid_group)
        
        ################################
        # 学習
        ################################
        label_gain = np.arange(max(max(train_group), max(valid_group))+100)
        params = {
#             'device': 'gpu',
            'task': 'train',                   # 学習
            'boosting_type': 'gbdt',           # GBDT
            'objective': 'lambdarank',         # 回帰
            'metric': 'ndcg',                  # 損失（誤差）
            'label_gain':label_gain,
#             'lambdarank_truncation_level':max(max(train_group), max(valid_group)),
            'lambdarank_truncation_level':400,
            'ndcg_eval_at':[200, 10, 50, 100, 150],
            'learning_rate': 0.01,             # 学習率
            'lambda_l1': 0.7,                  # L1正則化項の係数
            'lambda_l2': 0.7,                  # L2正則化項の係数
            'num_leaves': 31,                  # 最大葉枚数
            'max_depth':7,
#             'feature_fraction': 0.5,           # ランダムに抽出される列の割合
            'bagging_fraction': 0.65,
            'bagging_freq': 1, 
            'colsample_bytree': 0.7,
            'colsample_bynode': 0.7,
            'min_child_samples': 5,           # 葉に含まれる最小データ数
            'force_col_wise': True,
            'seed': seed                       # シード値
        } 
 
        lgb_results = {}                       
        model = lgb.train( 
            params,                            # ハイパーパラメータ
            lgb_train,                         # 訓練データ
            valid_sets=[lgb_train, lgb_valid], # 検証データ
            valid_names=['Train', 'Valid'],    # データセット名前
            num_boost_round=1000,              # 計算回数
            early_stopping_rounds=-1,         # 計算打ち切り設定
            evals_result=lgb_results,          # 学習の履歴
            verbose_eval=100,                  # 学習過程の表示サイクル
        )  

        ################################
        # 結果描画
        ################################
        fig = plt.figure(figsize=(10, 8))

        # loss
#         plt.subplot(1,2,1)
#         loss_train = lgb_results['Train']['rho']
#         loss_test = lgb_results['Valid']['rho']   
#         plt.xlabel('Iteration')
#         plt.ylabel('logloss')
#         plt.plot(loss_train, label='train loss')
#         plt.plot(loss_test, label='valid loss')
#         plt.legend()

        # feature importance
        plt.subplot(1,2,2)
        importance = pd.DataFrame({'feature':feat_cols, 'importance':model.feature_importance()})
        sns.barplot(x = 'importance', y = 'feature', data = importance.sort_values('importance', ascending=False))

        plt.tight_layout()
        plt.show()

        
        # 推論
        pred = model.predict(X_valid, num_iteration=model.best_iteration)
        valid['TopPred'] = pred
        
        top_models.append(model)
        model_name = f'lgb_top_rank_fold_{fold_id}.bin'
        pickle.dump(model, open(model_name, 'wb'))
        
        ################################
        # 下位200位の予測モデル
        ################################
        
        X_train = train[feat_cols]
        y_train = train['BottomRank']
        X_valid = valid[feat_cols]
        y_valid = valid['BottomRank']
        
        lgb_train = lgb.Dataset(X_train, y_train, group=train_group)
        lgb_valid = lgb.Dataset(X_valid, y_valid, reference=lgb_train, group=valid_group)
        lgb_results = {} 
        model = lgb.train( 
            params,                            # ハイパーパラメータ
            lgb_train,                         # 訓練データ
            valid_sets=[lgb_train, lgb_valid], # 検証データ
            valid_names=['Train', 'Valid'],    # データセット名前
            num_boost_round=1000,              # 計算回数
            early_stopping_rounds=-1,         # 計算打ち切り設定
            evals_result=lgb_results,          # 学習の履歴
            verbose_eval=100,                  # 学習過程の表示サイクル
        )  

        ################################
        # 結果描画
        ################################
        fig = plt.figure(figsize=(10, 8))

        # loss
#         plt.subplot(1,2,1)
#         loss_train = lgb_results['Train']['rho']
#         loss_test = lgb_results['Valid']['rho']   
#         plt.xlabel('Iteration')
#         plt.ylabel('logloss')
#         plt.plot(loss_train, label='train loss')
#         plt.plot(loss_test, label='valid loss')
#         plt.legend()

        # feature importance
        plt.subplot(1,2,2)
        importance = pd.DataFrame({'feature':feat_cols, 'importance':model.feature_importance()})
        sns.barplot(x = 'importance', y = 'feature', data = importance.sort_values('importance', ascending=False))

        plt.tight_layout()
        plt.show()
        
        
#         # 推論
        pred =  model.predict(X_valid, num_iteration=model.best_iteration)
        valid['BottomPred'] = pred
        
        bottom_models.append(model)
        model_name = f'lgb_bottom_rank_fold_{fold_id}.bin'
        pickle.dump(model, open(model_name, 'wb'))
        
        
        
        ################################
        # 評価
        ################################
        
        valid = add_rank_with_new_col_name(valid, col_name='TopPred', new_col_name='TopPredRank', ascend=False)
        valid = add_rank_with_new_col_name(valid, col_name='BottomPred', new_col_name='BottomPredRank', ascend=False)
        
        valid = add_preliminary_rank(valid)
        valid = add_rank_with_new_col_name(valid, col_name='PreliminaryRank', new_col_name='SubmitRank', ascend=False)
        
        
        # 評価
        score = evaluator(valid, valid['SubmitRank'].to_numpy())

        scores.append(score)

    print("CV_SCORES:", scores)
    print("CV_SCORE:", np.mean(scores))
    
    return top_models, bottom_models

## Exec

In [None]:
# 2020-12-23よりも前のデータは証券コードが2000個すべて揃っていないため、これ以降のデータのみを使う。
# (学習用データの開始日、学習用データの終了日＝検証用データの開始日、検証用データの終了日)
fold_params = [
    ('2018-04-01', '2020-05-01', '2020-06-01'),
    ('2019-04-01', '2021-05-01', '2021-06-01'),
    ('2020-04-01', '2022-05-01', '2022-06-01'),
]
# models = trainer(feature_df, feat_cols, label_col, fold_params)
# models = rank_trainer(feature_df, feat_cols, label_col, fold_params)
top_models, bottom_models = rank_trainer_dual(feature_df, feat_cols, label_col, fold_params)

# 推論・評価

生成したモデルを使って試験用データの推論を行いスコアを算出します。

## Function

In [None]:
def predictor(feature_df, feat_cols, models, is_train=True):
    X = feature_df[feat_cols]
    
    # 推論
    preds = list(map(lambda model: model.predict(X, num_iteration=model.best_iteration), models))
    
    # スコアは学習時のみ計算
    if is_train:
        scores = list(map(lambda pred: evaluator(feature_df, pred), preds))
        print("SCORES:", scores)

    # 推論結果をバギング
    pred = np.array(preds).mean(axis=0)
    

    # スコアは学習時のみ計算
    if is_train:
        score = evaluator(feature_df, pred)
        print("SCORE:", score)
    
    return pred

In [None]:
def predictor_dual(feature_df, feat_cols, top_models, bottom_models, is_train=True):
    X = feature_df[['Date', 'SecuritiesCode'] + feat_cols]
    
    # 推論
    top_preds = list(map(lambda model: model.predict(X[feat_cols], num_iteration=model.best_iteration), top_models))
    bottom_preds = list(map(lambda model: model.predict(X[feat_cols], num_iteration=model.best_iteration), bottom_models))
    preds = []
    for i in range(len(top_preds)):
        X['TopPred'] = top_preds[i]
        X['BottomPred'] = bottom_preds[i]
        X = add_rank_with_new_col_name(X, col_name='TopPred', new_col_name='TopPredRank', ascend=False)
        X = add_rank_with_new_col_name(X, col_name='BottomPred', new_col_name='BottomPredRank', ascend=False)
        X = add_preliminary_rank(X)
        X = add_rank_with_new_col_name(X, col_name='PreliminaryRank', new_col_name='SubmitRank', ascend=False)
        preds.append(X['SubmitRank'].to_numpy())
    # スコアは学習時のみ計算
    if is_train:
        scores = list(map(lambda pred: evaluator(feature_df, pred), preds))
        print("SCORES:", scores)

    # 推論結果をバギング
#     pred = np.array(preds).mean(axis=0)
    w = np.array([0.2, 0.2, 0.6])
    pred = np.average(np.array(preds), axis=0, weights=w)

    # スコアは学習時のみ計算
    if is_train:
        score = evaluator(feature_df, pred)
        print("SCORE:", score)
    
    return pred

## Exec

In [None]:
# 試験用データは学習用にも検証用にも使用していないものを使う
test_df = feature_df[('2022-02-01' <= feature_df['Date'])].copy()
# pred = predictor(test_df, feat_cols, models)
pred = predictor_dual(test_df, feat_cols, top_models, bottom_models)

# 提出

時系列APIを使って推論結果を登録します。  
特徴量として、移動平均等の過去データを参照する値を採用しているため、時系列APIから得られたデータをため込む仕組みを実装する必要があります。  
この仕組みに関しては、[Start-to-finish demo based on s-meitoma + tweaks](https://www.kaggle.com/code/lowellniles/start-to-finish-demo-based-on-s-meitoma-tweaks)を参考にさせていただきました。  
以下のコードでは`past_df`というデータフレームに履歴データをため込む実装になっています。

In [None]:
# 時系列APIのロード
import jpx_tokyo_market_prediction
env = jpx_tokyo_market_prediction.make_env()
iter_test = env.iter_test()

In [None]:
# supplemental filesを履歴データの初期状態としてセットアップ
past_df = base_df[base_df['Date'] >= '2020-12-23'].copy()
past_df = past_df.reset_index(drop=True)

In [None]:
# 日次で推論・登録
for i, (prices, options, financials, trades, secondary_prices, sample_prediction) in enumerate(iter_test):
    print(f'iteration {i}')
    current_date = prices["Date"].iloc[0]

    if i == 0:
        # リークを防止するため、時系列APIから受け取ったデータより未来のデータを削除
        past_df = past_df[past_df["Date"] < current_date]

#     # リソース確保のため古い履歴を削除
#     threshold = (pd.Timestamp(current_date) - pd.offsets.BDay(80)).strftime("%Y-%m-%d")
#     past_df = past_df[past_df["Date"] >= threshold]
    
    # 時系列APIから受け取ったデータを履歴データに統合
    base_df = collector(prices, options, financials, trades, secondary_prices, stock_list)
    past_df = pd.concat([past_df, base_df]).reset_index(drop=True)

    # 特徴量エンジニアリング
    feature_df, feat_cols, label_col = preprocessor(past_df, False)

    # 予測対象レコードだけを抽出
    feature_df = feature_df[feature_df['Date'] == current_date]

    # 推論
#     feature_df["pred"] = predictor(feature_df, feat_cols, models, False)
    feature_df['pred'] = predictor_dual(feature_df, feat_cols, top_models, bottom_models, False)

    # 推論結果からRANKを導出し、提出データに反映
    feature_df = add_rank(feature_df)
    feature_map = feature_df.set_index('SecuritiesCode')['Rank'].to_dict()
    sample_prediction['Rank'] = sample_prediction['SecuritiesCode'].map(feature_map)
    

    # 結果を登録
    env.predict(sample_prediction)