In [None]:
import pandas as pd
import numpy as np
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Masking
from keras.callbacks import EarlyStopping
from keras.optimizers import Adam

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.sequence import pad_sequences


In [None]:
# Location of the CSV file (a /content/drive path; presumably a mounted Google Drive in Colab).
csv_path = '/content/drive/MyDrive/trajectory_dataset_transformed.csv'

# The file is read without a header row, so pandas assigns integer column labels.
df = pd.read_csv(csv_path, header=None)

# Plain list-of-rows copy of the table; the grouping loop below iterates over it.
transformed_data = df.to_numpy().tolist()

# Last expression of the cell: render the loaded frame.
df

In [None]:
# Split the rows by id (third column), keeping the original row order inside each id.
id_data = {}
for row in transformed_data:
    id_value = row[2]
    # setdefault creates the per-id list on first sight of an id, then returns it.
    id_data.setdefault(id_value, []).append(row)
id_data

In [None]:
# Build the sliding-window samples for one id's rows (padding happens in a later cell).
def create_dataset(data, sequence_length=5):
    """Slice ``data`` into sliding windows and their next-step targets.

    Args:
        data: Indexable, sliceable sequence of rows (list or NumPy array).
        sequence_length: Number of consecutive rows per input window (default 5).

    Returns:
        Tuple ``(x, y)`` of NumPy arrays where ``x[k]`` is
        ``data[k:k + sequence_length]`` and ``y[k]`` is the row that directly
        follows that window. Both arrays are empty when ``data`` has no more
        than ``sequence_length`` rows.
    """
    window_starts = range(len(data) - sequence_length)
    windows = [data[start:start + sequence_length] for start in window_starts]
    targets = [data[start + sequence_length] for start in window_starts]
    return np.array(windows), np.array(targets)

# Window length used for every id (overrides create_dataset's default of 5).
sequence_length = 10

# One array of input windows and one array of targets per id, in id_data order.
x_data = []
y_data = []
for id_value in id_data:
    data = np.array(id_data[id_value])
    x, y = create_dataset(data, sequence_length)
    x_data.append(x)
    y_data.append(y)

# Quick sanity output: number of ids on each side, then the second id's windows/targets.
print(len(x_data))
print(len(y_data))
print(x_data[1], "y_data", y_data[1])

In [None]:
def pad_nested_sequences(sequences, dtype="float32", padding="post"):
    """Zero-pad a ragged collection of 2-D windows into one dense 4-D array.

    Args:
        sequences: Outer sequence with one entry per group. Each entry is an
            iterable of 2-D windows shaped ``(steps, features)``; entries may
            be empty.
        dtype: dtype of the returned array.
        padding: ``"post"`` writes each window at the start of its slot (zeros
            after it); ``"pre"`` writes it at the end (zeros before it).

    Returns:
        NumPy array of shape ``(len(sequences), max windows per entry,
        max steps per window, features)`` filled with zeros wherever no data
        was available.

    Raises:
        ValueError: If ``padding`` is neither ``"pre"`` nor ``"post"``.
    """
    # An unknown mode used to fall through both branches and silently return
    # an all-zero array; fail loudly instead.
    if padding not in ("pre", "post"):
        raise ValueError(f'padding must be "pre" or "post", got {padding!r}')

    max_len_outer = max((len(s) for s in sequences), default=0)

    # Every non-empty window, across all entries. The feature width is taken
    # from the first of these rather than from sequences[0] alone, so an empty
    # first entry no longer forces the hard-coded fallback.
    windows = [t for s in sequences for t in s if len(t) > 0]
    max_len_inner = max((len(t) for t in windows), default=0)
    output_shape = np.shape(windows[0])[1] if windows else 4  # 4: original fallback width

    padded_sequences = np.zeros(
        (len(sequences), max_len_outer, max_len_inner, output_shape), dtype=dtype
    )

    for i, s in enumerate(sequences):
        for j, t in enumerate(s):
            steps = len(t)
            if steps == 0:
                continue  # nothing to copy; the slot stays zero
            if padding == "post":
                padded_sequences[i, j, :steps, :] = t
            else:  # "pre"
                padded_sequences[i, j, -steps:, :] = t

    return padded_sequences
# Inputs and targets are padded with the same options: float32, zeros appended at the end.
pad_options = dict(dtype="float32", padding="post")
x_data = pad_nested_sequences(x_data, **pad_options)
y_data = pad_sequences(y_data, **pad_options)


In [None]:
# The leading 80% of entries along axis 0 are used for training, the rest for testing
# (order is kept as-is; nothing is shuffled).
train_ratio = 0.8
train_size = int(train_ratio * x_data.shape[0])

# Inputs and targets are cut at the same boundary so they stay aligned.
train_x_data, test_x_data = x_data[:train_size], x_data[train_size:]
train_y_data, test_y_data = y_data[:train_size], y_data[train_size:]

In [None]:
from keras.layers import LSTM, Dense
from keras.regularizers import l1_l2


# Train on the training split only. This cell previously fitted on the full
# x_data / y_data, which also contain the ids held out in test_x_data, so the
# later check on the test split was done on data the model had already seen.
# x_data / y_data are no longer overwritten here, which keeps the split cell
# above safe to re-run.
n_features = train_x_data.shape[-1]
train_windows = train_x_data.reshape(-1, sequence_length, n_features)
train_targets = train_y_data.reshape(-1, n_features)

# Ids with fewer windows than the longest id were filled up with all-zero
# windows (and all-zero targets) by the padding step. Drop those samples so
# they do not act as "empty input -> predict 0" training examples.
is_real_window = np.any(train_windows != 0, axis=(1, 2))
train_windows = train_windows[is_real_window]
train_targets = train_targets[is_real_window]

# Model definition: Masking skips time steps whose features are all 0, followed by
# two stacked LSTMs with L1/L2 kernel regularisation and a linear output per feature.
model = Sequential()
model.add(Masking(mask_value=0., input_shape=(None, n_features)))
model.add(LSTM(128, activation='relu', return_sequences=True, kernel_regularizer=l1_l2(l1=0.01, l2=0.01)))
model.add(LSTM(64, activation='relu', kernel_regularizer=l1_l2(l1=0.01, l2=0.01)))
model.add(Dense(n_features))

optimizer = Adam(learning_rate=0.001)  # start with a small learning rate
model.compile(optimizer=optimizer, loss='mse', metrics=['mae'])

# Stop once val_loss has not improved for 50 epochs and roll back to the best weights.
early_stopping = EarlyStopping(monitor='val_loss', patience=50, restore_best_weights=True)

# Fit the model; validation_split takes the last 20% of the training windows
# (shuffle=False keeps the sample order during training).
history = model.fit(
    train_windows,
    train_targets,
    epochs=1000,
    batch_size=32,
    validation_split=0.2,
    shuffle=False,
    callbacks=[early_stopping],
)


In [None]:
import matplotlib.pyplot as plt 
# Pull the per-epoch metrics recorded by model.fit.
loss = history.history['loss']
mae = history.history['mae']
val_loss = history.history['val_loss']
val_mae = history.history['val_mae']
epochs = range(1, len(loss) + 1)


def _plot_train_vs_val(train_values, val_values, train_label, val_label, y_label, title):
    """Draw one figure with the training curve in blue and the validation curve in red."""
    plt.figure()
    plt.plot(epochs, train_values, 'b', label=train_label)
    plt.plot(epochs, val_values, 'r', label=val_label)
    plt.xlabel('Epochs')
    plt.ylabel(y_label)
    plt.legend()
    plt.title(title)
    plt.show()


# Loss curves
_plot_train_vs_val(
    loss, val_loss,
    'Training Loss', 'Validation Loss',
    'Loss', 'Training and Validation Loss',
)

# MAE curves
_plot_train_vs_val(
    mae, val_mae,
    'Training MAE', 'Validation MAE',
    'Mean Absolute Error', 'Training and Validation Mean Absolute Error',
)


In [None]:
import matplotlib.pyplot as plt

# Predict the next row for every window of one id from the test split.
test_id = 0
test_sequence = test_x_data[test_id]   # (windows, steps, features)
test_targets = test_y_data[test_id]    # (windows, features)

# All-zero windows are padding added to equalise the number of windows per id;
# keep only the real ones so the plot is not dragged towards the origin.
is_real_window = np.any(test_sequence != 0, axis=(1, 2))
test_sequence = test_sequence[is_real_window]
test_targets = test_targets[is_real_window]

# One prediction per window. The previous reshape(1, -1, 4) concatenated all
# windows into a single long sequence, which produced just one predicted point.
predicted_trajectory = model.predict(test_sequence)

# Extract coordinates: columns 0 and 1 of each row, for targets and predictions alike.
# The old test_y_data[test_id, 0] / [test_id, 1] picked target rows 0 and 1
# instead of columns, so the "actual" curve was not a trajectory at all.
# NOTE(review): the grouping cell reads the id from column 2 - confirm that
# columns 0 and 1 really hold the x / y coordinates.
actual_x = test_targets[:, 0]
actual_y = test_targets[:, 1]
predicted_x = predicted_trajectory[:, 0]
predicted_y = predicted_trajectory[:, 1]

# Plot the actual and the predicted trajectory on the same axes.
plt.plot(actual_x, actual_y, 'bo-', label='Actual')
plt.plot(predicted_x, predicted_y, 'ro-', label='Predicted')
plt.xlabel('x')
plt.ylabel('y')
plt.legend()
plt.title('Trajectory Comparison: Actual vs. Predicted')
plt.show()
