In [1]:
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sktime.classification.distance_based import KNeighborsTimeSeriesClassifier
import polars as pl
import numpy as np
import os
import glob
import re

from data.track_preprocess import TrajectoryDataProcessor, process_df

In [2]:
# Root folder of the dataset. Set the CQ08_DATA_ROOT environment variable to
# run the notebook on another machine; the original local path is the fallback.
DATA_ROOT = os.environ.get('CQ08_DATA_ROOT') or r'D:\DataSets\挑战杯_揭榜挂帅_CQ-08赛题_数据集'
# Files whose label (parsed from the file name) exceeds this value are skipped by get_data.
NUM_CLASSES = 4
# Upper bound for the `time` argument of get_data (number of leading points used per track).
SEQ_LEN = 12

In [3]:
def get_data(data_root: str, num_classes: int, time: int):
    """Build a standardized train/test split from the first ``time`` points of every track.

    Every ``点迹/PointTracks_<batch>_<label>_<count>.txt`` file under ``data_root``
    is paired with ``航迹/Tracks_<batch>_<label>_<count>.txt``, preprocessed with
    ``TrajectoryDataProcessor`` and ``process_df``, truncated to its first
    ``time`` rows and flattened into a single sample row, so that there is
    exactly one row per label.

    Args:
        data_root: Dataset root containing the ``点迹`` and ``航迹`` sub-folders.
        num_classes: Files whose label is greater than this value are skipped.
        time: Number of leading points kept per track; ``1 <= time <= SEQ_LEN``.

    Returns:
        ``(train_data, test_data, train_labels, test_labels, train_scaler, test_scaler)``.
        The data arrays are 2-D with ``time * n_features`` columns, labels are
        zero-based (``label - 1``). Both scaler entries refer to the same
        ``StandardScaler`` fitted on the training split; the second one is kept
        only so that existing callers unpacking six values keep working.

    Raises:
        ValueError: If ``time`` is outside ``[1, SEQ_LEN]`` or no usable track is found.
    """
    if time > SEQ_LEN:
        raise ValueError(f"time {time} should be less than or equal to SEQ_LEN {SEQ_LEN}")
    if time < 1:
        raise ValueError(f"time {time} should be greater than or equal to 1")
    data = []
    labels = []

    # Escaped dot + fullmatch: only exact "PointTracks_<int>_<int>_<int>.txt" names are accepted.
    name_pattern = re.compile(r'PointTracks_(\d+)_(\d+)_(\d+)\.txt')
    # Sorted so that the sample order (and therefore the seeded split below)
    # does not depend on the directory listing order of the platform.
    point_files = sorted(glob.glob(os.path.join(data_root, '点迹/PointTracks_*.txt')))
    for point_file in point_files:
        match_result = name_pattern.fullmatch(os.path.basename(point_file))
        if match_result is None:
            # Matched the glob but not the naming scheme; nothing to parse.
            continue
        batch_id = int(match_result.group(1))
        label = int(match_result.group(2))
        num_tracks = int(match_result.group(3))
        if label > num_classes:
            continue
        track_file = os.path.join(data_root, f"航迹/Tracks_{batch_id}_{label}_{num_tracks}.txt")

        preprocessed_data = TrajectoryDataProcessor(point_file, track_file).get_processed_data()
        point_df = pl.from_pandas(preprocessed_data['point_data'])
        track_df = pl.from_pandas(preprocessed_data['track_data'])
        df = point_df.join(track_df, on=["时间", "批号"], how="left").sort("时间")
        df = process_df(df)
        data_batch = df.to_numpy(order='c').astype(np.float32)
        if data_batch.shape[0] < time:
            # Too few points to fill `time` steps; keeping it would break the
            # fixed row width shared by all samples.
            continue
        # One flattened row per track keeps data and labels aligned
        # (for time == 1 this is exactly the single first row).
        data.append(data_batch[:time, :].reshape(1, -1))
        labels.append(label - 1)
    if not data:
        raise ValueError(f"no usable track with at least {time} points found under {data_root}")
    data = np.concatenate(data, axis=0)
    labels = np.array(labels)
    train_data, test_data, train_labels, test_labels = train_test_split(data, labels, test_size=0.2, random_state=42)
    # normalize: statistics come from the training split only and are reused
    # for the test split so both live in the same feature space.
    train_scaler = StandardScaler()
    train_data = train_scaler.fit_transform(train_data)
    test_data = train_scaler.transform(test_data)
    test_scaler = train_scaler
    return train_data, test_data, train_labels, test_labels, train_scaler, test_scaler

In [4]:
# Sweep the number of leading points `t` handed to get_data and report the
# accuracy of a 3-nearest-neighbour time-series classifier for each value.
for t in range(1, SEQ_LEN + 1):
    (
        train_data,
        test_data,
        train_labels,
        test_labels,
        train_scaler,
        test_scaler,
    ) = get_data(DATA_ROOT, NUM_CLASSES, t)

    # A fresh classifier per `t`, so no fitted state carries over between runs.
    knn_tsc = KNeighborsTimeSeriesClassifier(n_neighbors=3)
    knn_tsc.fit(train_data, train_labels)

    pred_labels = knn_tsc.predict(test_data)
    acc = accuracy_score(test_labels, pred_labels)
    print(f"time {t}, acc: {acc}")

ColumnNotFoundError: 平均全速度