In [8]:
from pathlib import Path
import numpy as np
import pandas as pd
import math
import csv
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from sklearn.metrics import roc_auc_score
import joblib
import os
from datetime import datetime

### 特徵工程

In [None]:
def FFT(xreal, ximag):    
    n = 2
    while(n*2 <= len(xreal)):
        n *= 2
    
    p = int(math.log(n, 2))
    
    for i in range(0, n):
        a = i
        b = 0
        for j in range(0, p):
            b = int(b*2 + a%2)
            a = a/2
        if(b > i):
            xreal[i], xreal[b] = xreal[b], xreal[i]
            ximag[i], ximag[b] = ximag[b], ximag[i]
            
    wreal = []
    wimag = []
        
    arg = float(-2 * math.pi / n)
    treal = float(math.cos(arg))
    timag = float(math.sin(arg))
    
    wreal.append(float(1.0))
    wimag.append(float(0.0))
    
    for j in range(1, int(n/2)):
        wreal.append(wreal[-1] * treal - wimag[-1] * timag)
        wimag.append(wreal[-1] * timag + wimag[-1] * treal)
        
    m = 2
    while(m < n + 1):
        for k in range(0, n, m):
            for j in range(0, int(m/2), 1):
                index1 = k + j
                index2 = int(index1 + m / 2)
                t = int(n * j / m)
                treal = wreal[t] * xreal[index2] - wimag[t] * ximag[index2]
                timag = wreal[t] * ximag[index2] + wimag[t] * xreal[index2]
                ureal = xreal[index1]
                uimag = ximag[index1]
                xreal[index1] = ureal + treal
                ximag[index1] = uimag + timag
                xreal[index2] = ureal - treal
                ximag[index2] = uimag - timag
        m *= 2
        
    return n, xreal, ximag   
    
def FFT_data(input_data, swinging_times):   
    txtlength = swinging_times[-1] - swinging_times[0]
    a_mean = [0] * txtlength
    g_mean = [0] * txtlength
       
    for num in range(len(swinging_times)-1):
        a = []
        g = []
        for swing in range(swinging_times[num], swinging_times[num+1]):
            a.append(math.sqrt(math.pow((input_data[swing][0] + input_data[swing][1] + input_data[swing][2]), 2)))
            g.append(math.sqrt(math.pow((input_data[swing][3] + input_data[swing][4] + input_data[swing][5]), 2)))

        a_mean[num] = (sum(a) / len(a))
        """  Modify: 將sum(a)/len(g)改為g，但結果好像沒有比較好，要再觀察  """
        g_mean[num] = (sum(g) / len(g))
    
    return a_mean, g_mean

def feature(input_data, swinging_now, swinging_times, n_fft, a_fft, g_fft, a_fft_imag, g_fft_imag, writer):
    allsum = []
    mean = []
    var = []
    rms = []
    XYZmean_a = 0
    a = []
    g = []
    a_s1 = 0
    a_s2 = 0
    g_s1 = 0
    g_s2 = 0
    a_k1 = 0
    a_k2 = 0
    g_k1 = 0
    g_k2 = 0
    
    for i in range(len(input_data)):
        if i==0:
            allsum = input_data[i]
            a.append(math.sqrt(math.pow((input_data[i][0] + input_data[i][1] + input_data[i][2]), 2)))
            g.append(math.sqrt(math.pow((input_data[i][3] + input_data[i][4] + input_data[i][5]), 2)))
            continue
        
        a.append(math.sqrt(math.pow((input_data[i][0] + input_data[i][1] + input_data[i][2]), 2)))
        g.append(math.sqrt(math.pow((input_data[i][3] + input_data[i][4] + input_data[i][5]), 2)))
       
        allsum = [allsum[feature_index] + input_data[i][feature_index] for feature_index in range(len(input_data[i]))]
        
    mean = [allsum[feature_index] / len(input_data) for feature_index in range(len(input_data[i]))]
    
    """  Modify: 計算variance時負數開根號的問題  """
    var = [0] * len(input_data[0])
    rms = [0] * len(input_data[0])

    # 遍歷每一筆感測資料
    for i in range(len(input_data)):
        for j in range(len(input_data[i])):
            var[j] += (input_data[i][j] - mean[j]) ** 2
            rms[j] += input_data[i][j] ** 2
        
    var = [math.sqrt((var[feature_index] / len(input_data))) for feature_index in range(len(input_data[i]))]
    rms = [math.sqrt((rms[feature_index] / len(input_data))) for feature_index in range(len(input_data[i]))]
    
    a_max = [max(a)]
    a_min = [min(a)]
    a_mean = [sum(a) / len(a)]
    g_max = [max(g)]
    g_min = [min(g)]
    g_mean = [sum(g) / len(g)]
    
    a_var = math.sqrt(math.pow((var[0] + var[1] + var[2]), 2))
    
    for i in range(len(input_data)):
        a_s1 = a_s1 + math.pow((a[i] - a_mean[0]), 4)
        a_s2 = a_s2 + math.pow((a[i] - a_mean[0]), 2)
        g_s1 = g_s1 + math.pow((g[i] - g_mean[0]), 4)
        g_s2 = g_s2 + math.pow((g[i] - g_mean[0]), 2)
        a_k1 = a_k1 + math.pow((a[i] - a_mean[0]), 3)
        g_k1 = g_k1 + math.pow((g[i] - g_mean[0]), 3)
    
    a_s1 = a_s1 / len(input_data)
    a_s2 = a_s2 / len(input_data)
    g_s1 = g_s1 / len(input_data)
    g_s2 = g_s2 / len(input_data)
    a_k2 = math.pow(a_s2, 1.5)
    g_k2 = math.pow(g_s2, 1.5)
    a_s2 = a_s2 * a_s2
    g_s2 = g_s2 * g_s2
    
    a_kurtosis = [a_s1 / a_s2]
    g_kurtosis = [g_s1 / g_s2]
    a_skewness = [a_k1 / a_k2]
    g_skewness = [g_k1 / g_k2]
    
    a_fft_mean = 0
    g_fft_mean = 0
    cut = int(n_fft / swinging_times)
    a_psd = []
    g_psd = []
    entropy_a = []
    entropy_g = []
    e1 = []
    e3 = []
    e2 = 0
    e4 = 0
    
    for i in range(cut * swinging_now, cut * (swinging_now + 1)):
        a_fft_mean += a_fft[i]
        g_fft_mean += g_fft[i]
        a_psd.append(math.pow(a_fft[i], 2) + math.pow(a_fft_imag[i], 2))
        g_psd.append(math.pow(g_fft[i], 2) + math.pow(g_fft_imag[i], 2))
        e1.append(math.pow(a_psd[-1], 0.5))
        e3.append(math.pow(g_psd[-1], 0.5))
        
    a_fft_mean = a_fft_mean / cut
    g_fft_mean = g_fft_mean / cut
    
    a_psd_mean = sum(a_psd) / len(a_psd)
    g_psd_mean = sum(g_psd) / len(g_psd)
    
    for i in range(cut):
        e2 += math.pow(a_psd[i], 0.5)
        e4 += math.pow(g_psd[i], 0.5)
    
    for i in range(cut):
        entropy_a.append((e1[i] / e2) * math.log(e1[i] / e2))
        entropy_g.append((e3[i] / e4) * math.log(e3[i] / e4))
    
    a_entropy_mean = sum(entropy_a) / len(entropy_a)
    g_entropy_mean = sum(entropy_g) / len(entropy_g)       
        
    
    output = mean + var + rms + a_max + a_mean + a_min + g_max + g_mean + g_min + [a_fft_mean] + [g_fft_mean] + [a_psd_mean] + [g_psd_mean] + a_kurtosis + g_kurtosis + a_skewness + g_skewness + [a_entropy_mean] + [g_entropy_mean]
    writer.writerow(output)

def data_generate(datapath: str, tar_dir: str):
    # datapath = './dataset/39_Test_Dataset/test_data'
    # tar_dir = './dataset/39_Test_Dataset/tabular_data_test'
    pathlist_txt = Path(datapath).glob('**/*.txt')
    os.makedirs(tar_dir, exist_ok=True)
    
    for file in pathlist_txt:
        f = open(file)

        All_data = []

        count = 0
        for line in f.readlines():
            if line == '\n' or count == 0:
                count += 1
                continue
            num = line.split(' ')
            if len(num) > 5:
                tmp_list = []
                for i in range(6):
                    tmp_list.append(int(num[i]))
                All_data.append(tmp_list)
        
        f.close()

        swing_index = np.linspace(0, len(All_data), 28, dtype = int)
        # filename.append(int(Path(file).stem))
        # all_swing.append([swing_index])

        headerList = ['ax_mean', 'ay_mean', 'az_mean', 'gx_mean', 'gy_mean', 'gz_mean', 'ax_var', 'ay_var', 'az_var', 'gx_var', 'gy_var', 'gz_var', 'ax_rms', 'ay_rms', 'az_rms', 'gx_rms', 'gy_rms', 'gz_rms', 'a_max', 'a_mean', 'a_min', 'g_max', 'g_mean', 'g_min', 'a_fft', 'g_fft', 'a_psd', 'g_psd', 'a_kurt', 'g_kurt', 'a_skewn', 'g_skewn', 'a_entropy', 'g_entropy']                
        

        output_path = Path(tar_dir) / f"{Path(file).stem}.csv"
        with open(output_path, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(headerList)

            try:
                a_fft, g_fft = FFT_data(All_data, swing_index)
                a_fft_imag = [0] * len(a_fft)
                g_fft_imag = [0] * len(g_fft)
                n_fft, a_fft, a_fft_imag = FFT(a_fft, a_fft_imag)
                n_fft, g_fft, g_fft_imag = FFT(g_fft, g_fft_imag)
                for i in range(len(swing_index)):
                    if i==0:
                        continue
                    feature(All_data[swing_index[i-1]: swing_index[i]], i - 1, len(swing_index) - 1, n_fft, a_fft, g_fft, a_fft_imag, g_fft_imag, writer)
            except:
                print(Path(file).stem)
                continue


In [9]:
datapath = '../dataset/39_Test_Dataset/test_data'
tar_dir = '../dataset/39_Test_Dataset/tabular_data_test'
data_generate(datapath, tar_dir)
datapath = '../dataset/39_Training_Dataset/train_data'
tar_dir = '../dataset/39_Training_Dataset/tabular_data_train'
data_generate(datapath, tar_dir)

NameError: name 'data_generate' is not defined

### Training

In [10]:
def train(train_info_path: str, datapath: str):
    # 讀取訓練資訊，根據 player_id 將資料分成 80% 訓練、20% 測試
    info = pd.read_csv(train_info_path)
    unique_players = info['player_id'].unique()
    train_players, test_players = train_test_split(unique_players, test_size=0.2, random_state=42)
    
    # 讀取特徵 CSV 檔（位於 "./tabular_data_train"）
    datalist = list(Path(datapath).glob('**/*.csv'))
    target_mask = ['gender', 'hold racket handed', 'play years', 'level']
    
    # 根據 test_players 分組資料
    x_train = pd.DataFrame()
    y_train = pd.DataFrame(columns=target_mask)
    x_test = pd.DataFrame()
    y_test = pd.DataFrame(columns=target_mask)
    
    for file in datalist:
        # 取得檔案名
        unique_id = int(Path(file).stem)
        row = info[info['unique_id'] == unique_id]
        if row.empty:
            continue
        # 取的該列資料
        player_id = row['player_id'].iloc[0]
        data = pd.read_csv(file)
        # lable資料*27
        target = row[target_mask]
        target_repeated = pd.concat([target] * len(data))
        if player_id in train_players:
            x_train = pd.concat([x_train, data], ignore_index=True)
            y_train = pd.concat([y_train, target_repeated], ignore_index=True)
        elif player_id in test_players:
            x_test = pd.concat([x_test, data], ignore_index=True)
            y_test = pd.concat([y_test, target_repeated], ignore_index=True)
    
    # 標準化特徵
    scaler = MinMaxScaler() # 將特徵縮放到 [0, 1] 範圍
    le = LabelEncoder()
    
    X_train_scaled = scaler.fit_transform(x_train)
    X_test_scaled = scaler.transform(x_test)
    
    group_size = 27

    def model_binary(X_train, y_train, X_test, y_test):
        clf = RandomForestClassifier(random_state=42)
        clf.fit(X_train, y_train)
        
        predicted = clf.predict_proba(X_test)
        # 取出正類（index 0）的概率
        predicted = [predicted[i][0] for i in range(len(predicted))]
        
        
        num_groups = len(predicted) // group_size 
        if sum(predicted[:group_size]) / group_size > 0.5:
            y_pred = [max(predicted[i*group_size: (i+1)*group_size]) for i in range(num_groups)]
        else:
            y_pred = [min(predicted[i*group_size: (i+1)*group_size]) for i in range(num_groups)]
        
        y_pred  = [1 - x for x in y_pred]
        y_test_agg = [y_test[i*group_size] for i in range(num_groups)]
        
        auc_score = roc_auc_score(y_test_agg, y_pred, average='micro')
        print(auc_score)

        return clf

    # 定義多類別分類評分函數 (例如 play years、level)
    def model_multiary(X_train, y_train, X_test, y_test):
        clf = RandomForestClassifier(random_state=42)
        clf.fit(X_train, y_train)
        predicted = clf.predict_proba(X_test)
        num_groups = len(predicted) // group_size
        y_pred = []
        for i in range(num_groups):
            group_pred = predicted[i*group_size: (i+1)*group_size]
            num_classes = len(np.unique(y_train))
            # 對每個類別計算該組內的總機率
            class_sums = [sum([group_pred[k][j] for k in range(group_size)]) for j in range(num_classes)]
            chosen_class = np.argmax(class_sums)
            candidate_probs = [group_pred[k][chosen_class] for k in range(group_size)]
            best_instance = np.argmax(candidate_probs)
            y_pred.append(group_pred[best_instance])
        
        y_test_agg = [y_test[i*group_size] for i in range(num_groups)]
        auc_score = roc_auc_score(y_test_agg, y_pred, average='micro', multi_class='ovr')
        print('Multiary AUC:', auc_score)

        return clf

    # 評分：針對各目標進行模型訓練與評分
    y_train_le_gender = le.fit_transform(y_train['gender'])
    y_test_le_gender = le.transform(y_test['gender'])
    clf_gender = model_binary(X_train_scaled, y_train_le_gender, X_test_scaled, y_test_le_gender)
    
    y_train_le_hold = le.fit_transform(y_train['hold racket handed'])
    y_test_le_hold = le.transform(y_test['hold racket handed'])
    clf_hold = model_binary(X_train_scaled, y_train_le_hold, X_test_scaled, y_test_le_hold)
    
    y_train_le_years = le.fit_transform(y_train['play years'])
    y_test_le_years = le.transform(y_test['play years'])
    clf_years = model_multiary(X_train_scaled, y_train_le_years, X_test_scaled, y_test_le_years)
    
    y_train_le_level = le.fit_transform(y_train['level'])
    y_test_le_level = le.transform(y_test['level'])
    clf_level = model_multiary(X_train_scaled, y_train_le_level, X_test_scaled, y_test_le_level)

    os.makedirs("models", exist_ok=True)
    joblib.dump(scaler, "models/scaler.pkl")
    joblib.dump(clf_gender, "models/gender_model.pkl")
    joblib.dump(clf_hold,   "models/hold_model.pkl")
    joblib.dump(clf_years,  "models/years_model.pkl")
    joblib.dump(clf_level,  "models/level_model.pkl")

In [11]:
train_info_path = '../dataset/39_Training_Dataset/train_info.csv'
datapath = '../dataset/39_Training_Dataset/tabular_data_train'
train(train_info_path, datapath)

0.8116557137641476
0.9991449783116451
Multiary AUC: 0.6654614151157799
Multiary AUC: 0.8229106167352678


### inference

In [12]:
def inference(model_dir: str, test_data_dir: str, submission_path: str):
    model_dir = Path(model_dir)
    test_data_dir = Path(test_data_dir)

    Path(submission_path).parent.mkdir(parents=True, exist_ok=True)
    
    # 收集所有測試檔案（每個選手一份）
    datalist = list(test_data_dir.glob('**/*.csv'))
    unique_ids = [int(f.stem) for f in datalist]


    # 載入所有測試資料
    x_test = pd.DataFrame()
    unique_ids = []
    group_size = 27
    for file in datalist:
        df = pd.read_csv(file)
        x_test = pd.concat([x_test, df], ignore_index=True)
        n_groups = len(df) // group_size
        unique_ids.extend([int(file.stem)] * n_groups)

    scaler = joblib.load(model_dir / "scaler.pkl")
    X_test_scaled = scaler.transform(x_test)
    num_groups = len(X_test_scaled) // group_size

    # 每位選手 27 筆資料
    group_size = 27
    num_groups = len(X_test_scaled) // group_size

    # 載入模型
    model_gender = joblib.load(model_dir / "gender_model.pkl")
    model_hold = joblib.load(model_dir / "hold_model.pkl")
    model_years = joblib.load(model_dir / "years_model.pkl")
    model_level = joblib.load(model_dir / "level_model.pkl")

    # 預測 group-wise 機率平均
    def predict_groupwise_proba(model, X):
        pred_proba = model.predict_proba(X)
        results = []
        for i in range(num_groups):
            group = pred_proba[i*group_size:(i+1)*group_size]
            avg_proba = np.mean(group, axis=0)
            results.append(avg_proba)
        return np.array(results)
    
    def predict_binary_groupwise(model, X):
        pred_proba = model.predict_proba(X)
        results = []
        for i in range(num_groups):
            group = pred_proba[i*group_size:(i+1)*group_size, 0]  # 類別 0 概率（male 或 right）
            if np.mean(group) > 0.5:
                result = max(group)  # 選擇最大概率
            else:
                result = min(group)  # 選擇最小概率
            results.append([result, 1 - result])  # [P(class_0), P(class_1)]
        return np.array(results)
    
    def predict_multiary_groupwise(model, X):
        pred_proba = model.predict_proba(X)
        results = []
        for i in range(num_groups):
            group = pred_proba[i*group_size:(i+1)*group_size]
            class_sums = np.sum(group, axis=0)
            chosen_class = np.argmax(class_sums)
            candidate_probs = group[:, chosen_class]
            best_instance = np.argmax(candidate_probs)
            results.append(group[best_instance])
        return np.array(results)

    # 執行四個任務預測
    gender_probs = predict_groupwise_proba(model_gender, X_test_scaled)    
    # print(gender_probs)
    # print(predict_binary_groupwise(model_gender, X_test_scaled))
    hold_probs   = predict_groupwise_proba(model_hold, X_test_scaled)

    years_probs  = predict_multiary_groupwise(model_years, X_test_scaled)
    level_probs  = predict_multiary_groupwise(model_level, X_test_scaled)

    # 建立 submission dataframe
    submission = pd.DataFrame({
        'unique_id': unique_ids,
        'gender': np.round(gender_probs[:, 0], 6),
        'hold racket handed': np.round(hold_probs[:, 0], 6),
        'play years_0': np.round(years_probs[:, 0], 6),
        'play years_1': np.round(years_probs[:, 1], 6),
        'play years_2': np.round(years_probs[:, 2], 6),
        'level_2': np.round(level_probs[:, 0], 6),
        'level_3': np.round(level_probs[:, 1], 6),
        'level_4': np.round(level_probs[:, 2], 6),
        'level_5': np.round(level_probs[:, 3], 6)
    })
    submission = submission.sort_values(by="unique_id").reset_index(drop=True)
    submission.to_csv(submission_path, index=False, encoding='utf-8', lineterminator='\n')
    print(f"Submission saved to {submission_path}")

In [13]:
today = datetime.now()
model_dir = "models"
test_data_dir = "../dataset/39_Test_Dataset/tabular_data_test"
submission_path = f"./result/submission{today.strftime('%m%d')}.csv"
inference(model_dir, test_data_dir, submission_path)

Submission saved to ./result/submission0426.csv
