In [63]:
### インポート ###
import numpy as np
import pandas as pd
import glob
from selenium.webdriver.chrome.service import Service
from datetime import datetime
import time
import re
import csv
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import List, TypeVar, Generic

In [77]:
# ====================
# 設定ファイル読み込み
# ====================
df_config_style = pd.read_excel("C:\\keiba\\tool\\config.xlsx", sheet_name="style", header=0)
df_config_min_records = pd.read_excel("C:\\keiba\\tool\\config.xlsx", sheet_name="min_records", header=0)
df_config_features = pd.read_excel("C:\\keiba\\tool\\config.xlsx", sheet_name="features", header=0)
# 脚質
STYLE_MAP = df_config_style.set_index('key')['value'].to_dict()
print(f'脚質: {STYLE_MAP}')
REVERSE_STYLE_MAP = {v: k for k, v in STYLE_MAP.items()}
# 騎手複勝率
FEATURES_CONFIG = df_config_features.set_index('key')['value'].to_dict()
MIN_JOCKEY_RECORDS = FEATURES_CONFIG.get("min_jockey_records")
print(f'騎手複勝率計算用のレコード数: {MIN_JOCKEY_RECORDS}')

脚質: {1: '逃げ', 2: '先行', 3: '差し', 4: '追込'}
騎手複勝率計算用のレコード数: 100


In [79]:
# =========
# メソッド
# =========

def calculate_recent_time_index_avg(row, df):
    """
    特定の馬(horse_id)の直近2走の平均time_indexを計算する
    """
    target_id = row['horse_id']
    target_date = row['race_date']
    
    # 1. dfから同じhorse_id、かつ今回のrace_dateより前のデータを抽出
    # ※昇順にソート（直近が下に来るようにする）
    df_filtered = df[
        (df['horse_id'] == target_id) & 
        (df['race_date'] < target_date)
    ].sort_values('race_date')
    
    # 2. time_indexがNaNでないものを抽出
    df_valid = df_filtered.dropna(subset=['time_index'])
    
    # 3. 有効なデータが2つ以上あるか判定
    if len(df_valid) >= 2:
        # 下から2つ（直近2走）を選択して平均を出す
        avg_index = df_valid.tail(2)['time_index'].mean()
        return avg_index
    else:
        # 2回未満ならNaN
        return np.nan

def calculate_jockey_win_rate(row, df):
    """
    特定の騎手(jockey_id)の直近（有効データ）の勝率を計算する
    """
    target_id = row['jockey_id']
    target_date = row['race_date']
    
    # 1. その騎手の、今回のレース日より前のデータを抽出
    # 2. かつ finish_rank が NaN でないものに絞る
    # 3. 日付順（昇順）にソート
    df_jockey_past = df[
        (df['jockey_id'] == target_id) & 
        (df['race_date'] < target_date) &
        (df['finish_rank'].notna())
    ].sort_values('race_date')
    
    # 4. 直近 NUM_RECENT_JOCKEY_RECORDS 個のレコードを抽出（NUM_RECENT_JOCKEY_RECORDS 個なければ全件）
    df_recent = df_jockey_past.tail(MIN_JOCKEY_RECORDS)
    
    # レコード数をカウント（分母）
    total_count = len(df_recent)

    if total_count == 0:
        return np.nan
    
    # 5. finish_rank が 3 以下であるレコード数をカウント（分子）
    win_count = (df_recent['finish_rank'].astype(int) <= 3).sum()
    
    # 勝率を計算して返す
    return win_count / total_count

def predict_position_half_type(group):
    '''
    以下でソート
    1. 脚質順
    2. タイム指数順
    '''
    group = group.sort_values(["style_id", "time_index_avg_recent_2"], ascending=[True, False])
    n = len(group)
    mid = n // 2
    res = pd.Series(index=group.index, dtype=str)
    res.iloc[:mid] = "front"
    res.iloc[mid:] = "back"
    return res

def check_race_validity(group):
    # このレースの頭数を取得
    num_horses = group['num_horses'].iloc[0]
    # 設定辞書から必要数を取得（辞書にない頭数は「全頭揃っていること」を条件にする）
    required = MIN_RECORDS_MAP.get(num_horses, num_horses)
    # 有効な馬の数をカウント
    valid_count = group[['time_index_avg_recent_2', 'jockey_win_rate_recent']].notna().all(axis=1).sum()
    # print(num_horses, required, valid_count)
    return valid_count >= required

def get_recent_avg(group):
    # 有効なデータ（NaN以外）のみを抽出
    valid_data = group['time_index'].dropna()
    
    # 有効なデータに対して、直近2件の移動平均を計算
    # shift(1) することで「今回のレースを含まない過去2戦」にする
    avg_series = valid_data.shift(1).rolling(window=2, min_periods=2).mean()
    
    # 元のインデックス（NaNを含む全レコード）に合わせる
    return avg_series.reindex(group.index)

def get_jockey_win_rate(group):
    rate_series = group['is_top3'].shift(1).rolling(window=MIN_JOCKEY_RECORDS, min_periods=MIN_JOCKEY_RECORDS).mean()
    return rate_series

def predict_position_half_type(group):
    '''
    以下でソート
    1. 脚質順
    2. タイム指数順
    '''
    group = group.sort_values(["style_id", "time_index_avg_recent_2"], ascending=[True, False])
    n = len(group)
    mid = n // 2
    res = pd.Series(index=group.index, dtype=str)
    res.iloc[:mid] = "front"
    res.iloc[mid:] = "back"
    return res

In [73]:
# =================
# 学習データ読み込み
# =================
print("trainデータ読み込み：開始")
df_train_raw = pd.read_csv("C:\\keiba\\tool\\train\\train_raw.csv", header=0)
print("trainデータ読み込み：終了")

trainデータ読み込み：開始
trainデータ読み込み：終了


In [87]:
# =============
# 説明変数作成
# =============
print("説明変数作成：開始")
df_train = df_train_raw.copy()

### 予想タイム指数 ###
print("    予想タイム指数計算：開始")
# 直近過去2レースのタイム指数の平均値
df_train_sorted = df_train.sort_values(['horse_id', 'race_date', 'race_number'])
df_train_sorted['time_index_avg_recent_2'] = df_train_sorted.groupby('horse_id', group_keys=False).apply(get_recent_avg)
df_train_sorted = df_train_sorted.sort_values(['race_date', 'race_id'])
df_train = df_train_sorted.sort_index()
df_train['time_index_avg_recent_2_race_avg'] = df_train.groupby('race_id')['time_index_avg_recent_2'].transform('mean')
df_train['time_index_pred_from_race_avg'] = df_train['time_index_avg_recent_2'] - df_train['time_index_avg_recent_2_race_avg']
print("    予想タイム指数計算：終了")

### 騎手勝率 ###
print("    騎手複勝率算：開始")
df_train_sorted = df_train.sort_values(['jockey_id', 'race_date', 'race_number'])
df_train_sorted['is_top3'] = (pd.to_numeric(df_train_sorted['finish_rank'], errors='coerce') <= 3).astype(int)
df_train_sorted[f'jockey_top3_rate_recent_{MIN_JOCKEY_RECORDS}'] = df_train_sorted.groupby('jockey_id', group_keys=False).apply(get_jockey_win_rate)
df_train = df_train_sorted.drop(columns=['is_top3']).sort_index()
print("    騎手複勝率算：終了")

### コース×頭数×馬番別勝率
print("    コース×頭数×馬番別勝率計算：開始")
keys = ['racecourse', 'ground', 'distance', 'direction', 'num_horses', 'horse_number']
stats_df = df_train.groupby(keys)['finish_rank'].agg([
    ('win_count', lambda x: (pd.to_numeric(x, errors='coerce') == 1).sum()),
    ('total_count', 'count')
]).reset_index()
stats_df['condition_win_rate'] = stats_df['win_count'] / stats_df['total_count']
df_train = pd.merge(df_train, stats_df[keys + ['condition_win_rate']], on=keys, how='left')
print("    コース×頭数×馬番別勝率計算：終了")

### ポジションに紐づく勝率計算 ###
# trainデータに対するfront / backの割り当て
print("    ポジション（front / back）予想作成：開始")
# 方法(1) style_predとtime_index_avg_recent_2_race_avgからfront / backを予想
df_train["style_id"] = df_train["style_pred"]
temp_res = df_train.groupby("race_id", group_keys=False).apply(predict_position_half_type)
df_train["position_half_type_pred"] = temp_res.values
# 方法(2) trainデータ全体にわたり、各馬がfrontとbackどちらの傾向が強いかを平均値により判定
df_train['finish_rank_num'] = pd.to_numeric(df_train['finish_rank'], errors='coerce')
df_train['relative_rank'] = df_train['finish_rank_num'] / df_train['num_horses']
relative_rank_avg = df_train.groupby('horse_id')['relative_rank'].transform('mean')
df_train['position_half_type_attr'] = np.where(
    relative_rank_avg.isna(), 
    np.nan, 
    np.where(relative_rank_avg < 0.5, 'front', 'back')
)
# 予想ポジション別勝率
print("    予想ポジション別勝率計算：開始")
# trainデータから方法(1)(2)のいずれかによりコース×頭数×(front/back)ごとの勝率を計算
selected_position_half_type = 'position_half_type_pred' # 'position_half_type_pred'または'position_half_type_attr'
keys = ['racecourse', 'ground', 'distance', 'direction', 'num_horses', selected_position_half_type]
stats_df = df_train.groupby(keys)['finish_rank'].agg([
    ('win_count', lambda x: (pd.to_numeric(x, errors='coerce') == 1).sum()),
    ('total_count', 'count')
]).reset_index()
stats_df['position_half_type_win_rate'] = stats_df['win_count'] / stats_df['total_count']
# predデータのコース×頭数×(front/back)に紐づく勝率を割り当てる
df_train = pd.merge(
    df_train, 
    stats_df[keys + ['position_half_type_win_rate']], 
    on=keys, 
    how='left'
)
print("    予想ポジション別勝率計算：終了")
# 予想ポジション別比率勝率
print("    予想ポジション比率別勝率計算：開始")
df_train['front_count'] = df_train.groupby('race_id')[selected_position_half_type].transform(lambda x: (x == 'front').sum())
df_train['back_count'] = df_train.groupby('race_id')[selected_position_half_type].transform(lambda x: (x == 'back').sum())
keys = ['num_horses', 'front_count', 'back_count', selected_position_half_type]
stats_df = df_train.groupby(keys)['finish_rank'].agg(
    composition_pos_win_rate = lambda x: (pd.to_numeric(x, errors='coerce') == 1).sum() / len(x)
).reset_index()
df_train['front_count'] = df_train.groupby('race_id')[selected_position_half_type].transform(lambda x: (x == 'front').sum())
df_train['back_count'] = df_train.groupby('race_id')[selected_position_half_type].transform(lambda x: (x == 'back').sum())
df_train = pd.merge(df_train, stats_df, on=keys, how='left')
print("    予想ポジション比率別勝率計算：終了")
print("説明変数作成：終了")

説明変数作成：開始
    予想タイム指数計算：開始


  df_train_sorted['time_index_avg_recent_2'] = df_train_sorted.groupby('horse_id', group_keys=False).apply(get_recent_avg)


    予想タイム指数計算：終了
    騎手複勝率算：開始


  df_train_sorted[f'jockey_top3_rate_recent_{MIN_JOCKEY_RECORDS}'] = df_train_sorted.groupby('jockey_id', group_keys=False).apply(get_jockey_win_rate)


    騎手複勝率算：終了
    コース×頭数×馬番別勝率計算：開始
    コース×頭数×馬番別勝率計算：終了
    ポジション（front / back）予想作成：開始


  temp_res = df_train.groupby("race_id", group_keys=False).apply(predict_position_half_type)


    予想ポジション別勝率計算：開始
    予想ポジション別勝率計算：終了
    予想ポジション比率別勝率計算：開始
    予想ポジション比率別勝率計算：終了
説明変数作成：終了


In [95]:
# =============
# 目的変数作成
# =============
# 相対順位の計算
df_train["relative_rank"] = df_train['finish_rank'] / df_train["num_horses"]
# 相対順位→0/1 フラグの作成
df_train['relative_rank_bin'] = np.where(
    df_train['relative_rank'].isna(),
    np.nan,
    (df_train['relative_rank'] <= 0.5).astype(float)
)

In [97]:
# ===============
# CSVファイル出力
# ===============
df_train.to_csv("C:\\keiba\\tool\\train\\train.csv", index=False, encoding="cp932")