In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import confusion_matrix


In [6]:
!pip install imblearn

Collecting imblearn
  Downloading imblearn-0.0-py2.py3-none-any.whl.metadata (355 bytes)
Collecting imbalanced-learn (from imblearn)
  Downloading imbalanced_learn-0.12.4-py3-none-any.whl.metadata (8.3 kB)
Downloading imblearn-0.0-py2.py3-none-any.whl (1.9 kB)
Downloading imbalanced_learn-0.12.4-py3-none-any.whl (258 kB)
Installing collected packages: imbalanced-learn, imblearn
Successfully installed imbalanced-learn-0.12.4 imblearn-0.0


In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from imblearn.over_sampling import SMOTE
import numpy as np
import pandas as pd

# Dataset 정의
class SpotifyDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.FloatTensor(X)
        self.y = torch.FloatTensor(y)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# 모델 정의
class SpotifyRankPredictor(nn.Module):
    def __init__(self, num_categories):
        super(SpotifyRankPredictor, self).__init__()

        self.input_layer = nn.Linear(8, 64)
        self.block1 = nn.Sequential(
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Dropout(0.3)
        )
        self.block2 = nn.Sequential(
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.BatchNorm1d(32),
            nn.Dropout(0.3)
        )
        self.output_layer = nn.Linear(32, num_categories)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.input_layer(x)
        x = nn.ReLU()(x)

        identity = x
        x = self.block1(x) + identity

        x = self.block2(x)
        x = self.output_layer(x)
        return self.softmax(x)

# 클래스 가중치 계산
def calculate_class_weights(categories):
    class_counts = np.bincount(categories)
    weights = 1 + np.log1p(np.max(class_counts) / class_counts)
    return torch.FloatTensor(weights)

# 데이터 전처리 (SMOTE 적용)
def preprocess_data_with_smote(df):
    X = df[['Danceability', 'Energy', 'Loudness', 'Speechiness',
            'Acousticness', 'Liveness', 'Tempo', 'Duration (ms)']].values

    def rank_to_category(rank):
        if rank <= 30:
            return 0
        elif rank <= 50:
            return 1
        else:
            return 2

    ranks = df['Highest Charting Position'].values
    categories = np.array([rank_to_category(rank) for rank in ranks])

    # 특성 스케일링
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # SMOTE 적용
    smote = SMOTE(random_state=42)
    X_resampled, categories_resampled = smote.fit_resample(X_scaled, categories)

    # 원-핫 인코딩
    num_categories = 3
    y_encoded = np.eye(num_categories)[categories_resampled]

    # 클래스 가중치 계산
    class_weights = calculate_class_weights(categories_resampled)

    print("Final category counts after SMOTE:", np.bincount(categories_resampled))

    return X_resampled, y_encoded, num_categories, class_weights, scaler  # scaler 추가

# 모델 학습
def train_model(model, train_loader, val_loader, criterion, optimizer,
                num_epochs=150, patience=15):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    criterion = criterion.to(device)

    best_val_loss = float('inf')
    patience_counter = 0
    train_losses = []
    val_losses = []

    for epoch in range(num_epochs):
        # 훈련
        model.train()
        train_loss = 0

        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            train_loss += loss.item()

        train_loss /= len(train_loader)

        # 검증
        model.eval()
        val_loss = 0

        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                outputs = model(X_batch)
                loss = criterion(outputs, y_batch)
                val_loss += loss.item()

        val_loss /= len(val_loader)
        val_losses.append(val_loss)

        print(f"Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), 'best_model.pth')
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping triggered!")
                break

    return train_losses, val_losses

def analyze_misclassifications(model, test_loader, feature_names, scaler):
    model.eval()
    misclassified_samples = []

    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            outputs = model(X_batch)
            _, predicted = torch.max(outputs.data, 1)
            _, actual = torch.max(y_batch.data, 1)

            # 잘못 예측된 샘플 찾기
            mask = (predicted != actual)
            if mask.any():
                wrong_X = X_batch[mask]
                wrong_pred = predicted[mask]
                wrong_actual = actual[mask]
                wrong_probs = outputs[mask]

                # 원래 스케일로 되돌리기
                original_features = scaler.inverse_transform(wrong_X)

                for i in range(len(wrong_X)):
                    sample = {
                        'actual': wrong_actual[i].item(),
                        'predicted': wrong_pred[i].item(),
                        'confidence': wrong_probs[i].max().item(),
                        'features': dict(zip(feature_names, original_features[i]))
                    }
                    misclassified_samples.append(sample)

    return misclassified_samples

def analyze_feature_importance(model, test_loader, feature_names):
    model.eval()
    feature_impacts = {name: [] for name in feature_names}

    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            base_output = model(X_batch)

            # 각 특성에 대해 작은 변화를 주고 출력 변화 관찰
            for i, feature in enumerate(feature_names):
                X_modified = X_batch.clone()
                X_modified[:, i] += 0.1  # 작은 변화
                modified_output = model(X_modified)

                # 출력 변화량 계산
                output_change = torch.abs(modified_output - base_output).mean().item()
                feature_impacts[feature].append(output_change)

    # 평균 영향도 계산
    for feature in feature_names:
        feature_impacts[feature] = np.mean(feature_impacts[feature])

    return feature_impacts


# 메인 함수
def main():
    df = pd.read_csv('spotify_dataset.csv')
    # scaler 받아오기
    X_scaled, y_encoded, num_categories, class_weights, scaler = preprocess_data_with_smote(df)

    # 나머지 코드는 동일
    X_train, X_temp, y_train, y_temp = train_test_split(
        X_scaled, y_encoded, test_size=0.2, random_state=42, stratify=y_encoded.argmax(axis=1)
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp.argmax(axis=1)
    )

    train_dataset = SpotifyDataset(X_train, y_train)
    val_dataset = SpotifyDataset(X_val, y_val)
    test_dataset = SpotifyDataset(X_test, y_test)

    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=64)
    test_loader = DataLoader(test_dataset, batch_size=64)

    model = SpotifyRankPredictor(num_categories)

    criterion = nn.CrossEntropyLoss(weight=class_weights)
    optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)

    train_losses, val_losses = train_model(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        criterion=criterion,
        optimizer=optimizer,
        num_epochs=150,
        patience=15
    )

    # 테스트
    model.load_state_dict(torch.load('best_model.pth'))
    model.eval()

    test_correct = 0
    test_total = 0
    y_pred = []
    y_true = []

    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            outputs = model(X_batch)
            _, predicted = torch.max(outputs.data, 1)
            _, actual = torch.max(y_batch.data, 1)
            test_total += y_batch.size(0)
            test_correct += (predicted == actual).sum().item()
            y_pred.extend(predicted.cpu().numpy())
            y_true.extend(actual.cpu().numpy())

    accuracy = 100 * test_correct / test_total
    print(f"\nTest Accuracy: {accuracy:.2f}%")
    print("Confusion Matrix:")
    print(confusion_matrix(y_true, y_pred))

    # 추가 분석 코드
    feature_names = ['Danceability', 'Energy', 'Loudness', 'Speechiness',
                     'Acousticness', 'Liveness', 'Tempo', 'Duration (ms)']

    misclassified = analyze_misclassifications(model, test_loader, feature_names, scaler)

    print("\n=== 오분류 패턴 분석 ===")
    for category in range(3):
        wrong_predictions = [s for s in misclassified if s['actual'] == category]
        if wrong_predictions:
            print(f"\n클래스 {category}가 잘못 예측된 경우:")
            for feature in feature_names:
                values = [s['features'][feature] for s in wrong_predictions]
                print(f"{feature}: 평균={np.mean(values):.2f}, 표준편차={np.std(values):.2f}")

    print("\n=== 예측 신뢰도 분석 ===")
    for category in range(3):
        wrong_predictions = [s for s in misclassified if s['actual'] == category]
        if wrong_predictions:
            avg_confidence = np.mean([s['confidence'] for s in wrong_predictions])
            print(f"클래스 {category} 오분류 평균 신뢰도: {avg_confidence:.2f}")

    feature_importance = analyze_feature_importance(model, test_loader, feature_names)

    print("\n=== 특성 중요도 분석 ===")
    for feature, importance in sorted(feature_importance.items(), key=lambda x: x[1], reverse=True):
        print(f"{feature}: {importance:.4f}")

if __name__ == "__main__":
    main()

Final category counts after SMOTE: [1377 1377 1377]
Epoch [1/150], Train Loss: 1.8566, Val Loss: 1.8123
Epoch [2/150], Train Loss: 1.8122, Val Loss: 1.7816
Epoch [3/150], Train Loss: 1.7911, Val Loss: 1.7619
Epoch [4/150], Train Loss: 1.7720, Val Loss: 1.7460
Epoch [5/150], Train Loss: 1.7560, Val Loss: 1.7330
Epoch [6/150], Train Loss: 1.7347, Val Loss: 1.7264
Epoch [7/150], Train Loss: 1.7212, Val Loss: 1.7184
Epoch [8/150], Train Loss: 1.6997, Val Loss: 1.7083
Epoch [9/150], Train Loss: 1.6906, Val Loss: 1.7030
Epoch [10/150], Train Loss: 1.6838, Val Loss: 1.6881
Epoch [11/150], Train Loss: 1.6712, Val Loss: 1.6815
Epoch [12/150], Train Loss: 1.6622, Val Loss: 1.6788
Epoch [13/150], Train Loss: 1.6536, Val Loss: 1.6817
Epoch [14/150], Train Loss: 1.6393, Val Loss: 1.6692
Epoch [15/150], Train Loss: 1.6390, Val Loss: 1.6702
Epoch [16/150], Train Loss: 1.6265, Val Loss: 1.6516
Epoch [17/150], Train Loss: 1.6161, Val Loss: 1.6489
Epoch [18/150], Train Loss: 1.6073, Val Loss: 1.6481
Epo

  model.load_state_dict(torch.load('best_model.pth'))
