In [None]:
# import numpy as np
# import pandas as pd
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import LSTM, Dense, Masking
# from tensorflow.keras.preprocessing.sequence import pad_sequences
# from sklearn.model_selection import train_test_split
# from sklearn.preprocessing import LabelEncoder
# from sklearn.metrics import accuracy_score

# # 예제 데이터 생성 (시계열 데이터 A와 B)
# # A는 P1 ~ P10, B는 P1 ~ P15 (길이가 다른 시계열 데이터)
# np.random.seed(42)
# data_A = [np.random.rand(10) for _ in range(100)]  # 100개의 샘플
# data_B = [np.random.rand(15) for _ in range(100)]  # 100개의 샘플

# labels_A = np.random.randint(0, 2, size=100)  # 0 또는 1의 레이블
# labels_B = np.random.randint(0, 2, size=100)

# # 데이터와 레이블 통합
# data = data_A + data_B
# labels = np.concatenate([labels_A, labels_B])

# # 데이터를 고정된 길이로 패딩 처리
# max_length = 15  # 데이터에서 가장 긴 길이
# padded_data = pad_sequences(data, padding='post', maxlen=max_length, dtype='float32')

# # 학습/테스트 데이터 분리
# X_train, X_test, y_train, y_test = train_test_split(padded_data, labels, test_size=0.2, random_state=42)

# # 레이블을 0과 1로 인코딩 (이미 0과 1인 경우 생략 가능)
# encoder = LabelEncoder()
# y_train = encoder.fit_transform(y_train)
# y_test = encoder.transform(y_test)

# # LSTM 모델 정의
# model = Sequential([
#     Masking(mask_value=0.0, input_shape=(max_length, 1)),  # 패딩된 부분 무시
#     LSTM(64, return_sequences=False),  # LSTM 레이어
#     Dense(1, activation='sigmoid')  # 출력 레이어 (이진 분류)
# ])

# model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# # 데이터를 (samples, timesteps, features) 형태로 변환
# X_train = X_train[..., np.newaxis]
# X_test = X_test[..., np.newaxis]

# # 모델 학습
# model.fit(X_train, y_train, epochs=10, batch_size=16, validation_split=0.1)

# # 모델 평가
# y_pred = (model.predict(X_test) > 0.5).astype("int32")
# accuracy = accuracy_score(y_test, y_pred)
# print(f"Test Accuracy: {accuracy:.2f}")


In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler

In [None]:
# 데이터 로딩 및 전처리
def load_and_preprocess_data(file_path):
    df = pd.read_csv(file_path)
    
    # Q, M, P 컬럼 선택
    q_cols = [col for col in df.columns if col.startswith('Q')]
    m_cols = [col for col in df.columns if col.startswith('M')]
    p_cols = [col for col in df.columns if col.startswith('P')]
    
    # 선택된 컬럼들을 하나의 리스트로 합침
    feature_cols = q_cols + m_cols + p_cols
    
    # 데이터 정규화
    scaler = MinMaxScaler()
    normalized_data = scaler.fit_transform(df[feature_cols])
    
    return normalized_data, len(feature_cols)

# 시퀀스 생성 함수
def create_sequences(data, seq_length):
    sequences = []
    for i in range(len(data) - seq_length + 1):
        sequences.append(data[i:i+seq_length])
    return np.array(sequences)

In [None]:
# 데이터 로드 및 전처리
data, input_size = load_and_preprocess_data('your_data_file.csv')
sequence_length = 60
sequences = create_sequences(data, sequence_length)

# PyTorch 텐서로 변환
X = torch.FloatTensor(sequences)
y = torch.zeros(len(sequences))  # 여기서는 모든 데이터를 정상으로 가정

# 데이터셋 및 DataLoader 생성
dataset = TensorDataset(X, y)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

In [None]:
# LSTM Autoencoder 모델 정의 (이전과 동일)
class LSTMAutoencoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        super(LSTMAutoencoder, self).__init__()
        self.encoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.decoder = nn.LSTM(hidden_size, input_size, num_layers, batch_first=True)

    def forward(self, x):
        _, (hidden, _) = self.encoder(x)
        decoder_input = hidden.permute(1, 0, 2).repeat(1, x.size(1), 1)
        output, _ = self.decoder(decoder_input)
        return output

In [None]:
# 모델 초기화 (input_size가 변경됨)
model = LSTMAutoencoder(input_size, hidden_size=64, num_layers=2)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters())

In [None]:
from tqdm import tqdm

# 학습 루프
num_epochs = 100
for epoch in tqdm(range(num_epochs), desc="Epochs"):
    model.train()
    epoch_loss = 0
    for batch_X, _ in tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}", leave=False):
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_X)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    
    avg_loss = epoch_loss / len(dataloader)
    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Average Loss: {avg_loss:.4f}')

In [None]:
# 이상 감지 함수 (이전과 동일)
def detect_anomalies(model, data, threshold):
    model.eval()
    with torch.no_grad():
        reconstructed = model(data)
        mse = nn.MSELoss(reduction='none')
        reconstruction_errors = mse(reconstructed, data).mean(axis=(1,2))
        return reconstruction_errors > threshold

In [None]:
# 이상 감지 예시 (테스트 데이터도 같은 방식으로 전처리 필요)
test_data = torch.FloatTensor(create_sequences(data[-100:], sequence_length))
anomalies = detect_anomalies(model, test_data, threshold=0.5)
print("Detected anomalies:", anomalies.sum().item())