In [1]:
%load_ext autoreload
%autoreload 2
from Users.project.predict_lab_time_module.predict_lap_time import PredictLapTime
from Users.project.data_container.data_container import AzureStorageAccess, FolderAccess
from Users.project.train_process import model_predictor
from Users.project.train_process.data_state import DataFeaturing, IDatasetCreator
from Users.project.train_process.file_name_storage import FileNameStorage
from Users.project.train_process.loss_func import ILossFunc, MSELoss
from Users.project.train_process.model import ImprovedNN, Model
from Users.project.train_process.model_loader import ModelLoader
from Users.project.train_process.model_predictor import IModelPredictor, MyModelPredictor
from Users.project.train_process.model_storage import ModelStorage
from Users.project.train_process.model_trainer import IModelTrainer, MyModelTrainer
from Users.project.train_process.my_utils import extract_track_from_path, get_real_track_name
from Users.project.train_process.optimizer import AdamOptimizer
from Users.project.train_process.process_unit import ProcessUnit
from Users.project.predict_lab_time_module.create_lap_time_dataset import LapTimePredictDatasetCreator, EFeatureType
import torch
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split


# Pick the compute device: CUDA when torch reports it as available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

# Shared helper objects used by the cells below.
data_access = AzureStorageAccess()
featuring = DataFeaturing()
dataset_creator = LapTimePredictDatasetCreator()

# game_path: directory holding one list file per track (read by train()).
# log_path: directory under which train() writes its per-track outputs.
game_path = "./Users/project/correct_file/"
log_path = "./Users/project/log/"

cpu


In [1]:
# %load_ext autoreload
# %autoreload 2
# weather_data = None
# car_data = None
# laps_data = None

# car_file_name = None
# laps_file_name = None
# for file in data_access.get_all_file():
#     file_name = file.name

#     tokens = file_name.split("/")
#     current_base_game = tokens[1]

#     if len(tokens) < 4:
#         continue

#     file_format = tokens[3]
#     if file_format == "car_data_all.csv":
#         cs = file_name
#         car_file_name = f"{tokens[1]}/{tokens[2]}"
#         car_data = data_access.read_csv_from_blob(file_name)
#         check_value = 1
#     elif file_format == "laps.csv" and check_value == 1:
#         ls = file_name
#         laps_file_name = f"{tokens[1]}/{tokens[2]}"
#         laps_data = data_access.read_csv_from_blob(file_name)

#         if car_file_name != laps_file_name:
#             print("Warning: ", car_file_name, laps_file_name)
#         ws = f"{tokens[0]}/{tokens[1]}/weather_data.csv"
#         weather_data = data_access.read_csv_by_data_frame(ws)
#         if 5 < len(car_data) and 2 < len(laps_data) and (car_data.isnull().values.any() == False) and (weather_data.isnull().values.any() == False):
#             game_name = get_real_track_name(extract_track_from_path(file_name))
#             print(game_name)
#             with open(game_path + game_name, "a", encoding="utf-8") as file:
#                 file.write(f"{tokens[0]}/{tokens[1]}/{tokens[2]}/\n")

#         check_value = 0
#     print(check_value)
    

In [2]:
def get_weather_data(current_time, data_frame: pd.DataFrame, debug_log: str):
    """Return the last row of ``data_frame`` whose ``Time`` is not after ``current_time``.

    Args:
        current_time: Value compared against the ``Time`` column.
        data_frame: Frame with a ``Time`` column. "Last" means last in row
            order, so the frame is presumably sorted by ``Time`` -- verify
            against the caller.
        debug_log: Accepted for the caller's convenience; not used here.

    Returns:
        A frame with at most one row; it is empty when no row qualifies.
    """
    at_or_before = data_frame["Time"] <= current_time
    return data_frame.loc[at_or_before].iloc[-1:]


def get_car_data_feature(data_frame: pd.DataFrame) -> list[float]:
    """Summarise car telemetry rows as a flat list of 26 statistics.

    Args:
        data_frame: Telemetry rows with the columns ``RPM``, ``Speed``,
            ``nGear``, ``Throttle``, ``Brake`` and ``DRS``.

    Returns:
        The statistics in this fixed order:
        RPM (mean, max, mean absolute step, std),
        Speed (same four),
        nGear (same four, count of the most frequent value, number of changes),
        Throttle (same four, share of samples above 0.95),
        Brake (mean),
        DRS (same four, count of the most frequent value, number of changes).
    """

    def spread_stats(series):
        # Mean, maximum, mean absolute difference between consecutive samples,
        # and standard deviation.
        return [series.mean(), series.max(), series.diff().abs().mean(), series.std()]

    def usage_stats(series):
        # Count of the most frequent value, and the number of samples that
        # differ from their predecessor. The first comparison is dropped
        # because it is made against the NaN introduced by shift().
        return [series.value_counts().max(), (series.shift() != series)[1:].sum()]

    # Pull out every channel up front.
    rpm = data_frame["RPM"]
    speed = data_frame["Speed"]
    gear = data_frame["nGear"]
    throttle = data_frame["Throttle"]
    brake = data_frame["Brake"]
    drs = data_frame["DRS"]

    stats = []

    # 1. RPM
    stats.extend(spread_stats(rpm))

    # 2. Speed
    stats.extend(spread_stats(speed))

    # 3. nGear
    stats.extend(spread_stats(gear))
    stats.extend(usage_stats(gear))

    # 4. Throttle
    stats.extend(spread_stats(throttle))
    # Share of samples that count as (almost) full throttle.
    # NOTE(review): the 0.95 threshold presumes Throttle is on a 0-1 scale -- confirm.
    stats.append((throttle > 0.95).sum() / len(throttle))

    # 5. Brake (mean of the brake channel)
    stats.append(brake.mean())

    # 6. DRS
    stats.extend(spread_stats(drs))
    stats.extend(usage_stats(drs))

    # Idea not implemented yet -- coasting ratio: the share of samples where
    # neither throttle nor brake is applied, which could indicate driver efficiency.
    #       # share of samples with both Throttle and Brake below 5%
    #       ((throttle < 0.05) & (brake < 0.05)).sum() / len(data_frame)
    return stats

def get_laps_data_feature(row) -> list[float]:
    """Build the 12 lap-level features for one lap record.

    Args:
        row: One lap record exposing the attributes ``Sector1Time``,
            ``Sector2Time``, ``Sector3Time``, ``Compound``, ``FreshTyre`` and
            ``TrackStatus`` (``analysis_one_team`` passes a row produced by
            ``DataFrame.itertuples()``).

    Returns:
        ``[fresh_tyre, sector1_s, sector2_s, sector3_s,
        tyre_SOFT, tyre_MEDIUM, tyre_HARD,
        flag_16, flag_8, flag_4, flag_2, flag_1]``.
        Sector times are converted to seconds.

    Raises:
        ValueError: If ``TrackStatus`` cannot be converted with ``int()``.
    """
    sector_seconds = [
        pd.to_timedelta(value).total_seconds()
        for value in (row.Sector1Time, row.Sector2Time, row.Sector3Time)
    ]

    # One-hot encoding of the tyre compound. A compound outside the three known
    # ones is encoded as all zeros; it used to fall back to the SOFT encoding,
    # which silently mislabelled every other compound as SOFT.
    tyre_mapping = {"SOFT": [1, 0, 0], "MEDIUM": [0, 1, 0], "HARD": [0, 0, 1]}
    compound_encoded = tyre_mapping.get(row.Compound, [0, 0, 0])

    # Split the status value into its five lowest bits, highest (2^4) first.
    # NOTE(review): this treats TrackStatus as a bit mask -- confirm that the
    # data source really encodes it that way.
    track_status = int(row.TrackStatus)
    track_flags = [1.0 if track_status & bit else 0.0 for bit in (16, 8, 4, 2, 1)]

    return [row.FreshTyre, *sector_seconds, *compound_encoded, *track_flags]

def get_weather_data_feature(data_frame: pd.DataFrame) -> list[float]:
    """Extract the seven weather readings from a single-row frame.

    Args:
        data_frame: Frame holding exactly one row; ``Series.item()`` raises
            ``ValueError`` for any other row count.

    Returns:
        ``[AirTemp, Humidity, Pressure, Rainfall, TrackTemp, WindDirection,
        WindSpeed]`` as scalars.
    """
    weather_columns = (
        "AirTemp",
        "Humidity",
        "Pressure",
        "Rainfall",
        "TrackTemp",
        "WindDirection",
        "WindSpeed",
    )
    return [data_frame[column].item() for column in weather_columns]
    
def analysis_one_team(car_data: pd.DataFrame, laps_data: pd.DataFrame, weather_data: pd.DataFrame, debug_log: str):
    """Turn one team's telemetry, laps and weather into per-lap features and labels.

    Each lap marked accurate contributes one feature row: car statistics,
    followed by lap features, followed by the weather reading at the lap's
    elapsed time. Its label is the lap time in seconds.

    Side effects:
        ``car_data["Time"]``, ``laps_data["LapTime"]`` and
        ``weather_data["Time"]`` are converted to timedelta in place.

    Args:
        car_data: Telemetry with ``Time``, ``LapNumber`` and the channels read
            by ``get_car_data_feature``.
        laps_data: Laps with ``LapNumber``, ``IsAccurate``, ``LapTime`` and the
            fields read by ``get_laps_data_feature``.
        weather_data: Weather samples with ``Time`` and the fields read by
            ``get_weather_data_feature``.
        debug_log: Identifier forwarded to ``get_weather_data``.

    Returns:
        ``(features, labels)``: a list of feature lists and a parallel list of
        single-element lists ``[lap_time_seconds]``.
    """
    # Convert the time columns to timedelta (in place, see the docstring).
    car_data["Time"] = pd.to_timedelta(car_data["Time"])
    laps_data["LapTime"] = pd.to_timedelta(laps_data["LapTime"])
    weather_data["Time"] = pd.to_timedelta(weather_data["Time"])

    # Running sum of lap durations, used as the time of the weather lookup.
    progress_time = pd.Timedelta(0)

    one_team_features = []
    one_team_label = []

    for row in laps_data.itertuples():
        lap_time = row.LapTime

        # A lap without a duration can neither advance the clock nor be
        # labelled. Adding NaT would also turn the clock into NaT and drop
        # every following lap.
        if pd.isna(lap_time):
            continue

        # Advance the clock for every timed lap, including the ones filtered
        # out below; otherwise the weather lookup falls behind by the duration
        # of each skipped lap.
        progress_time += lap_time

        # Explicit comparison kept on purpose: only a literal False skips.
        if row.IsAccurate == False:
            continue

        same_lap_number_data = car_data[car_data["LapNumber"] == row.LapNumber]

        # At least two samples are needed for the diff/std statistics.
        if len(same_lap_number_data) < 2:
            continue

        weather_frame = get_weather_data(progress_time, weather_data, debug_log)

        if weather_frame.empty:
            continue

        one_lap_features = (
            get_car_data_feature(same_lap_number_data)
            + get_laps_data_feature(row)
            + get_weather_data_feature(weather_frame)
        )

        # Replace numpy scalars with plain Python numbers.
        normalized = []
        for value in one_lap_features:
            if isinstance(value, np.floating):
                value = float(value)
            elif isinstance(value, np.integer):
                value = int(value)
            normalized.append(value)
        one_team_features.append(normalized)

        one_team_label.append([lap_time.total_seconds()])
    return one_team_features, one_team_label

# Column names for the per-lap feature rows: car statistics first, then lap
# features, then weather readings.
feature_headers = (
    # 1. RPM
    ["rpm_mean", "rpm_max", "rpm_diff_mean", "rpm_std"]
    # 2. Speed
    + ["speed_mean", "speed_max", "speed_diff_mean", "speed_std"]
    # 3. nGear
    + ["gear_mean", "gear_max", "gear_diff_mean", "gear_std"]
    + ["gear_most_freq", "gear_change_count"]
    # 4. Throttle
    + ["throttle_mean", "throttle_max", "throttle_diff_mean", "throttle_std"]
    + ["throttle_full_ratio"]
    # 5. Brake
    + ["brake_mean"]
    # 6. DRS
    + ["drs_mean", "drs_max", "drs_diff_mean", "drs_std"]
    + ["drs_most_freq", "drs_change_count"]
    # Lap features
    + ["fresh_tyre"]
    + ["sector1_time", "sector2_time", "sector3_time"]
    + ["tyre_SOFT", "tyre_MEDIUM", "tyre_HARD"]
    # Five status bits, highest first
    + ["flag_16", "flag_8", "flag_4", "flag_2", "flag_1"]
    # Weather features
    + ["air_temp", "humidity", "pressure", "rainfall", "track_temp"]
    + ["wind_direction", "wind_speed"]
)

In [6]:
from torch import nn, optim
class MyNN(nn.Module):
    """Fully connected regressor: ``in_dim -> 64 -> 16 -> 1`` with ReLU between layers."""

    def __init__(self, in_dim: int):
        """Create the layer stack.

        Args:
            in_dim: Number of input features per sample.
        """
        super().__init__()
        widths = (in_dim, 64, 16, 1)
        layers = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.ReLU())
        # The output layer is not followed by an activation.
        layers.pop()
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the result."""
        prediction = self.net(x)
        return prediction

In [4]:
import os
def train(track_name: str):
    """Build the feature set for one track, save it, and fit a ``MyNN`` on it.

    Reads the list file ``game_path + track_name`` (one ``.../`` prefix per
    line), loads the three CSVs for every prefix, and collects the per-lap
    features and labels. Under ``log_path/<track_name>/`` it then writes:

    * ``features.csv`` -- raw features,
    * ``scaling.csv``  -- standardised features,
    * ``loss.txt``     -- train/validation loss every 10 epochs.

    The fitted network is not returned or stored by this function.

    Args:
        track_name: Name of the list file and of the output sub-directory.

    Returns:
        ``True`` when training ran; ``False`` when no lap was usable or the
        features contain NaN (nothing is written in either case).
    """
    one_game_features = []
    one_game_labels = []

    # The list file is only needed while loading, so it is closed before the
    # long-running work starts.
    with open(
        game_path + track_name, "r", encoding="utf-8"
    ) as track_game_team_list_file:
        for track_game_team_str in track_game_team_list_file:
            game_team = track_game_team_str.strip()
            if not game_team:
                # A blank (e.g. trailing) line has no path components to index.
                continue
            tokens = game_team.split("/")

            car_data = data_access.read_csv_by_data_frame(
                game_team + "car_data_all.csv"
            )
            laps_data = data_access.read_csv_by_data_frame(game_team + "laps.csv")
            # Weather is stored once per event, two levels up from the team files.
            weather_data = data_access.read_csv_by_data_frame(
                f"{tokens[0]}/{tokens[1]}/weather_data.csv"
            )
            one_team_features, one_team_label = analysis_one_team(
                car_data, laps_data, weather_data, game_team
            )

            one_game_features += one_team_features
            one_game_labels += one_team_label

    # Pipeline 1: save the engineered features.
    df = pd.DataFrame(one_game_features)
    if df.empty:
        # Scaling and splitting need at least some rows.
        print(f"{track_name} has no usable laps")
        return False
    if df.isnull().values.any():
        print(f"{track_name} in nan")
        return False

    track_log_dir = os.path.join(log_path, track_name)
    os.makedirs(track_log_dir, exist_ok=True)
    df.to_csv(
        os.path.join(track_log_dir, "features.csv"),
        index=False,
        header=feature_headers,
    )

    # Pipeline 2: scaling.
    # NOTE(review): the scaler is fitted on all rows before the train/validation
    # split, so validation rows influence the scaling -- confirm this is intended.
    standard_scaler = StandardScaler()
    scaled_array = standard_scaler.fit_transform(df)  # returns a numpy array
    scaled_df = pd.DataFrame(scaled_array)
    scaled_df.to_csv(
        os.path.join(track_log_dir, "scaling.csv"),
        index=False,
        header=feature_headers,
    )

    # Pipeline 3: train/validation split and tensor conversion.
    labels_df = pd.DataFrame(one_game_labels)
    x_train, x_val, y_train, y_val = train_test_split(
        scaled_df, labels_df.values, test_size=0.2, random_state=42
    )

    x_train_tensor = torch.tensor(x_train.values, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
    x_val_tensor = torch.tensor(x_val.values, dtype=torch.float32)
    y_val_tensor = torch.tensor(y_val, dtype=torch.float32).view(-1, 1)

    # One model per track.
    neural_network = MyNN(scaled_df.shape[1])
    optimizer = optim.Adam(neural_network.parameters(), lr=0.01)
    loss_func = nn.MSELoss()
    num_epochs = 1000

    print(track_name)
    # The context manager closes the log even if training raises.
    with open(
        os.path.join(track_log_dir, "loss.txt"), "w", encoding="utf-8"
    ) as loss_log:
        for epoch in range(num_epochs):
            # Training step: forward pass, backward pass, parameter update.
            neural_network.train()
            outputs = neural_network(x_train_tensor)
            loss = loss_func(outputs, y_train_tensor)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # The validation loss is only reported every 10th epoch, so it is
            # only computed then.
            if (epoch + 1) % 10 == 0:
                neural_network.eval()
                with torch.no_grad():
                    val_outputs = neural_network(x_val_tensor)
                    val_loss = loss_func(val_outputs, y_val_tensor)
                loss_log.write(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {loss.item():.4f}, Validation Loss: {val_loss.item():.4f}\n")

    return True

In [None]:
# for track_name in os.listdir(game_path):
#     train(track_name)

In [5]:
# Run the full pipeline for the "Qatar" track list; train() returns True when it completes.
train("Qatar")

Qatar


True

In [None]:
# Collect the name of every laps.csv column that holds at least one NaN among
# the rows where IsAccurate is True, across all files in storage.
s = set()

for file in data_access.get_all_file():
    file_name = file.name
    if "laps.csv" not in file_name:
        continue

    # Paths with fewer than four components are skipped.
    tokens = file_name.split("/")
    if len(tokens) < 4:
        continue

    print(file_name)
    df = data_access.read_csv_from_blob(file_name)

    # Keep only the rows where IsAccurate is True.
    accurate_mask = df["IsAccurate"] == True
    df_filtered = df.loc[accurate_mask]

    # Names of the columns that contain at least one NaN.
    has_nan = df_filtered.isnull().any()
    nan_columns = has_nan[has_nan].index.tolist()

    # Merge into the running set.
    s |= set(nan_columns)

print("NaN이 포함된 컬럼들 (isaccurate=True 기준):", s)