#### Parquet을 읽고 차량(VEHICLE_ID) 리스트, sensor 노드 매핑 만들기

In [2]:
import pandas as pd
from pathlib import Path
import torch

# (1) 모든 Parquet 파일을 하나로 읽어서 DataFrame으로 합치기
# 보통 날짜별/장소별 Parquet이 분리되어 있으므로, rglob으로 재귀 탐색
parquet_root = Path("data/processed_v2x")
all_parquets = sorted(parquet_root.rglob("*.parquet"))

# 리스트에 담았다가 한 번에 합칩니다. (메모리가 허용하는 선에서만 사용하세요.)
df_list = []
for p in all_parquets:
    df_tmp = pd.read_parquet(p)
    # Parquet 파일이라면 이미  
    # 'timestamp', 'VEHICLE_ID', 'LONGITUDE', 'LATITUDE', 'SPEED', 'ACC_SEC', 'BRAKE_STATUS', 'period', 'date', 'location'
    # 등이 포함되어 있을 것입니다.
    df_list.append(df_tmp)

df_all = pd.concat(df_list, ignore_index=True)

# (2) Vehicle_ID 목록 (고유 노드 개수) 구하기
vehicle_ids = df_all["VEHICLE_ID"].unique().tolist()
N = len(vehicle_ids)

# 각 VEHICLE_ID를 0..N-1 정수로 매핑
vid2idx = {vid: i for i, vid in enumerate(vehicle_ids)}

# (3) Node 별 “대표 위치”(위도, 경도) 구하기
# 예: 데이터프레임에서 가장 첫 번째 기록(혹은 평균 위치 등) 하나를 뽑아서 static 피처로 쓰면 된다.
rep = df_all.sort_values("timestamp").groupby("VEHICLE_ID").first().reset_index()
# rep DataFrame 에는 각 vehicle_id당 대표 레코드가 들어 있어요.
# 예를 들어 rep["LATITUDE"], rep["LONGITUDE"] 등을 static feature로 쓸 수 있습니다.


OSMnx로 그래프(도로망) 불러오기

In [5]:
import osmnx as ox
import networkx as nx

# (1) 센서 대표 위치의 평균 위경도를 계산해서 중심점으로 잡기
lat_center = rep["LATITUDE"].mean()
lon_center = rep["LONGITUDE"].mean()

# (2) 반경 1km짜리 도로망을 “drive” 타입으로 가져오기
#    → 이 G는 networkx 그래프 형태로, 노드마다 위경도(“y”, “x”) 속성을 가집니다.
G = ox.graph_from_point(
    (lat_center, lon_center),
    dist=1000,             # 미터 단위 반경(1km)
    network_type="drive"   # 자동차 주행용 도로만 추출
)

# (3) 각 센서(VEHICLE_ID)에 대해, 가장 가까운 도로망 상의 노드(노드 ID)를 찾기
#     이때 scikit-learn이 반드시 설치되어 있어야 합니다.
#     (예: pip install scikit-learn)
sensor_nodes = {}
for vid, row in rep.set_index("VEHICLE_ID").iterrows():
    lon, lat = row["LONGITUDE"], row["LATITUDE"]
    # osmnx.distance.nearest_nodes는 “G”, “X경도”, “Y위도” 순서로 인자 사용
    nearest_node = ox.distance.nearest_nodes(G, lon, lat)
    sensor_nodes[vid] = nearest_node

# (4) 센서 노드 ID(도로망 노드) → 0..N−1 인덱스로 변환
sensor_nodes_idx = {vid: vid2idx[vid] for vid in vehicle_ids}

#  예를 들어, 
#     센서 vid = 34AF2943 → 실제 도로망 노드 e.g. 345678901 노드 ID → 이 센서가 vid2idx[34AF2943] = 0번 인덱스로 매핑
#     sensor_nodes : { "34AF2943": 345678901, ... }


#### sensor_nodes에는 VEHICLE_ID → 그래프 G의 노드 ID가 정의되어 있습니다.
- 이 정보를 바탕으로 “노드 간 인접 리스트(edge_index)”를 만들 수 있습니다.

In [8]:
import torch

# vid2idx: VEHICLE_ID → 0..N−1
# sensor_nodes: VEHICLE_ID → 실제 도로망 G 노드 ID
# G: networkx 그래프

edges_src = []
edges_dst = []
for vid_i, node_i in sensor_nodes.items():
    idx_i = vid2idx[vid_i]
    for vid_j, node_j in sensor_nodes.items():
        idx_j = vid2idx[vid_j]
        if idx_i == idx_j:
            continue
        try:
            dist_m = nx.shortest_path_length(G, node_i, node_j, weight="length")
            if dist_m < 200:  # 200m 이내면 인접으로 간주
                edges_src.append(idx_i)
                edges_dst.append(idx_j)
        except nx.NetworkXNoPath:
            pass

edge_index = torch.tensor([edges_src, edges_dst], dtype=torch.long)  # shape: [2, E]
print(f"노드 수: {N}, 엣지 수: {edge_index.size(1)}")


노드 수: 1993, 엣지 수: 887459


#### “동적 피처”(x_dyn) 만들기: 5분 윈도우 리샘플링 후 속도·가속·제동 집계

In [10]:
# (1) 우선 df_all에 timestamp가 datetime 타입으로 이미 들어 있다고 가정
#     (위 코드에서 Parquet을 만들 때 pd.to_datetime을 거쳤습니다.)

# (2) VEHICLE_ID별로 그룹화 + 5분 단위 리샘플링
#     우리가 필요로 하는 건, 각 5분 윈도우마다
#       - 평균 속도(mean of SPEED)
#       - 평균 가속(mean of ACC_SEC)
#       - 제동 횟수 혹은 제동 합계(sum of BRAKE_STATUS)
#     등으로 동적 피처를 요약한 타임스텝별 테이블
df_all = df_all.set_index("timestamp")

# 예시: dyn_df 라는 DataFrame을 만듭니다.
#  index: 타임스텝(datetime index, 5분 간격)
#  columns: MultiIndex (VEHICLE_ID, [SPEED_mean, ACC_SEC_mean, BRAKE_STATUS_sum])
dyn_list = []
for vid in vehicle_ids:
    df_vid = df_all[df_all["VEHICLE_ID"] == vid]
    # 속도 평균, 가속 평균, 제동 합계
    df_resampled = df_vid.resample("5min").agg({
        "SPEED": "mean",
        "ACC_SEC": "mean",
        "BRAKE_STATUS": "sum"
    })
    # 재색인하여 모든 타임스텝(5분 간격)에서 NaN을 허용하게 만들고, 뒤에서 0으로 채워 줘도 됩니다.
    dyn_list.append(df_resampled)

# 각 차량별로 resample된 DataFrame을 concat → MultiIndex columns
dyn_df = pd.concat(dyn_list, axis=1, keys=vehicle_ids)

# dyn_df의 인덱스(timestamps) 리스트
times = dyn_df.index.to_pydatetime().tolist()
T = len(times)  # 총 타임스텝 개수 (예: 하루 24*12 = 288개)

# (3) “N x T x F_d” 텐서를 만들어서 PyTorch Tensor로 변환
#       F_d = 3 (speed, acc, brake)
import numpy as np

# 미리 0으로 초기화
x_dyn_np = np.zeros((T, N, 3), dtype=np.float32)

for i_vid, vid in enumerate(vehicle_ids):
    sub = dyn_df[vid]  
    # sub는 columns=["SPEED", "ACC_SEC", "BRAKE_STATUS"]
    # NaN이 있을 경우 0으로 채워 주는 것이 일반적입니다.
    sub = sub.fillna(0.0)
    # 이제 sub.values.shape == (T, 3)
    x_dyn_np[:, i_vid, 0] = sub["SPEED"].values
    x_dyn_np[:, i_vid, 1] = sub["ACC_SEC"].values
    x_dyn_np[:, i_vid, 2] = sub["BRAKE_STATUS"].values

# 최종 PyTorch 텐서
x_dyn = torch.from_numpy(x_dyn_np)  # shape [T, N, 3]

#### (선택) “정적 피처”(x_static)을 추가하고 싶다면

In [None]:
x_static_np = np.zeros((N, 2), dtype=np.float32)  # [N, 2] shape
for i_vid, vid in enumerate(vehicle_ids):
    row = rep[rep["VEHICLE_ID"] == vid].iloc[0]
    x_static_np[i_vid, 0] = row["LATITUDE"]
    x_static_np[i_vid, 1] = row["LONGITUDE"]

x_static = torch.from_numpy(x_static_np)  # [N, 2]


#### PyTorch Dataset/DataLoader 만들기
- 위에서 만들어 둔 x_dyn, x_static, edge_index를 직접 래핑하여 Dataset을 구현하면 .pt 파일 없이도 바로 학습이 가능합니다.

In [14]:
import torch
from torch.utils.data import Dataset, DataLoader

class SpatioTemporalDataset(Dataset):
    def __init__(self, x_dyn, x_static, edge_index, seq_len, pre_len):
        """
        x_dyn      : torch.Tensor, shape [T, N, Fd]
                     → 동적 피처 (예: Fd=3: [SPEED, ACC_SEC, BRAKE_STATUS] 등)
        x_static   : torch.Tensor, shape [N, Fs]
                     → 정적 피처 (예: POI 정보, 노드별 속성 등)
        edge_index : torch.LongTensor, shape [2, E]
                     → GNN 에 넣을 엣지 인덱스
        seq_len    : int → 입력 시퀀스 길이 (예: 10)
        pre_len    : int → 예측할 미래 길이 (예: 3)
        """
        self.x_dyn      = x_dyn      # [T, N, Fd]
        self.x_static   = x_static   # [N, Fs]
        self.edge_index = edge_index # [2, E]
        
        T, N, Fd = x_dyn.shape

        # sliding window로 X, Y (입력, 정답) 시퀀스 생성
        X_list = []
        Y_list = []
        for t in range(T - seq_len - pre_len + 1):
            # 입력 구간: t ~ t + seq_len - 1
            # → shape [seq_len, N, Fd]
            seq_X = x_dyn[t : t + seq_len]
            # 예측 정답 구간: t+seq_len ~ t+seq_len+pre_len-1
            # → shape [pre_len, N, Fd]
            seq_Y = x_dyn[t + seq_len : t + seq_len + pre_len]

            X_list.append(seq_X)
            Y_list.append(seq_Y)

        # [S, seq_len, N, Fd],  [S, pre_len, N, Fd] 로 저장
        self.X = torch.stack(X_list, dim=0)  # [S, seq_len, N, Fd]
        self.Y = torch.stack(Y_list, dim=0)  # [S, pre_len, N, Fd]

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        return {
            'x_dyn'     : self.X[idx],        # [seq_len, N, Fd]
            'x_static'  : self.x_static,      # [N, Fs]
            'edge_index': self.edge_index,    # [2, E]
            'y'         : self.Y[idx]         # [pre_len, N, Fd]
        }


# -------------------------------------------------------
#  예시: DataLoader 생성 및 배치 확인
if __name__ == '__main__':
    # (1) 여기에 미리 준비된 텐서들을 넣어 주세요.
    # 예를 들어:
    #   x_dyn:  torch.randn(T, N, Fd)
    #   x_static: torch.randn(N, Fs)
    #   edge_index: torch.tensor([[0,1,2,...],[1,2,3,...]], dtype=torch.long)
    #
    # 아래는 더미 데이터를 만드는 예시입니다.
    T, N, Fd = 200, 726, 3   # 예를 들어: T=200 timesteps, N=726 노드, Fd=3 채널
    Fs = 5                   # 예를 들어 정적 피처 차원 Fs=5
    seq_len   = 10           # 입력 시퀀스 길이
    pre_len   = 3            # 예측 시퀀스 길이
    batch_size = 64

    # 더미 동적 피처 (예시: SPEED, ACC_SEC, BRAKE_STATUS)
    x_dyn = torch.randn(T, N, Fd)
    # 더미 정적 피처
    x_static = torch.randn(N, Fs)
    # 더미 엣지 인덱스 (임의로 만들어봄)
    edge_index = torch.randint(0, N, (2, N*2), dtype=torch.long)

    # Dataset/Loader 생성
    dataset = SpatioTemporalDataset(
        x_dyn      = x_dyn,
        x_static   = x_static,
        edge_index = edge_index,
        seq_len    = seq_len,
        pre_len    = pre_len
    )
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # 한 배치 꺼내 보기
    batch = next(iter(loader))
    for k, v in batch.items():
        print(f"{k:10} → {tuple(v.shape)}")
    # 출력 예:
    #   x_dyn      → (64, 10, 726, 3)
    #   x_static   → (726, 5)
    #   edge_index → (2, ???)
    #   y          → (64, 3, 726, 3)


x_dyn      → (64, 10, 726, 3)
x_static   → (64, 726, 5)
edge_index → (64, 2, 1452)
y          → (64, 3, 726, 3)
