In [34]:
import os
import re
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm import tqdm
from sklearn.preprocessing import StandardScaler
import joblib

# === Folder configuration ===
# Switch the three paths below to the 'test_*' variants to process test data.
ROOT = Path('./AICUP')
TXT_DIR = ROOT / 'train_data'              # or 'test_data'
INFO_CSV = ROOT / 'train_info.csv'         # or 'test_info.csv'
SAVE_DIR = ROOT / 'sequence_data_train'    # or 'sequence_data_test'
SAVE_DIR.mkdir(parents=True, exist_ok=True)

# === Load info.csv (its cut_point column is parsed further down) ===
info = pd.read_csv(INFO_CSV)

def fix_cut_point_format(x):
    """Parse one raw ``cut_point`` cell into a list of integer indices.

    Parameters
    ----------
    x : str, list, tuple, numpy.ndarray or missing value
        Usually the text stored in the CSV (every run of digits is taken as
        one index). A value that is already a sequence is converted
        element-wise, so applying this function twice is harmless.

    Returns
    -------
    list[int]
        The indices in their original order; ``[]`` for missing values or
        text without digits.
    """
    # Already-parsed input: pd.isna() on a sequence returns an array, which
    # cannot be used in a boolean test, so handle it before the NaN check.
    if isinstance(x, (list, tuple, np.ndarray)):
        return [int(n) for n in x]
    if pd.isna(x):
        return []
    # r'\d+' only matches digit runs, so int() cannot fail here and no
    # try/except is required.
    return [int(n) for n in re.findall(r'\d+', str(x))]

# Parse every cut_point cell, then keep only rows with at least two cut
# points (two points are the minimum needed to delimit one segment).
info['cut_point'] = info['cut_point'].apply(fix_cut_point_format)
info = info[info['cut_point'].apply(lambda x: len(x) >= 2)]  # at least one segment can be cut

# === 讀取 txt 成 numpy array ===
def read_txt_as_array(txt_path):
    """Read a whitespace-separated sensor file into an ``(n_rows, 6)`` array.

    Only lines that split into exactly six tokens, all parseable as ``int``,
    are kept; every other line (headers, blank lines, malformed rows) is
    skipped.

    Parameters
    ----------
    txt_path : str or pathlib.Path
        Path of the text file to read.

    Returns
    -------
    numpy.ndarray
        Integer array of shape ``(n_rows, 6)``. A file without any valid row
        yields an empty ``(0, 6)`` array rather than a 1-D ``(0,)`` array, so
        callers can slice it and ``np.vstack`` padding onto it safely.
    """
    data = []
    for line in Path(txt_path).read_text().splitlines():
        parts = line.split()
        if len(parts) != 6:
            continue
        try:
            data.append([int(x) for x in parts])
        except ValueError:
            # Six tokens but at least one is not an integer: skip the row.
            continue
    if not data:
        return np.empty((0, 6), dtype=int)
    return np.array(data)

# === Collect every segment first; one shared scaler is fitted afterwards ===
SEGMENT_LEN = 85   # fixed number of time steps stored per segment
N_CHANNELS = 6     # columns per sensor row (see read_txt_as_array)

all_segments = []
meta = []

# === Slice every txt recording at its cut points ===
for _, row in tqdm(info.iterrows(), total=len(info)):
    uid = row['unique_id']
    cut_points = row['cut_point']
    txt_file = TXT_DIR / f"{uid}.txt"

    if not txt_file.exists():
        continue

    raw = read_txt_as_array(txt_file)
    if raw.ndim != 2 or raw.shape[1] != N_CHANNELS:
        # No valid sensor rows in this file (np.array([]) is 1-D), so there
        # is nothing to slice.
        continue

    for i in range(len(cut_points) - 1):
        start, end = cut_points[i], cut_points[i + 1]
        segment = raw[start:end]

        if segment.shape[0] == 0:
            continue

        # Zero-pad (or truncate) to exactly (SEGMENT_LEN, N_CHANNELS).
        if segment.shape[0] < SEGMENT_LEN:
            pad_len = SEGMENT_LEN - segment.shape[0]
            pad = np.zeros((pad_len, N_CHANNELS))
            segment = np.vstack([segment, pad])
        elif segment.shape[0] > SEGMENT_LEN:
            segment = segment[:SEGMENT_LEN]

        if segment.shape != (SEGMENT_LEN, N_CHANNELS):
            continue

        all_segments.append(segment)
        meta.append((uid, i))

# === Fit one scaler on all segments, then persist it ===
print(f'Number of segments collected: {len(all_segments)}')
if not all_segments:
    # np.concatenate([]) would fail with an unhelpful message; fail early
    # with the likely cause instead.
    raise RuntimeError(
        f"No segments were collected from {TXT_DIR}; "
        f"check the data paths and the cut_point column of {INFO_CSV}."
    )
# NOTE(review): the zero padding rows are part of the data the scaler is
# fitted on, so they influence the learned mean/std.
all_data = np.concatenate(all_segments, axis=0)  # (N*SEGMENT_LEN, N_CHANNELS)
scaler = StandardScaler()
scaler.fit(all_data)
joblib.dump(scaler, ROOT / "scaler.pkl")
print("✅ Scaler 已儲存到：", ROOT / "scaler.pkl")

# === Normalise each segment and save it as <uid>_<segment index>.npy ===
for segment, (uid, i) in tqdm(zip(all_segments, meta), total=len(all_segments)):
    normed = scaler.transform(segment)
    np.save(SAVE_DIR / f"{uid}_{i}.npy", normed)

print(f"✅ 共儲存 {len(all_segments)} 筆正規化後的 (85,6) .npy 至：{SAVE_DIR}")


100%|██████████| 5/5 [00:00<00:00, 26.64it/s]


Number of segments collected: 135
✅ Scaler 已儲存到： AICUP\scaler.pkl


100%|██████████| 135/135 [00:01<00:00, 79.75it/s]

✅ 共儲存 135 筆正規化後的 (85,6) .npy 至：AICUP\sequence_data_train





In [35]:
import os
import re
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm import tqdm
import joblib

# === Path configuration (test split) ===
ROOT = Path('./AICUP')
TXT_DIR = ROOT / 'test_data'
INFO_CSV = ROOT / 'test_info.csv'
SAVE_DIR = ROOT / 'sequence_data_test'
SCALER_PATH = ROOT / 'scaler.pkl'  # written by the training-split preprocessing cell
SAVE_DIR.mkdir(parents=True, exist_ok=True)

# === Load the info table (its cut_point column is parsed below) ===
info = pd.read_csv(INFO_CSV)

def fix_cut_point_format(x):
    """Parse one raw ``cut_point`` cell into a list of integer indices.

    Parameters
    ----------
    x : str, list, tuple, numpy.ndarray or missing value
        Usually the text stored in the CSV (every run of digits is taken as
        one index). A value that is already a sequence is converted
        element-wise, so applying this function twice is harmless.

    Returns
    -------
    list[int]
        The indices in their original order; ``[]`` for missing values or
        text without digits.
    """
    # Already-parsed input: pd.isna() on a sequence returns an array, which
    # cannot be used in a boolean test, so handle it before the NaN check.
    if isinstance(x, (list, tuple, np.ndarray)):
        return [int(n) for n in x]
    if pd.isna(x):
        return []
    return [int(n) for n in re.findall(r'\d+', str(x))]

# Parse every cut_point cell and keep only rows with at least two cut points.
info['cut_point'] = info['cut_point'].apply(fix_cut_point_format)
info = info[info['cut_point'].apply(lambda x: len(x) >= 2)]

# === Load the StandardScaler fitted on the training segments ===
# NOTE(review): joblib.load unpickles the file; only load scaler files you
# produced yourself.
scaler = joblib.load(SCALER_PATH)

# === txt to array ===
def read_txt_as_array(txt_path):
    """Read a whitespace-separated sensor file into an ``(n_rows, 6)`` array.

    Only lines that split into exactly six tokens, all parseable as ``int``,
    are kept; every other line (headers, blank lines, malformed rows) is
    skipped.

    Parameters
    ----------
    txt_path : str or pathlib.Path
        Path of the text file to read.

    Returns
    -------
    numpy.ndarray
        Integer array of shape ``(n_rows, 6)``. A file without any valid row
        yields an empty ``(0, 6)`` array rather than a 1-D ``(0,)`` array, so
        callers can slice it and ``np.vstack`` padding onto it safely.
    """
    data = []
    for line in Path(txt_path).read_text().splitlines():
        parts = line.split()
        if len(parts) != 6:
            continue
        try:
            data.append([int(x) for x in parts])
        except ValueError:
            # Six tokens but at least one is not an integer: skip the row.
            continue
    if not data:
        return np.empty((0, 6), dtype=int)
    return np.array(data)

# === Process every test recording ===
count = 0
for _, row in tqdm(info.iterrows(), total=len(info), desc="📂 處理測試檔案"):
    uid = row['unique_id']
    cut_points = row['cut_point']
    # unique_id may come back as a float (e.g. 1968.0); normalise it once and
    # use the integer form for both the input and the output file names.
    uid_int = int(float(uid))
    txt_file = TXT_DIR / f"{uid_int}.txt"

    if not txt_file.exists():
        print(f"[❌ 檔案不存在] {txt_file}")
        continue
    print(f"[✅ 檔案找到] {txt_file}")

    # reshape(-1, 6) is a no-op for a normal (n, 6) array and turns an empty
    # 1-D result into (0, 6), so the padding below cannot fail in np.vstack.
    raw = read_txt_as_array(txt_file).reshape(-1, 6)

    for i in range(len(cut_points) - 1):
        start, end = cut_points[i], cut_points[i + 1]
        segment = raw[start:end]

        # Zero-pad (or truncate) to (85, 6). Unlike the training cell, empty
        # slices are kept and become all-zero segments.
        if segment.shape[0] < 85:
            pad = np.zeros((85 - segment.shape[0], 6))
            segment = np.vstack([segment, pad])
        elif segment.shape[0] > 85:
            segment = segment[:85]

        if segment.shape != (85, 6):
            continue

        # Z-score normalisation with the scaler fitted on the training data.
        normed = scaler.transform(segment)

        # Save as <uid>_<segment index>.npy. The integer uid keeps names in
        # the same "<int>_<int>" form as the training segments.
        # NOTE(review): delete older "<uid>.0_<i>.npy" files from SAVE_DIR
        # before re-running, otherwise both variants will be picked up.
        np.save(SAVE_DIR / f"{uid_int}_{i}.npy", normed)
        count += 1

print(f"\n✅ 共儲存 {count} 筆測試資料到：{SAVE_DIR}")


📂 處理測試檔案:   0%|          | 0/5 [00:00<?, ?it/s]

[✅ 檔案找到] AICUP\test_data\1968.txt


📂 處理測試檔案:  20%|██        | 1/5 [00:00<00:01,  3.12it/s]

[✅ 檔案找到] AICUP\test_data\1969.txt


📂 處理測試檔案:  40%|████      | 2/5 [00:00<00:01,  2.99it/s]

[✅ 檔案找到] AICUP\test_data\1970.txt


📂 處理測試檔案:  60%|██████    | 3/5 [00:01<00:01,  1.31it/s]

[✅ 檔案找到] AICUP\test_data\1971.txt


📂 處理測試檔案:  80%|████████  | 4/5 [00:03<00:00,  1.01it/s]

[✅ 檔案找到] AICUP\test_data\1972.txt


📂 處理測試檔案: 100%|██████████| 5/5 [00:04<00:00,  1.09it/s]


✅ 共儲存 135 筆測試資料到：AICUP\sequence_data_test





In [2]:
import os
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.model_selection import GroupShuffleSplit
from sklearn.preprocessing import LabelEncoder
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, Bidirectional, LSTM, Dense, Dropout, BatchNormalization, GlobalAveragePooling1D

# === Data folder and label file paths ===
DATA_DIR = Path("./AICUP/sequence_data_train")
INFO_CSV = Path("./AICUP/train_info.csv")

# === Load train_info.csv ===
info = pd.read_csv(INFO_CSV)

# === Collect every .npy segment together with its labels ===
from tqdm import tqdm

X, y_gender, y_handed, y_years, y_level, groups = [], [], [], [], [], []

for i, row in tqdm(info.iterrows(), total=len(info), desc="讀取 npy"):
    uid = row['unique_id']
    pid = row['player_id']
    # Probe segment ids 0..26; ids without a file on disk are skipped.
    for seg_id in range(27):
        npy_path = DATA_DIR / f"{uid}_{seg_id}.npy"
        if not npy_path.exists():
            continue
        data = np.load(npy_path)
        if data.shape != (85, 6):
            continue
        # Every segment inherits the labels of its recording; player_id is
        # kept as the grouping key for the split below.
        X.append(data)
        y_gender.append(row['gender'])
        y_handed.append(row['hold racket handed'])
        y_years.append(row['play years'])
        y_level.append(row['level'])
        groups.append(pid)


X = np.array(X)

# === Label encoding (each label list becomes an integer-coded array) ===
le_gender = LabelEncoder(); y_gender = le_gender.fit_transform(y_gender)
le_handed = LabelEncoder(); y_handed = le_handed.fit_transform(y_handed)
le_years = LabelEncoder(); y_years = le_years.fit_transform(y_years)
le_level = LabelEncoder(); y_level = le_level.fit_transform(y_level)

# === Train/validation split grouped by player_id ===
# All segments of one player land on the same side of the split.
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
train_idx, val_idx = next(gss.split(X, groups=groups))

X_train, X_val = X[train_idx], X[val_idx]
y_gender_train, y_gender_val = y_gender[train_idx], y_gender[val_idx]
y_handed_train, y_handed_val = y_handed[train_idx], y_handed[val_idx]
y_years_train, y_years_val = y_years[train_idx], y_years[val_idx]
y_level_train, y_level_val = y_level[train_idx], y_level[val_idx]


讀取 npy: 100%|██████████| 5/5 [00:03<00:00,  1.49it/s]


In [6]:
from pathlib import Path
import numpy as np
import pandas as pd
import math
import csv
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from sklearn.metrics import roc_auc_score

def FFT(xreal, ximag):
    """In-place radix-2 FFT on parallel lists of real and imaginary parts.

    Only the first ``n`` entries are transformed, where ``n`` is the largest
    power of two not exceeding ``len(xreal)``; any trailing entries are left
    untouched.

    Parameters
    ----------
    xreal, ximag : list of numbers
        Real and imaginary parts of the signal. Both lists are modified in
        place and must hold at least ``n`` entries.

    Returns
    -------
    tuple
        ``(n, xreal, ximag)`` where the two lists are the very objects that
        were passed in, now holding the spectrum in their first ``n`` slots.

    Raises
    ------
    ValueError
        If fewer than two samples are supplied or ``ximag`` is shorter than
        ``n``.
    """
    if len(xreal) < 2:
        raise ValueError("FFT needs at least 2 samples")

    # Largest power of two that fits into the input.
    n = 2
    while n * 2 <= len(xreal):
        n *= 2
    if len(ximag) < n:
        raise ValueError("ximag must hold at least as many samples as are transformed")

    # n is an exact power of two, so bit_length gives log2(n) without the
    # rounding risk of a floating-point logarithm.
    p = n.bit_length() - 1

    # Bit-reversal permutation (integer arithmetic only).
    for i in range(n):
        a, b = i, 0
        for _ in range(p):
            b = (b << 1) | (a & 1)
            a >>= 1
        if b > i:
            xreal[i], xreal[b] = xreal[b], xreal[i]
            ximag[i], ximag[b] = ximag[b], ximag[i]

    # Twiddle factors w^j = exp(-2*pi*i*j/n). They are computed directly: the
    # previous recurrence built wimag[j] from the *already updated* wreal[j],
    # which produced wrong imaginary twiddles for every n >= 4.
    arg = -2 * math.pi / n
    wreal = [math.cos(arg * j) for j in range(n // 2)]
    wimag = [math.sin(arg * j) for j in range(n // 2)]

    # Butterfly passes for block sizes m = 2, 4, ..., n.
    m = 2
    while m <= n:
        half = m // 2
        stride = n // m
        for k in range(0, n, m):
            for j in range(half):
                index1 = k + j
                index2 = index1 + half
                t = stride * j
                treal = wreal[t] * xreal[index2] - wimag[t] * ximag[index2]
                timag = wreal[t] * ximag[index2] + wimag[t] * xreal[index2]
                ureal = xreal[index1]
                uimag = ximag[index1]
                xreal[index1] = ureal + treal
                ximag[index1] = uimag + timag
                xreal[index2] = ureal - treal
                ximag[index2] = uimag - timag
        m *= 2

    return n, xreal, ximag
    
def FFT_data(input_data, swinging_times):
    """Build the per-swing mean series that are later fed to ``FFT``.

    For every swing ``[swinging_times[k], swinging_times[k+1])`` the mean of
    ``|ax + ay + az|`` and of ``|gx + gy + gz|`` over its rows is stored at
    position ``k`` of the respective output list.

    Parameters
    ----------
    input_data : sequence of rows
        Each row holds at least six numbers; columns 0-2 and 3-5 are summed
        separately.
    swinging_times : sequence of int
        Swing boundaries as row indices into ``input_data``.

    Returns
    -------
    tuple of list
        ``(a_mean, g_mean)``, each of length
        ``swinging_times[-1] - swinging_times[0]``; slots beyond the number
        of swings stay 0.

    Raises
    ------
    ZeroDivisionError
        If a swing contains no rows.

    Notes
    -----
    NOTE(review): the per-row quantity is the absolute value of the *sum* of
    the three axes (``sqrt((x+y+z)**2)`` in the original), not the Euclidean
    norm. Confirm that this is intended.
    """
    txtlength = swinging_times[-1] - swinging_times[0]
    a_mean = [0] * txtlength
    g_mean = [0] * txtlength

    for num in range(len(swinging_times) - 1):
        a = []
        g = []
        for swing in range(swinging_times[num], swinging_times[num + 1]):
            row = input_data[swing]
            a.append(float(abs(row[0] + row[1] + row[2])))
            g.append(float(abs(row[3] + row[4] + row[5])))

        a_mean[num] = sum(a) / len(a)
        # Fixed: this used to repeat sum(a) / len(a), so the second series
        # was a copy of the first and never reflected columns 3-5.
        g_mean[num] = sum(g) / len(g)

    return a_mean, g_mean

def feature(input_data, swinging_now, swinging_times, n_fft, a_fft, g_fft, a_fft_imag, g_fft_imag, writer):
    """Compute one row of 34 hand-crafted features for a swing and write it.

    Parameters
    ----------
    input_data : sequence of rows
        The sensor rows of the current swing; columns 0-2 and 3-5 are
        treated as two separate groups (called ``a`` and ``g`` below).
    swinging_now : int
        Zero-based index of the current swing; selects the slice
        ``[cut * swinging_now, cut * (swinging_now + 1))`` of the spectra.
    swinging_times : int
        Total number of swings (used as a divisor, not as a boundary list).
    n_fft : int
        Number of transformed points; ``cut = int(n_fft / swinging_times)``.
    a_fft, g_fft, a_fft_imag, g_fft_imag : sequences of numbers
        Real and imaginary parts of the two spectra.
    writer : object with ``writerow``
        Receives the feature list (``csv.writer`` in this notebook).

    Returns
    -------
    None
        The features are emitted through ``writer.writerow`` only.

    Notes
    -----
    An empty ``input_data`` fails at the ``mean`` computation because the
    loop variable ``i`` is never bound. Several formulas below carry
    NOTE(review) markers; they are documented rather than changed so this
    block stays identical to the other copy of ``feature`` in the notebook.
    """
    allsum = []
    mean = []
    var = []
    rms = []
    XYZmean_a = 0  # never read afterwards
    a = []
    g = []
    a_s1 = 0
    a_s2 = 0
    g_s1 = 0
    g_s2 = 0
    a_k1 = 0
    a_k2 = 0
    g_k1 = 0
    g_k2 = 0
    
    # Pass 1: per-column sums plus the per-row |x+y+z| series for both groups.
    # sqrt(pow(s, 2)) is the absolute value of the column SUM, not a vector norm.
    for i in range(len(input_data)):
        if i==0:
            allsum = input_data[i]
            a.append(math.sqrt(math.pow((input_data[i][0] + input_data[i][1] + input_data[i][2]), 2)))
            g.append(math.sqrt(math.pow((input_data[i][3] + input_data[i][4] + input_data[i][5]), 2)))
            continue
        
        a.append(math.sqrt(math.pow((input_data[i][0] + input_data[i][1] + input_data[i][2]), 2)))
        g.append(math.sqrt(math.pow((input_data[i][3] + input_data[i][4] + input_data[i][5]), 2)))
       
        allsum = [allsum[feature_index] + input_data[i][feature_index] for feature_index in range(len(input_data[i]))]
        
    # `i` is the index left over from the loop above (last row).
    mean = [allsum[feature_index] / len(input_data) for feature_index in range(len(input_data[i]))]
    
    # Pass 2: accumulate squared deviations (var) and squared values (rms).
    for i in range(len(input_data)):
        if i==0:
            # NOTE(review): both accumulators start from the RAW first row,
            # not from its squared deviation / squared value. A sufficiently
            # negative first sample can make the sums negative and
            # math.sqrt below raise ValueError.
            var = input_data[i]
            rms = input_data[i]
            continue

        var = [var[feature_index] + math.pow((input_data[i][feature_index] - mean[feature_index]), 2) for feature_index in range(len(input_data[i]))]
        rms = [rms[feature_index] + math.pow(input_data[i][feature_index], 2) for feature_index in range(len(input_data[i]))]
        
    # After the square root `var` is a standard-deviation-like value, despite its name.
    var = [math.sqrt((var[feature_index] / len(input_data))) for feature_index in range(len(input_data[i]))]
    rms = [math.sqrt((rms[feature_index] / len(input_data))) for feature_index in range(len(input_data[i]))]
    
    a_max = [max(a)]
    a_min = [min(a)]
    a_mean = [sum(a) / len(a)]
    g_max = [max(g)]
    g_min = [min(g)]
    g_mean = [sum(g) / len(g)]
    
    # Computed but not part of the output row.
    a_var = math.sqrt(math.pow((var[0] + var[1] + var[2]), 2))
    
    # Central-moment sums of the a / g series: 4th (s1), 2nd (s2), 3rd (k1).
    for i in range(len(input_data)):
        a_s1 = a_s1 + math.pow((a[i] - a_mean[0]), 4)
        a_s2 = a_s2 + math.pow((a[i] - a_mean[0]), 2)
        g_s1 = g_s1 + math.pow((g[i] - g_mean[0]), 4)
        g_s2 = g_s2 + math.pow((g[i] - g_mean[0]), 2)
        a_k1 = a_k1 + math.pow((a[i] - a_mean[0]), 3)
        g_k1 = g_k1 + math.pow((g[i] - g_mean[0]), 3)
    
    # NOTE(review): s1 and s2 are divided by the sample count, but the
    # third-moment sums a_k1 / g_k1 are not, so the skewness values below are
    # scaled by len(input_data).
    a_s1 = a_s1 / len(input_data)
    a_s2 = a_s2 / len(input_data)
    g_s1 = g_s1 / len(input_data)
    g_s2 = g_s2 / len(input_data)
    a_k2 = math.pow(a_s2, 1.5)
    g_k2 = math.pow(g_s2, 1.5)
    a_s2 = a_s2 * a_s2
    g_s2 = g_s2 * g_s2
    
    # A constant series (zero second moment) raises ZeroDivisionError here.
    a_kurtosis = [a_s1 / a_s2]
    g_kurtosis = [g_s1 / g_s2]
    a_skewness = [a_k1 / a_k2]
    g_skewness = [g_k1 / g_k2]
    
    a_fft_mean = 0
    g_fft_mean = 0
    # Number of spectrum bins assigned to each swing.
    cut = int(n_fft / swinging_times)
    a_psd = []
    g_psd = []
    entropy_a = []
    entropy_g = []
    e1 = []
    e3 = []
    e2 = 0
    e4 = 0
    
    # Spectrum slice of this swing: mean of the REAL parts, power (re^2 + im^2)
    # and amplitude (sqrt of the power) per bin.
    for i in range(cut * swinging_now, cut * (swinging_now + 1)):
        a_fft_mean += a_fft[i]
        g_fft_mean += g_fft[i]
        a_psd.append(math.pow(a_fft[i], 2) + math.pow(a_fft_imag[i], 2))
        g_psd.append(math.pow(g_fft[i], 2) + math.pow(g_fft_imag[i], 2))
        e1.append(math.pow(a_psd[-1], 0.5))
        e3.append(math.pow(g_psd[-1], 0.5))
        
    a_fft_mean = a_fft_mean / cut
    g_fft_mean = g_fft_mean / cut
    
    a_psd_mean = sum(a_psd) / len(a_psd)
    g_psd_mean = sum(g_psd) / len(g_psd)
    
    # e2 / e4: total amplitude of the slice (the sum of e1 / e3).
    for i in range(cut):
        e2 += math.pow(a_psd[i], 0.5)
        e4 += math.pow(g_psd[i], 0.5)
    
    # Per-bin p * log(p) with p = amplitude share. The terms are not negated,
    # and a zero amplitude makes math.log raise ValueError.
    for i in range(cut):
        entropy_a.append((e1[i] / e2) * math.log(e1[i] / e2))
        entropy_g.append((e3[i] / e4) * math.log(e3[i] / e4))
    
    a_entropy_mean = sum(entropy_a) / len(entropy_a)
    g_entropy_mean = sum(entropy_g) / len(entropy_g)       
        
    
    # 6 means + 6 "var" + 6 rms + 6 extrema/means + 4 spectral + 4 shape + 2 entropy = 34 values.
    output = mean + var + rms + a_max + a_mean + a_min + g_max + g_mean + g_min + [a_fft_mean] + [g_fft_mean] + [a_psd_mean] + [g_psd_mean] + a_kurtosis + g_kurtosis + a_skewness + g_skewness + [a_entropy_mean] + [g_entropy_mean]
    writer.writerow(output)

def data_generate(datapath='./AICUP/train_data', tar_dir='./AICUP/tabular_data_train'):
    """Write one per-swing feature CSV for every ``.txt`` recording.

    Each recording is split into 27 equal-length swings (28 evenly spaced
    boundaries); ``feature`` appends one row per swing to
    ``<tar_dir>/<recording stem>.csv``.

    Parameters
    ----------
    datapath : str or pathlib.Path, optional
        Folder searched recursively for ``*.txt`` recordings.
    tar_dir : str or pathlib.Path, optional
        Output folder; created if missing.

    Notes
    -----
    The first line of every recording and all blank lines are skipped. If the
    feature computation of a recording fails, its stem and the error are
    printed and the CSV is left with the header plus whatever rows were
    written before the failure.
    """
    # Path.mkdir instead of os.makedirs: `os` is not imported in this cell,
    # so the old call only worked when an earlier cell had imported it.
    out_dir = Path(tar_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    headerList = ['ax_mean', 'ay_mean', 'az_mean', 'gx_mean', 'gy_mean', 'gz_mean', 'ax_var', 'ay_var', 'az_var', 'gx_var', 'gy_var', 'gz_var', 'ax_rms', 'ay_rms', 'az_rms', 'gx_rms', 'gy_rms', 'gz_rms', 'a_max', 'a_mean', 'a_min', 'g_max', 'g_mean', 'g_min', 'a_fft', 'g_fft', 'a_psd', 'g_psd', 'a_kurt', 'g_kurt', 'a_skewn', 'g_skewn', 'a_entropy', 'g_entropy']

    for file in Path(datapath).glob('**/*.txt'):
        All_data = []
        # The context manager closes the handle even if parsing raises.
        with open(file) as f:
            for line_no, line in enumerate(f):
                # Skip the first line of the file and empty lines.
                if line_no == 0 or line == '\n':
                    continue
                num = line.split(' ')
                if len(num) > 5:
                    All_data.append([int(num[i]) for i in range(6)])

        # 28 evenly spaced boundaries -> 27 swings.
        swing_index = np.linspace(0, len(All_data), 28, dtype = int)

        with open(out_dir / '{fname}.csv'.format(fname = Path(file).stem), 'w', newline = '') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(headerList)
            try:
                a_fft, g_fft = FFT_data(All_data, swing_index)
                a_fft_imag = [0] * len(a_fft)
                g_fft_imag = [0] * len(g_fft)
                n_fft, a_fft, a_fft_imag = FFT(a_fft, a_fft_imag)
                n_fft, g_fft, g_fft_imag = FFT(g_fft, g_fft_imag)
                for i in range(1, len(swing_index)):
                    feature(All_data[swing_index[i-1]: swing_index[i]], i - 1, len(swing_index) - 1, n_fft, a_fft, g_fft, a_fft_imag, g_fft_imag, writer)
            except Exception as exc:
                # Best effort: report the recording and why it failed, then
                # carry on with the next one (KeyboardInterrupt still stops).
                print(Path(file).stem, repr(exc))
# Generate the per-swing feature CSVs when this cell is executed.
data_generate()


In [7]:
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.preprocessing import MinMaxScaler
import joblib

# === Path configuration ===
TABULAR_DIR = Path("./AICUP/tabular_data_train")
SCALER_PATH = "./AICUP/tabular_scaler.pkl"

# === Collect all tabular features ===
tabular_list = []
for file in sorted(TABULAR_DIR.glob("*.csv")):
    df = pd.read_csv(file)
    tabular_list.append(df.values)  # expected shape: (27, 34) for a fully generated file

X_all = np.vstack(tabular_list)  # shape: (N, 34)

# === Fit and save a MinMaxScaler ===
# NOTE(review): the fusion-model training cell later fits a StandardScaler and
# writes it to './tabular_scaler.pkl' (no AICUP folder), which is the file the
# inference cell loads; the scaler saved here appears to be unused.
scaler = MinMaxScaler()
scaler.fit(X_all)

joblib.dump(scaler, SCALER_PATH)
print(f"✅ Tabular MinMaxScaler 已儲存至: {SCALER_PATH}")


✅ Tabular MinMaxScaler 已儲存至: ./AICUP/tabular_scaler.pkl


In [39]:
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm import tqdm
from sklearn.model_selection import GroupShuffleSplit
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import roc_auc_score
from tensorflow.keras.utils import Sequence, to_categorical
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, BatchNormalization, Dropout, Bidirectional, LSTM, GlobalAveragePooling1D, Dense, concatenate
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.metrics import AUC
import tensorflow as tf
import joblib
from tensorflow.keras.layers import LayerNormalization
from tensorflow.keras.layers import MultiHeadAttention


# === Parameters ===
SEQ_DIR = Path('./AICUP/sequence_data_train')
TAB_DIR = Path('./AICUP/tabular_data_train')
INFO_CSV = './AICUP/train_info.csv'
WINDOW = 40      # time steps fed to the sequence branch (segments are cut to this length)
STRIDE = 10      # NOTE(review): currently unused - no sliding window is applied
BATCH_SIZE = 32

# === Load the training metadata, indexed by unique_id ===
info = pd.read_csv(INFO_CSV).set_index("unique_id")
assert info.index.is_unique, "unique_id must be unique for the label lookups below"

# === Build the sample list: one entry per saved segment file ===
samples = []
for file in sorted(SEQ_DIR.glob("*.npy")):
    uid, seg_id = file.stem.split("_")
    uid, seg_id = int(uid), int(seg_id)
    if uid not in info.index: continue
    samples.append({'uid': uid, 'seg_id': seg_id, 'seq_path': file, 'tab_path': TAB_DIR / f"{uid}.csv"})

samples_df = pd.DataFrame(samples)

# === Grouped train/validation split ===
# Group by player_id rather than by recording uid: one player has several
# recordings, and splitting by uid lets the same player appear on both sides,
# which inflates the validation scores.
player_groups = info.loc[samples_df['uid'], 'player_id'].to_numpy()
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
train_idx, val_idx = next(gss.split(samples_df, groups=player_groups))
# The generator serves rows in frame order, so shuffle the training rows once;
# otherwise each batch holds consecutive segments of only one or two
# recordings (near-identical labels within a batch).
train_samples = samples_df.iloc[train_idx].sample(frac=1, random_state=42).reset_index(drop=True)
val_samples = samples_df.iloc[val_idx].reset_index(drop=True)

# === Tabular scaler ===
# Read every recording's feature table once (previously the same CSV was read
# and stacked again for each of its segments).
tab_all = []
for tab_path in samples_df['tab_path'].drop_duplicates():
    tab = pd.read_csv(tab_path).values
    tab_all.append(tab)
tab_all = np.vstack(tab_all)
scaler = StandardScaler().fit(tab_all)
joblib.dump(scaler, './tabular_scaler.pkl')

# === Label encoding (replaces the label columns of `info` with integer codes) ===
label_encoders = {}
for col in ['gender', 'hold racket handed', 'play years', 'level']:
    le = LabelEncoder()
    info[col] = le.fit_transform(info[col])
    label_encoders[col] = le
joblib.dump(label_encoders, './label_encoders.pkl')

# === 資料產生器 ===
class DualInputGenerator(Sequence):
    """Keras ``Sequence`` yielding (sequence, tabular) inputs with four labels.

    Parameters
    ----------
    df : pandas.DataFrame
        One row per segment with columns ``uid``, ``seg_id``, ``seq_path``
        and ``tab_path``.
    batch_size : int
        Number of rows per batch (the last batch may be smaller).
    scaler : fitted transformer
        Applied to the tabular feature rows via ``transform``.
    info_df : pandas.DataFrame
        Label table indexed by ``uid`` with integer-coded columns ``gender``,
        ``hold racket handed``, ``play years`` and ``level``.
    """

    def __init__(self, df, batch_size, scaler, info_df):
        super().__init__()
        self.df = df
        self.batch_size = batch_size
        self.scaler = scaler
        self.info_df = info_df
        # tab_path -> float32 feature table. Each CSV is parsed once instead
        # of once per segment per epoch.
        self._tab_cache = {}

    def __len__(self):
        """Number of batches per epoch."""
        return int(np.ceil(len(self.df) / self.batch_size))

    def _load_tab(self, tab_path):
        """Return the (cached) float32 feature table stored at ``tab_path``."""
        key = str(tab_path)
        if key not in self._tab_cache:
            self._tab_cache[key] = pd.read_csv(tab_path).values.astype(np.float32)
        return self._tab_cache[key]

    def __getitem__(self, idx):
        """Build batch ``idx`` as ``([X_seq, X_tab], {output name: labels})``."""
        batch = self.df.iloc[idx * self.batch_size:(idx + 1) * self.batch_size]
        X_seq, X_tab, y_gender, y_handed, y_years, y_level = [], [], [], [], [], []

        for _, row in batch.iterrows():
            # Sequence input: zero-pad or truncate to WINDOW time steps.
            seq = np.load(row['seq_path'])
            if seq.shape[0] < WINDOW:
                pad = np.zeros((WINDOW - seq.shape[0], 6))
                seq = np.vstack([seq, pad])
            elif seq.shape[0] > WINDOW:
                seq = seq[:WINDOW]

            # Tabular input: the feature row whose position equals seg_id.
            tab = self._load_tab(row['tab_path'])[row['seg_id']]

            label_row = self.info_df.loc[row['uid']]
            yg, yh = label_row['gender'], label_row['hold racket handed']
            yy, yl = label_row['play years'], label_row['level']

            X_seq.append(seq)
            X_tab.append(tab)
            y_gender.append(yg)
            y_handed.append(yh)
            y_years.append(yy)
            y_level.append(yl)

        # Scale the whole batch in one call instead of row by row.
        X_tab = self.scaler.transform(np.array(X_tab))

        return (
            [np.array(X_seq), X_tab],
            {
                'gender': np.array(y_gender).astype(np.float32),
                'handed': np.array(y_handed).astype(np.float32),
                'years': to_categorical(y_years, num_classes=3),
                'level': to_categorical(y_level, num_classes=4),
            }
        )

train_gen = DualInputGenerator(train_samples, BATCH_SIZE, scaler, info)
val_gen = DualInputGenerator(val_samples, BATCH_SIZE, scaler, info)

# === Build the model ===
# Sequence branch: Conv1D stack -> BiLSTM -> self-attention -> average pooling.
# The input length follows WINDOW so it cannot drift from the generator's
# padding/truncation length.
seq_input = Input(shape=(WINDOW, 6))
x = Conv1D(64, 3, activation='relu', padding='same')(seq_input)
x = BatchNormalization()(x)
x = Conv1D(128, 5, activation='relu', padding='same')(x)
x = BatchNormalization()(x)
x = MaxPooling1D(2)(x)
x = Dropout(0.4)(x)
x = Bidirectional(LSTM(128, return_sequences=True))(x)
x = LayerNormalization()(x)
attn2 = MultiHeadAttention(num_heads=4, key_dim=32)(x, x)
x = LayerNormalization()(x + attn2)  # residual connection around the attention
x = GlobalAveragePooling1D()(x)

# Tabular branch: 34 hand-crafted features per segment.
tab_input = Input(shape=(34,))
t = Dense(256, activation='relu')(tab_input)
t = BatchNormalization()(t)
t = Dropout(0.4)(t)
t = Dense(128, activation='relu')(t)
t = BatchNormalization()(t)
t = Dropout(0.4)(t)
t = Dense(64, activation='relu')(t)
t = Dropout(0.4)(t)

# Fuse both branches and attach the four task heads.
merged = concatenate([x, t])
merged = Dense(128, activation='relu')(merged)
merged = Dropout(0.4)(merged)

output_gender = Dense(1, activation='sigmoid', name='gender')(merged)
output_handed = Dense(1, activation='sigmoid', name='handed')(merged)
output_years = Dense(3, activation='softmax', name='years')(merged)
output_level = Dense(4, activation='softmax', name='level')(merged)

model = Model(inputs=[seq_input, tab_input], outputs=[output_gender, output_handed, output_years, output_level])

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss={
        'gender': 'binary_crossentropy',
        'handed': 'binary_crossentropy',
        'years': 'categorical_crossentropy',
        'level': 'categorical_crossentropy'
    },
    metrics={
        'gender': AUC(name='auc'),
        'handed': AUC(name='auc'),
        'years': AUC(name='auc'),
        'level': AUC(name='auc')
    }
)

model.summary()

# === Training ===
# Early stopping and LR reduction both watch the validation AUC of the
# 'level' head.
model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=50,
    callbacks=[
        EarlyStopping(monitor='val_level_auc', patience=7, mode='max', restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_level_auc', factor=0.5, patience=3, verbose=1, mode='max')
    ]
)


Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_4 (InputLayer)        [(None, 40, 6)]              0         []                            
                                                                                                  
 conv1d_4 (Conv1D)           (None, 40, 64)               1216      ['input_4[0][0]']             
                                                                                                  
 batch_normalization_6 (Bat  (None, 40, 64)               256       ['conv1d_4[0][0]']            
 chNormalization)                                                                                 
                                                                                                  
 conv1d_5 (Conv1D)           (None, 40, 128)              41088     ['batch_normalization_6[

<keras.src.callbacks.History at 0x2ade0c8be80>

In [40]:
# Save the model in the native Keras format (recommended).
# The path is kept in one variable so the message always names the file that
# was actually written (it used to report a ".h5" file).
MODEL_PATH = "./AICUP/model_fusion.keras"
model.save(MODEL_PATH)
print(f"✅ 模型已儲存至 {MODEL_PATH}")


✅ 模型已儲存至 ./AICUP/model_fusion.h5


In [45]:
import numpy as np
import pandas as pd
from pathlib import Path
from tensorflow.keras.models import load_model
import joblib
from collections import defaultdict
from tqdm import tqdm

# === Path configuration ===
MODEL_PATH = "./AICUP/model_fusion.keras"
SCALER_PATH = "./tabular_scaler.pkl"
SEQ_DIR = Path("./AICUP/sequence_data_test")
TAB_DIR = Path("./AICUP/tabular_data_test")
SUBMIT_PATH = "./AICUP/sample_submission.csv"
WINDOW = 40

# === Load the model and the tabular scaler ===
model = load_model(MODEL_PATH, compile=False)
# NOTE(review): joblib.load unpickles the file; only load artefacts produced
# by this notebook.
scaler = joblib.load(SCALER_PATH)
# === Optional: overwrite unreadable segment files with zeros ===
# for f in invalid_files:
#     np.save(f, np.zeros((85, 6), dtype=np.float32))
# === Group the segment files by recording id ===
uid_dict = defaultdict(list)
for file in sorted(SEQ_DIR.glob("*.npy")):
    uid_str = "_".join(file.stem.split("_")[:-1])  # e.g. '1968.0'
    uid = int(float(uid_str))  # via float so that '1968.0' is accepted as well
    uid_dict[uid].append(file)

submit_rows = []

for uid in tqdm(sorted(uid_dict.keys()), desc="🧠 推論中"):
    segments_seq, segments_tab = [], []

    # Read the recording's feature table once instead of once per segment.
    tab_path = TAB_DIR / f"{uid}.csv"
    tab_df = pd.read_csv(tab_path) if tab_path.exists() else None

    for file in sorted(uid_dict[uid]):
        seg_id = int(file.stem.split("_")[-1])
        # The segments are plain float arrays written by np.save, so pickle
        # support is unnecessary and left disabled.
        seq = np.load(file, allow_pickle=False)
        if seq.shape[0] < WINDOW:
            pad = np.zeros((WINDOW - seq.shape[0], 6))
            seq = np.vstack([seq, pad])
        elif seq.shape[0] > WINDOW:
            seq = seq[:WINDOW]
        segments_seq.append(seq)

        if tab_df is None:
            print(f"⚠️ 找不到 tabular 資料: {tab_path}")
            continue
        if seg_id >= len(tab_df):
            print(f"⚠️ 跳過 {uid}_{seg_id}，tabular 無對應資料")
            continue
        tab = tab_df.iloc[seg_id].values.astype(np.float32)
        tab = scaler.transform([tab])[0]
        segments_tab.append(tab)

    # A recording is only predicted when every segment has a tabular row.
    if not segments_seq or len(segments_seq) != len(segments_tab):
        print(f"⚠️ UID {uid} 的段數不一致，跳過")
        continue

    X_seq = np.array(segments_seq)
    X_tab = np.array(segments_tab)

    preds = model.predict([X_seq, X_tab], verbose=0)
    if not isinstance(preds, list):
        preds = [preds]

    # Confidence-weighted average over the segments of one recording.
    weighted_preds = []
    for i, p in enumerate(preds):
        if p.shape[1] == 1:
            # Sigmoid head: the network outputs P(encoded label == 1); the
            # submission expects the complementary class, hence 1 - p.
            p = 1.0 - p.squeeze(axis=1)
            # Weight by the confidence of the binary decision, max(p, 1 - p),
            # mirroring the softmax branch. Weighting by p itself favoured
            # high values and divided by zero when every p was 0.
            weights = np.maximum(p, 1.0 - p)
            weighted_avg = np.average(p, weights=weights)
            weighted_preds.append(weighted_avg)
        else:
            # Softmax head: weight each segment by its top class probability.
            weights = np.max(p, axis=1)
            weighted_avg = np.average(p, axis=0, weights=weights)
            weighted_preds.append(weighted_avg)

    row = [
        uid,
        np.float32(weighted_preds[0]),         # gender
        np.float32(weighted_preds[1]),         # hold racket handed
        *map(np.float32, weighted_preds[2]),   # play years softmax: 3 classes
        *map(np.float32, weighted_preds[3])    # level softmax: 4 classes
    ]
    submit_rows.append(row)

# === Write the submission CSV ===
columns = [
    "unique_id", "gender", "hold racket handed",
    "play years_0", "play years_1", "play years_2",
    "level_2", "level_3", "level_4", "level_5"
]
df_submit = pd.DataFrame(submit_rows, columns=columns)
df_submit = df_submit.sort_values("unique_id")
float_cols = df_submit.columns.difference(["unique_id"])
df_submit[float_cols] = df_submit[float_cols].astype(np.float32)
df_submit.to_csv(SUBMIT_PATH, index=False, float_format="%.4f")

print(f"\n✅ 已儲存預測結果至: {SUBMIT_PATH}")


🧠 推論中: 100%|██████████| 5/5 [00:06<00:00,  1.31s/it]


✅ 已儲存預測結果至: ./AICUP/sample_submission.csv





In [46]:
from pathlib import Path
import numpy as np

# Try to load every test segment without pickle support and collect the files
# that fail, printing the reason for each failure.
invalid_files = []

for file in sorted(Path("AICUP/sequence_data_test").glob("*.npy")):
    try:
        data = np.load(file, allow_pickle=False)  # the loaded array itself is not used
    except Exception as e:
        print(f"❌ 無法讀取：{file.name}，原因：{e}")
        invalid_files.append(file)

print(f"\n共找到 {len(invalid_files)} 個無法讀取的檔案")



共找到 0 個無法讀取的檔案


In [48]:
import pandas as pd
import numpy as np
from pathlib import Path

# Column layout the submission file must have, in this exact order.
EXPECTED_COLUMNS = [
    "unique_id", "gender", "hold racket handed",
    "play years_0", "play years_1", "play years_2",
    "level_2", "level_3", "level_4", "level_5"
]
csv_path = "./AICUP/sample_submission.csv"


def check_submission_format(csv_path):
    """Validate a submission CSV and print one status line per check.

    The checks run in order - file exists, column names, column count,
    numeric dtypes, probabilities within [0, 1] - and the function stops at
    the first failure after printing what went wrong.

    Parameters
    ----------
    csv_path : str or pathlib.Path
        Location of the CSV file to check.

    Returns
    -------
    None
        The result is reported on stdout only.
    """
    target = Path(csv_path)
    if not target.exists():
        print(f"❌ 找不到檔案：{csv_path}")
        return

    print(f"✅ 找到檔案：{csv_path}")
    submission = pd.read_csv(target)

    # --- column names ---
    actual_columns = list(submission.columns)
    if actual_columns != EXPECTED_COLUMNS:
        for line in ("❌ 欄位名稱不正確！應為：", EXPECTED_COLUMNS, "實際欄位：", actual_columns):
            print(line)
        return
    print("✅ 欄位名稱正確")

    # --- column count ---
    n_columns = submission.shape[1]
    if n_columns != 10:
        print(f"❌ 欄位數量錯誤，應為 10 欄，實際為 {n_columns}")
        return
    print("✅ 欄位數量正確")

    # --- dtypes: every column except the id must be numeric ---
    probs = submission.drop(columns=["unique_id"])
    non_numeric = probs.select_dtypes(exclude=[np.number]).columns.tolist()
    if non_numeric:
        print(f"❌ 以下欄位不是數值型：{non_numeric}")
        return
    print("✅ 所有機率欄位皆為數值型")

    # --- value range: probabilities must lie in [0, 1] ---
    out_of_range = (probs < 0) | (probs > 1)
    if out_of_range.any().any():
        print("❌ 有機率值超出 0~1 範圍")
        offenders = probs[out_of_range].dropna(how='all')
        print("錯誤樣本：")
        print(offenders.head())
        return
    print("✅ 所有預測機率都在 0 ~ 1 範圍內")

    print("🎉 CSV 格式檢查完成，一切正常！")

# === Run the check on the generated submission file ===
check_submission_format("./AICUP/sample_submission.csv")


✅ 找到檔案：./AICUP/sample_submission.csv
✅ 欄位名稱正確
✅ 欄位數量正確
✅ 所有機率欄位皆為數值型
✅ 所有預測機率都在 0 ~ 1 範圍內
🎉 CSV 格式檢查完成，一切正常！


In [None]:
from pathlib import Path
import numpy as np
import pandas as pd
import math
import csv
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from sklearn.metrics import roc_auc_score

def FFT(xreal, ximag):
    """In-place radix-2 FFT on parallel lists of real and imaginary parts.

    Only the first ``n`` entries are transformed, where ``n`` is the largest
    power of two not exceeding ``len(xreal)``; any trailing entries are left
    untouched.

    Parameters
    ----------
    xreal, ximag : list of numbers
        Real and imaginary parts of the signal. Both lists are modified in
        place and must hold at least ``n`` entries.

    Returns
    -------
    tuple
        ``(n, xreal, ximag)`` where the two lists are the very objects that
        were passed in, now holding the spectrum in their first ``n`` slots.

    Raises
    ------
    ValueError
        If fewer than two samples are supplied or ``ximag`` is shorter than
        ``n``.
    """
    if len(xreal) < 2:
        raise ValueError("FFT needs at least 2 samples")

    # Largest power of two that fits into the input.
    n = 2
    while n * 2 <= len(xreal):
        n *= 2
    if len(ximag) < n:
        raise ValueError("ximag must hold at least as many samples as are transformed")

    # n is an exact power of two, so bit_length gives log2(n) without the
    # rounding risk of a floating-point logarithm.
    p = n.bit_length() - 1

    # Bit-reversal permutation (integer arithmetic only).
    for i in range(n):
        a, b = i, 0
        for _ in range(p):
            b = (b << 1) | (a & 1)
            a >>= 1
        if b > i:
            xreal[i], xreal[b] = xreal[b], xreal[i]
            ximag[i], ximag[b] = ximag[b], ximag[i]

    # Twiddle factors w^j = exp(-2*pi*i*j/n). They are computed directly: the
    # previous recurrence built wimag[j] from the *already updated* wreal[j],
    # which produced wrong imaginary twiddles for every n >= 4.
    arg = -2 * math.pi / n
    wreal = [math.cos(arg * j) for j in range(n // 2)]
    wimag = [math.sin(arg * j) for j in range(n // 2)]

    # Butterfly passes for block sizes m = 2, 4, ..., n.
    m = 2
    while m <= n:
        half = m // 2
        stride = n // m
        for k in range(0, n, m):
            for j in range(half):
                index1 = k + j
                index2 = index1 + half
                t = stride * j
                treal = wreal[t] * xreal[index2] - wimag[t] * ximag[index2]
                timag = wreal[t] * ximag[index2] + wimag[t] * xreal[index2]
                ureal = xreal[index1]
                uimag = ximag[index1]
                xreal[index1] = ureal + treal
                ximag[index1] = uimag + timag
                xreal[index2] = ureal - treal
                ximag[index2] = uimag - timag
        m *= 2

    return n, xreal, ximag
    
def FFT_data(input_data, swinging_times):
    """Build the per-swing mean series that are later fed to ``FFT``.

    For every swing ``[swinging_times[k], swinging_times[k+1])`` the mean of
    ``|ax + ay + az|`` and of ``|gx + gy + gz|`` over its rows is stored at
    position ``k`` of the respective output list.

    Parameters
    ----------
    input_data : sequence of rows
        Each row holds at least six numbers; columns 0-2 and 3-5 are summed
        separately.
    swinging_times : sequence of int
        Swing boundaries as row indices into ``input_data``.

    Returns
    -------
    tuple of list
        ``(a_mean, g_mean)``, each of length
        ``swinging_times[-1] - swinging_times[0]``; slots beyond the number
        of swings stay 0.

    Raises
    ------
    ZeroDivisionError
        If a swing contains no rows.

    Notes
    -----
    NOTE(review): the per-row quantity is the absolute value of the *sum* of
    the three axes (``sqrt((x+y+z)**2)`` in the original), not the Euclidean
    norm. Confirm that this is intended.
    """
    txtlength = swinging_times[-1] - swinging_times[0]
    a_mean = [0] * txtlength
    g_mean = [0] * txtlength

    for num in range(len(swinging_times) - 1):
        a = []
        g = []
        for swing in range(swinging_times[num], swinging_times[num + 1]):
            row = input_data[swing]
            a.append(float(abs(row[0] + row[1] + row[2])))
            g.append(float(abs(row[3] + row[4] + row[5])))

        a_mean[num] = sum(a) / len(a)
        # Fixed: this used to repeat sum(a) / len(a), so the second series
        # was a copy of the first and never reflected columns 3-5.
        g_mean[num] = sum(g) / len(g)

    return a_mean, g_mean

def feature(input_data, swinging_now, swinging_times, n_fft, a_fft, g_fft, a_fft_imag, g_fft_imag, writer):
    """Compute one row of time- and frequency-domain features and write it.

    The row holds, in order: per-axis mean, per-axis standard deviation
    (labelled "var" for historical reasons), per-axis RMS, then
    a_max, a_mean, a_min, g_max, g_mean, g_min, a_fft_mean, g_fft_mean,
    a_psd_mean, g_psd_mean, a_kurtosis, g_kurtosis, a_skewness, g_skewness,
    a_entropy_mean, g_entropy_mean.

    The "a" signal is ``|v[0] + v[1] + v[2]|`` and the "g" signal is
    ``|v[3] + v[4] + v[5]|`` for each sample ``v``.
    NOTE(review): that is the absolute component sum, not the Euclidean
    norm; left unchanged - confirm the intent.

    Args:
        input_data: Non-empty sequence of samples, each indexable at 0..5.
        swinging_now: Zero-based index of this segment; selects the slice of
            the spectrum that is summarised.
        swinging_times: Total number of segments; the spectrum is split into
            this many equal slices of ``int(n_fft / swinging_times)`` bins.
        n_fft: Size of the FFT that produced the spectrum arrays.
        a_fft, g_fft: Real parts of the "a" / "g" spectra.
        a_fft_imag, g_fft_imag: Imaginary parts of the "a" / "g" spectra.
        writer: Object exposing ``writerow(list)`` (e.g. ``csv.writer``).

    Returns:
        None. The feature row is emitted through ``writer.writerow``.

    Raises:
        ValueError: ``input_data`` is empty.
        ZeroDivisionError: the "a" or "g" signal is constant over the segment
            (zero variance), or the spectrum slice is empty or all zero.
    """
    n_samples = len(input_data)
    if n_samples == 0:
        raise ValueError("input_data must contain at least one sample")

    def _central_moment(values, centre, order):
        # Biased (divide-by-n) central moment of the requested order.
        return sum(math.pow(v - centre, order) for v in values) / len(values)

    def _plogp(part, total):
        # p * ln(p), using the standard convention 0 * ln(0) == 0 so that a
        # zero-amplitude bin no longer triggers a math domain error.
        p = part / total
        return p * math.log(p) if p > 0 else 0.0

    # ---- per-axis statistics -------------------------------------------
    n_axes = len(input_data[0])
    columns = [[sample[axis] for sample in input_data] for axis in range(n_axes)]

    mean = [sum(col) / n_samples for col in columns]
    # Fixed: the accumulators used to be seeded with the raw first sample
    # instead of (x0 - mean)**2 and x0**2, skewing both statistics and
    # making sqrt() fail whenever the first sample was negative enough.
    var = [math.sqrt(_central_moment(col, mu, 2)) for col, mu in zip(columns, mean)]
    rms = [math.sqrt(sum(math.pow(v, 2) for v in col) / n_samples) for col in columns]

    # ---- combined "a" / "g" signals -------------------------------------
    a = [math.sqrt(math.pow(s[0] + s[1] + s[2], 2)) for s in input_data]
    g = [math.sqrt(math.pow(s[3] + s[4] + s[5], 2)) for s in input_data]

    a_max, a_min, a_mean = [max(a)], [min(a)], [sum(a) / len(a)]
    g_max, g_min, g_mean = [max(g)], [min(g)], [sum(g) / len(g)]

    a_m2 = _central_moment(a, a_mean[0], 2)
    g_m2 = _central_moment(g, g_mean[0], 2)

    a_kurtosis = [_central_moment(a, a_mean[0], 4) / (a_m2 * a_m2)]
    g_kurtosis = [_central_moment(g, g_mean[0], 4) / (g_m2 * g_m2)]
    # Fixed: the third moment was previously a raw sum (never divided by n),
    # which inflated the skewness by a factor of n_samples.
    a_skewness = [_central_moment(a, a_mean[0], 3) / math.pow(a_m2, 1.5)]
    g_skewness = [_central_moment(g, g_mean[0], 3) / math.pow(g_m2, 1.5)]

    # ---- spectral features over this segment's slice of the FFT ---------
    cut = int(n_fft / swinging_times)
    bins = range(cut * swinging_now, cut * (swinging_now + 1))

    a_psd = [math.pow(a_fft[i], 2) + math.pow(a_fft_imag[i], 2) for i in bins]
    g_psd = [math.pow(g_fft[i], 2) + math.pow(g_fft_imag[i], 2) for i in bins]
    a_amp = [math.pow(p, 0.5) for p in a_psd]
    g_amp = [math.pow(p, 0.5) for p in g_psd]

    # Mean of the real part only, as in the original feature definition.
    a_fft_mean = sum(a_fft[i] for i in bins) / cut
    g_fft_mean = sum(g_fft[i] for i in bins) / cut

    a_psd_mean = sum(a_psd) / len(a_psd)
    g_psd_mean = sum(g_psd) / len(g_psd)

    # Mean of p*ln(p) over the slice, where p is a bin's share of the total
    # amplitude (note: no sign flip, so the value is <= 0).
    a_total = sum(a_amp)
    g_total = sum(g_amp)
    a_entropy_mean = sum(_plogp(x, a_total) for x in a_amp) / len(a_amp)
    g_entropy_mean = sum(_plogp(x, g_total) for x in g_amp) / len(g_amp)

    output = (
        mean + var + rms
        + a_max + a_mean + a_min
        + g_max + g_mean + g_min
        + [a_fft_mean, g_fft_mean, a_psd_mean, g_psd_mean]
        + a_kurtosis + g_kurtosis + a_skewness + g_skewness
        + [a_entropy_mean, g_entropy_mean]
    )
    writer.writerow(output)
# === Manually selected UID ===
uid = "3211"
txt_path = Path(f"./AICUP/test_data/{uid}.txt")
csv_path = Path(f"./AICUP/tabular_data_test/{uid}.csv")

# === Load the raw recording ===
with open(txt_path) as f:
    lines = f.read().splitlines()

# Keep only rows made of exactly six whitespace-separated integer fields.
data = [
    list(map(int, fields))
    for fields in (l.strip().split() for l in lines)
    if len(fields) == 6
]

# 28 evenly spaced boundaries -> 27 equal-width segments over the recording.
swing_index = np.linspace(0, len(data), 28, dtype=int)
headerList = ['ax_mean', 'ay_mean', 'az_mean', 'gx_mean', 'gy_mean', 'gz_mean', 'ax_var', 'ay_var', 'az_var', 'gx_var', 'gy_var', 'gz_var', 'ax_rms', 'ay_rms', 'az_rms', 'gx_rms', 'gy_rms', 'gz_rms', 'a_max', 'a_mean', 'a_min', 'g_max', 'g_mean', 'g_min', 'a_fft', 'g_fft', 'a_psd', 'g_psd', 'a_kurt', 'g_kurt', 'a_skewn', 'g_skewn', 'a_entropy', 'g_entropy']

# === FFT of the per-segment mean series ===
a_fft, g_fft = FFT_data(data, swing_index)
a_fft_imag = [0] * len(a_fft)
g_fft_imag = [0] * len(g_fft)
n_fft, a_fft, a_fft_imag = FFT(a_fft, a_fft_imag)
n_fft, g_fft, g_fft_imag = FFT(g_fft, g_fft_imag)

# === Write one CSV for this UID ===
# Make sure the output folder exists; open(..., 'w') does not create it.
csv_path.parent.mkdir(parents=True, exist_ok=True)
with open(csv_path, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(headerList)
    for i in range(1, len(swing_index)):
        # Best-effort export: a failing segment is reported and skipped so
        # the remaining segments are still written.
        try:
            seg = data[swing_index[i-1]:swing_index[i]]
            if len(seg) == 0:
                print(f"⚠️ 空段：{uid}_{i-1}")
                continue
            feature(seg, i - 1, len(swing_index) - 1, n_fft, a_fft, g_fft, a_fft_imag, g_fft_imag, writer)
        except Exception as e:
            print(f"❌ 發生錯誤於 {uid}_{i-1}：{e}")
