In [10]:
import pandas as pd
import numpy as np
import joblib
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from pathlib import Path


In [11]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda


In [12]:
# 1. 시나리오별 CSV 파일 로드 및 분석
# 시나리오 파일들 찾기
scenario_dir = Path("data/scenarios")  # 시나리오 CSV들이 있는 폴더
scenario_files = list(scenario_dir.glob("*.csv"))

scenario_info = []
for scenario_file in scenario_files:
    df_temp = pd.read_csv(scenario_file)
    scenario_info.append({
        'file': scenario_file,
        'name': scenario_file.stem,
        'rows': len(df_temp),
        'ues': df_temp['imsi'].nunique()
    })

print("📊 시나리오별 데이터 크기:")
for info in scenario_info:
    print(f"  {info['name']}: {info['rows']} rows, {info['ues']} UEs")

# 테스트 시나리오 직접 지정
test_scenario_name = "data_lstm_test"  # 원하는 시나리오 이름으로 변경
test_scenario = next((info for info in scenario_info if info['name'] == test_scenario_name), None)

if test_scenario is None:
    print(f"❌ 테스트 시나리오 '{test_scenario_name}'를 찾을 수 없습니다!")
    print(f"   사용 가능한 시나리오: {[info['name'] for info in scenario_info]}")
    exit()

# 나머지 시나리오들을 train으로
train_scenarios = [info for info in scenario_info if info['name'] != test_scenario_name]

print(f"\n🎯 Test scenario: {test_scenario['name']} ({test_scenario['rows']} rows)")
print(f"🎯 Train scenarios: {[s['name'] for s in train_scenarios]} (총 {len(train_scenarios)}개)")

# 2. 피처 컬럼 정의 (imsi 인코딩 제거)
feature_cols = [
    "relative_timestamp",                           
    "serving_x", "serving_y",                      
    "L3 serving SINR 3gpp_ma",                    
    "L3 neigh SINR 3gpp 1 (convertedSinr)_ma",   
    "L3 neigh SINR 3gpp 2 (convertedSinr)_ma",   
    "L3 neigh SINR 3gpp 3 (convertedSinr)_ma"    
]
target_cols = ["UE_x", "UE_y"]

print(f"\n📋 Feature 컬럼: {len(feature_cols)}개")
print(f"📋 Target 컬럼: {len(target_cols)}개")

📊 시나리오별 데이터 크기:
  data_lstm_test: 10348 rows, 28 UEs
  data_lstm_5: 50478 rows, 28 UEs
  data_lstm_3: 50478 rows, 28 UEs
  data_lstm_1: 50478 rows, 28 UEs
  data_lstm_6: 84448 rows, 28 UEs
  data_lstm_4: 50478 rows, 28 UEs
  data_lstm_0: 132392 rows, 28 UEs
  data_lstm_2: 50478 rows, 28 UEs

🎯 Test scenario: data_lstm_test (10348 rows)
🎯 Train scenarios: ['data_lstm_5', 'data_lstm_3', 'data_lstm_1', 'data_lstm_6', 'data_lstm_4', 'data_lstm_0', 'data_lstm_2'] (총 7개)

📋 Feature 컬럼: 7개
📋 Target 컬럼: 2개


In [13]:
# %% 4. 시퀀스 데이터셋 클래스
class UESequenceDataset(Dataset):
    def __init__(self, sequences, targets):
        self.sequences = torch.FloatTensor(sequences)
        self.targets = torch.FloatTensor(targets)
    
    def __len__(self):
        return len(self.sequences)
    
    def __getitem__(self, idx):
        return self.sequences[idx], self.targets[idx]


In [None]:
# 5. 시나리오별 시퀀스 생성 (경계 넘지 않음)
LOOKBACK = 10

# Train 시퀀스 생성
train_seq_X, train_seq_y, train_seq_imsi = [], [], []

for train_info in train_scenarios:
    print(f"Processing train sequences: {train_info['name']}")
    df = pd.read_csv(train_info['file'])
    df["imsi"] = df["imsi"].astype("category").cat.codes
    
    for imsi_val, g in df.groupby("imsi"):
        g = g.sort_values("relative_timestamp")
        arr = g[feature_cols].to_numpy(dtype="float32")
        tgt = g[target_cols].to_numpy(dtype="float32")
        
        # 시나리오 내에서만 시퀀스 생성
        for i in range(len(arr) - LOOKBACK):
            train_seq_X.append(arr[i : i + LOOKBACK])
            train_seq_y.append(tgt[i + LOOKBACK])
            train_seq_imsi.append(f"{train_info['name']}_{imsi_val}")

# Test 시퀀스 생성
test_seq_X, test_seq_y, test_seq_imsi = [], [], []

print(f"Processing test sequences: {test_scenario['name']}")
df = pd.read_csv(test_scenario['file'])
df["imsi"] = df["imsi"].astype("category").cat.codes

for imsi_val, g in df.groupby("imsi"):
    g = g.sort_values("relative_timestamp")
    arr = g[feature_cols].to_numpy(dtype="float32")
    tgt = g[target_cols].to_numpy(dtype="float32")
    
    for i in range(len(arr) - LOOKBACK):
        test_seq_X.append(arr[i : i + LOOKBACK])
        test_seq_y.append(tgt[i + LOOKBACK])
        test_seq_imsi.append(f"{test_scenario['name']}_{imsi_val}")

# 배열 변환
X_train = np.array(train_seq_X)
y_train = np.array(train_seq_y)
imsi_train = np.array(train_seq_imsi)

X_test = np.array(test_seq_X)
y_test = np.array(test_seq_y)
imsi_test = np.array(test_seq_imsi)

print(f"Train 데이터: X={X_train.shape}, y={y_train.shape}")
print(f"Test 데이터: X={X_test.shape}, y={y_test.shape}")

Processing train sequences: data_lstm_5
Processing train sequences: data_lstm_3
Processing train sequences: data_lstm_1
Processing train sequences: data_lstm_6
Processing train sequences: data_lstm_4
Processing train sequences: data_lstm_0
Processing train sequences: data_lstm_2
Processing test sequences: data_lstm_test
Train 데이터: X=(467270, 10, 7), y=(467270, 2)
Test 데이터: X=(10068, 10, 7), y=(10068, 2)


In [15]:
# %% 9. PyTorch LSTM 모델 정의
class UELocalizationLSTM(nn.Module):
    def __init__(self, input_size, hidden_size=128, num_layers=3, output_size=2):
        super(UELocalizationLSTM, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # Bidirectional LSTM
        self.lstm = nn.LSTM(
            input_size, 
            hidden_size, 
            num_layers, 
            batch_first=True, 
            bidirectional=True,
            dropout=0.2 if num_layers > 1 else 0
        )
        
        # Dense layers
        self.fc1 = nn.Linear(hidden_size * 2, 32)  # *2 for bidirectional
        self.fc2 = nn.Linear(32, output_size)
        self.dropout = nn.Dropout(0.1)
        
    def forward(self, x):
        # LSTM forward
        lstm_out, _ = self.lstm(x)
        
        # 마지막 시점 출력 사용
        last_output = lstm_out[:, -1, :]
        
        # Dense layers
        out = F.relu(self.fc1(last_output))
        out = self.dropout(out)
        out = self.fc2(out)
        
        return out


In [16]:
# 7-12. 스케일러별 비교 테스트
scalers = {
    'Standard': StandardScaler()
}

scaler_results = {}
n_features = X_train.shape[2]

for scaler_name, scaler_X in scalers.items():
    print(f"\n🧪 Testing {scaler_name}Scaler...")
    
    # Y는 항상 StandardScaler (좌표는 표준화가 좋음)
    scaler_Y = StandardScaler()
    
    # 스케일링
    X_train_scaled = scaler_X.fit_transform(X_train.reshape(-1, n_features)).reshape(X_train.shape)
    X_test_scaled = scaler_X.transform(X_test.reshape(-1, n_features)).reshape(X_test.shape)
    y_train_scaled = scaler_Y.fit_transform(y_train)
    y_test_scaled = scaler_Y.transform(y_test)
    
    # 데이터로더 생성
    train_dataset = UESequenceDataset(X_train_scaled, y_train_scaled)
    test_dataset = UESequenceDataset(X_test_scaled, y_test_scaled)
    train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)
    
    # 모델 초기화
    model = UELocalizationLSTM(input_size=n_features).to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0005)
    
    # 훈련 함수 (동일)
    def train_model_scaler(model, train_loader, criterion, optimizer, epochs=100):
        model.train()
        train_losses = []
        
        for epoch in range(epochs):
            epoch_loss = 0.0
            num_batches = 0
            
            for batch_x, batch_y in train_loader:
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                
                outputs = model(batch_x)
                loss = criterion(outputs, batch_y)
                
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                
                epoch_loss += loss.item()
                num_batches += 1
            
            avg_loss = epoch_loss / num_batches
            train_losses.append(avg_loss)
            
            if (epoch + 1) % 20 == 0:  # 덜 자주 출력
                print(f'  Epoch [{epoch+1}/50], Loss: {avg_loss:.6f}')
        
        return train_losses

    # 모델 훈련
    print(f"  Training with {scaler_name}Scaler...")
    train_losses = train_model_scaler(model, train_loader, criterion, optimizer, epochs=100)
    
    # 평가
    model.eval()
    all_predictions = []
    all_targets = []
    
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            outputs = model(batch_x)
            
            all_predictions.append(outputs.cpu().numpy())
            all_targets.append(batch_y.cpu().numpy())
    
    predictions = np.vstack(all_predictions)
    targets = np.vstack(all_targets)
    
    # 스케일링 되돌리기
    predictions_unscaled = scaler_Y.inverse_transform(predictions)
    targets_unscaled = scaler_Y.inverse_transform(targets)
    
    # RMSE 계산
    rmse = np.sqrt(np.mean((predictions_unscaled - targets_unscaled)**2))
    
    scaler_results[scaler_name] = {
        'rmse': rmse,
        'model': model,
        'scaler_X': scaler_X,
        'scaler_Y': scaler_Y,
        'train_losses': train_losses
    }
    
    print(f"  ✅ {scaler_name}Scaler RMSE: {rmse:.2f}m")

# 최고 스케일러 선택
best_scaler_name = min(scaler_results.items(), key=lambda x: x[1]['rmse'])[0]
best_result = scaler_results[best_scaler_name]

print(f"\n🏆 Best Scaler: {best_scaler_name} (RMSE: {best_result['rmse']:.2f}m)")
print("\n📊 전체 결과:")
for name, result in scaler_results.items():
    print(f"  {name}: {result['rmse']:.2f}m")

# 최고 모델로 설정
model = best_result['model']
scX = best_result['scaler_X'] 
scY = best_result['scaler_Y']
train_losses = best_result['train_losses']
rmse = best_result['rmse']

print(f"\n✅ {best_scaler_name}Scaler를 최종 모델로 선택")


🧪 Testing StandardScaler...
  Training with StandardScaler...
  Epoch [20/50], Loss: 0.015291
  Epoch [40/50], Loss: 0.014625
  Epoch [60/50], Loss: 0.014399
  Epoch [80/50], Loss: 0.014253
  Epoch [100/50], Loss: 0.014217
  ✅ StandardScaler RMSE: 7.98m

🏆 Best Scaler: Standard (RMSE: 7.98m)

📊 전체 결과:
  Standard: 7.98m

✅ StandardScaler를 최종 모델로 선택


In [17]:
# %% 15. 모델 저장
# PyTorch 방식 - h5py 없이!
model_save_dict = {
    'model_state_dict': model.state_dict(),
    'model_config': {
        'input_size': n_features,
        'hidden_size': 64,
        'output_size': 2
    },
    'train_losses': train_losses,
    'rmse': rmse
}

torch.save(model_save_dict, 'lstm_positioning.pth')
print("✅ PyTorch 모델 저장 완료: pytorch_lstm_positioning.pth")

# 16. 스케일러 저장
joblib.dump(scX, "lstm_x.pkl")
joblib.dump(scY, "lstm_y.pkl")
print("✅ 스케일러 저장 완료")

✅ PyTorch 모델 저장 완료: pytorch_lstm_positioning.pth
✅ 스케일러 저장 완료


In [18]:
# %%
# 시각화 코드 (맨 마지막에 추가)
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Circle
import warnings
warnings.filterwarnings('ignore')

plt.style.use('default')
sns.set_palette("husl")

# %%
def plot_ue_trajectories(results_df, max_ues=6, figsize=(15, 10)):
    """UE별 궤적 비교 (예측 vs 실제)"""
    
    # 데이터가 많은 상위 UE들 선택
    ue_counts = results_df['imsi'].value_counts().head(max_ues)
    selected_ues = ue_counts.index.tolist()
    
    fig, axes = plt.subplots(2, 3, figsize=figsize)
    axes = axes.flatten()
    
    for i, ue_id in enumerate(selected_ues):
        if i >= len(axes):
            break
            
        ue_data = results_df[results_df['imsi'] == ue_id].copy()
        ue_data = ue_data.sort_index()  # 시간순 정렬
        
        ax = axes[i]
        
        # 실제 궤적은 선으로 연결 (시간 순서대로)
        ax.scatter(ue_data['true_x'], ue_data['true_y'], 
                color='blue', alpha=0.6, s=20, zorder=3)

        # 예측 결과는 점만 표시 (선 연결 안 함)
        ax.scatter(ue_data['pred_x'], ue_data['pred_y'], 
                color='red', alpha=0.7, s=25, marker='s', 
                label='Predicted Points', zorder=4)
        
        # 시작점과 끝점 표시
        ax.scatter(ue_data['true_x'].iloc[0], ue_data['true_y'].iloc[0], 
                  color='green', s=100, marker='o', label='Start', zorder=5)
        ax.scatter(ue_data['true_x'].iloc[-1], ue_data['true_y'].iloc[-1], 
                  color='black', s=100, marker='X', label='End', zorder=5)
        
        # UE별 RMSE 계산
        ue_rmse = np.sqrt(np.mean((ue_data['pred_x'] - ue_data['true_x'])**2 + 
                                 (ue_data['pred_y'] - ue_data['true_y'])**2))
        
        ax.set_title(f'UE {ue_id} Trajectory\n({len(ue_data)} points, RMSE: {ue_rmse:.1f}m)')
        ax.set_xlabel('X Coordinate (m)')
        ax.set_ylabel('Y Coordinate (m)')
        ax.legend()
        ax.grid(True, alpha=0.3)
        ax.axis('equal')
    
    # 빈 subplot 제거
    for j in range(i+1, len(axes)):
        fig.delaxes(axes[j])
    
    plt.tight_layout()
    plt.savefig('ue_trajectories.png', dpi=300, bbox_inches='tight')
    plt.show()

# %%
def plot_error_analysis(results_df, figsize=(15, 12)):
    """오차 분석 다양한 관점"""
    
    # 오차 계산
    results_df['error_x'] = results_df['pred_x'] - results_df['true_x']
    results_df['error_y'] = results_df['pred_y'] - results_df['true_y']
    results_df['error_distance'] = np.sqrt(results_df['error_x']**2 + results_df['error_y']**2)
    
    fig, axes = plt.subplots(2, 3, figsize=figsize)
    
    # 1. UE별 RMSE 히스토그램
    ue_rmse = results_df.groupby('imsi').apply(
        lambda x: np.sqrt(np.mean(x['error_distance']**2))
    ).reset_index(name='rmse')
    
    axes[0,0].hist(ue_rmse['rmse'], bins=20, alpha=0.7, color='skyblue', edgecolor='black')
    axes[0,0].axvline(ue_rmse['rmse'].mean(), color='red', linestyle='--', 
                     label=f'Mean: {ue_rmse["rmse"].mean():.1f}m')
    axes[0,0].set_title('UE별 RMSE 분포')
    axes[0,0].set_xlabel('RMSE (m)')
    axes[0,0].set_ylabel('UE 개수')
    axes[0,0].legend()
    axes[0,0].grid(True, alpha=0.3)
    
    # 2. 오차 거리 히스토그램
    axes[0,1].hist(results_df['error_distance'], bins=50, alpha=0.7, 
                   color='lightcoral', edgecolor='black')
    axes[0,1].axvline(results_df['error_distance'].mean(), color='red', linestyle='--',
                     label=f'Mean: {results_df["error_distance"].mean():.1f}m')
    axes[0,1].set_title('예측 오차 거리 분포')
    axes[0,1].set_xlabel('Error Distance (m)')
    axes[0,1].set_ylabel('샘플 개수')
    axes[0,1].legend()
    axes[0,1].grid(True, alpha=0.3)
    
    # 3. X, Y 오차 산점도
    axes[0,2].scatter(results_df['error_x'], results_df['error_y'], 
                     alpha=0.5, s=10, color='purple')
    axes[0,2].axhline(0, color='black', linestyle='-', alpha=0.3)
    axes[0,2].axvline(0, color='black', linestyle='-', alpha=0.3)
    axes[0,2].set_title('X-Y 오차 분포')
    axes[0,2].set_xlabel('X Error (m)')
    axes[0,2].set_ylabel('Y Error (m)')
    axes[0,2].grid(True, alpha=0.3)
    axes[0,2].axis('equal')
    
    # 4. UE별 평균 오차 (상위 10개)
    top_ues = results_df.groupby('imsi')['error_distance'].mean().nlargest(10)
    axes[1,0].barh(range(len(top_ues)), top_ues.values, color='orange', alpha=0.7)
    axes[1,0].set_yticks(range(len(top_ues)))
    axes[1,0].set_yticklabels([f'UE {ue}' for ue in top_ues.index])
    axes[1,0].set_title('오차가 큰 상위 10개 UE')
    axes[1,0].set_xlabel('Average Error Distance (m)')
    axes[1,0].grid(True, alpha=0.3)
    
    # 5. 샘플 수 vs 오차 관계
    ue_stats = results_df.groupby('imsi').agg({
        'error_distance': 'mean',
        'imsi': 'count'
    }).rename(columns={'imsi': 'sample_count'})
    
    axes[1,1].scatter(ue_stats['sample_count'], ue_stats['error_distance'], 
                     alpha=0.7, s=50, color='green')
    axes[1,1].set_title('샘플 수 vs 평균 오차')
    axes[1,1].set_xlabel('Sample Count per UE')
    axes[1,1].set_ylabel('Average Error Distance (m)')
    axes[1,1].grid(True, alpha=0.3)
    
    # 6. 누적 오차 분포 (CDF)
    sorted_errors = np.sort(results_df['error_distance'])
    p = np.arange(1, len(sorted_errors) + 1) / len(sorted_errors)
    axes[1,2].plot(sorted_errors, p, linewidth=2, color='navy')
    axes[1,2].axvline(np.percentile(sorted_errors, 50), color='red', linestyle='--', 
                     label=f'50th: {np.percentile(sorted_errors, 50):.1f}m')
    axes[1,2].axvline(np.percentile(sorted_errors, 90), color='orange', linestyle='--',
                     label=f'90th: {np.percentile(sorted_errors, 90):.1f}m')
    axes[1,2].set_title('오차 누적 분포 (CDF)')
    axes[1,2].set_xlabel('Error Distance (m)')
    axes[1,2].set_ylabel('Cumulative Probability')
    axes[1,2].legend()
    axes[1,2].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('error_analysis.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    # 통계 출력
    print(f"📊 오차 통계:")
    print(f"  평균 오차: {results_df['error_distance'].mean():.2f} m")
    print(f"  중위수 오차: {results_df['error_distance'].median():.2f} m")
    print(f"  90th percentile: {np.percentile(results_df['error_distance'], 90):.2f} m")
    print(f"  최대 오차: {results_df['error_distance'].max():.2f} m")

# %%
def plot_prediction_scatter(results_df, figsize=(12, 5)):
    """예측 vs 실제 산점도"""
    
    fig, axes = plt.subplots(1, 2, figsize=figsize)
    
    # X 좌표 비교
    axes[0].scatter(results_df['true_x'], results_df['pred_x'], 
                   alpha=0.6, s=20, color='blue')
    
    # 완벽한 예측선 (y=x)
    min_x, max_x = results_df['true_x'].min(), results_df['true_x'].max()
    axes[0].plot([min_x, max_x], [min_x, max_x], 'r--', linewidth=2, label='Perfect Prediction')
    
    axes[0].set_xlabel('True X (m)')
    axes[0].set_ylabel('Predicted X (m)')
    axes[0].set_title('X Coordinate Prediction')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    axes[0].axis('equal')
    
    # Y 좌표 비교
    axes[1].scatter(results_df['true_y'], results_df['pred_y'], 
                   alpha=0.6, s=20, color='green')
    
    min_y, max_y = results_df['true_y'].min(), results_df['true_y'].max()
    axes[1].plot([min_y, max_y], [min_y, max_y], 'r--', linewidth=2, label='Perfect Prediction')
    
    axes[1].set_xlabel('True Y (m)')
    axes[1].set_ylabel('Predicted Y (m)')
    axes[1].set_title('Y Coordinate Prediction')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    axes[1].axis('equal')
    
    plt.tight_layout()
    plt.savefig('prediction_scatter.png', dpi=300, bbox_inches='tight')
    plt.show()

# %%
def plot_comprehensive_analysis(results_df):
    """종합 분석 실행"""
    print("🎨 시각화 시작...")
    
    # 1. UE별 궤적
    print("1. UE별 궤적 그래프...")
    plot_ue_trajectories(results_df, max_ues=6)
    
    # 2. 오차 분석
    print("2. 오차 분석 그래프...")
    plot_error_analysis(results_df)
    
    # 3. 예측 vs 실제 산점도
    print("3. 예측 vs 실제 산점도...")
    plot_prediction_scatter(results_df)
    
    print("✅ 모든 시각화 완료!")

# %%
# 실행 (기존 코드 마지막에 추가)
plot_comprehensive_analysis(results_df)

NameError: name 'results_df' is not defined