In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from pymoo.util.nds.non_dominated_sorting import NonDominatedSorting
from ace_tools_open import display_dataframe_to_user

In [2]:
# ---------------------- Configuration Parameters ----------------------
# Location of the input CSV (must contain a COUNTRY column and PCA-L* columns).
FILE_PATH = r"D:\RS\data_clustering\PCAF_Loadings_2022.csv"

# Direction label for every PCA-L column, keyed by column name.
INDICATOR_INFO = {
    "PCA-L1": "positive",
    "PCA-L2": "positive",
    "PCA-L3": "positive",
    "PCA-L4": "positive",
    "PCA-L5": "positive",
    "PCA-L6": "positive",
    "PCA-L7": "positive",
    "PCA-L8": "negative",
    "PCA-L9": "negative",
    "PCA-L10": "negative",
    "PCA-L11": "negative",
    "PCA-L12": "positive",
    "PCA-L13": "positive",
    "PCA-L14": "positive",
    "PCA-L15": "positive",
    "PCA-L16": "positive",
    "PCA-L17": "negative",
    "PCA-L18": "negative",
}

# PC columns to keep, in the order they should appear after reordering.
NEW_PC_ORDER = [
    "PCA-L1",
    "PCA-L2",
    "PCA-L4",
    "PCA-L5",
]

# 0-based positions (within the reordered PC columns) used as objectives.
SELECTED_DIMS = [0, 1, 2, 3]

# Smallest number of leading dimensions the front search will try.
MIN_DIMS = 2

In [3]:
# ---------------------- Core Functions ----------------------
def load_and_process_data(file_path):
    """Read the CSV and keep COUNTRY plus every column starting with 'PCA-L'.

    Args:
        file_path: Path of the CSV file handed to ``pd.read_csv``.

    Returns:
        A ``(frame, pc_cols)`` tuple: ``frame`` holds COUNTRY followed by the
        PCA-L columns, sorted by COUNTRY with a fresh 0..n-1 index; ``pc_cols``
        lists the PCA-L column names in the order they appear in the file.
    """
    raw = pd.read_csv(file_path)

    pc_cols = []
    for name in raw.columns:
        if name.startswith('PCA-L'):
            pc_cols.append(name)

    subset = raw[['COUNTRY'] + pc_cols]
    ordered = subset.sort_values('COUNTRY')
    return ordered.reset_index(drop=True), pc_cols


def reorder_columns(df, pc_cols, new_order):
    """Select COUNTRY plus the PC columns named in ``new_order``, in that order.

    Names in ``new_order`` that are not present in ``pc_cols`` are skipped.

    Returns:
        A ``(frame, kept_cols)`` tuple where ``kept_cols`` is the filtered,
        ordered list of PC column names and ``frame`` is ``df`` restricted to
        COUNTRY followed by those columns.
    """
    kept_cols = []
    for name in new_order:
        if name in pc_cols:
            kept_cols.append(name)

    reordered = df[['COUNTRY'] + kept_cols]
    return reordered, kept_cols


def prepare_optimization_data(df, pc_cols, directions_dict, selected_dims):
    """Build the standardized objective matrix and its direction vector.

    Args:
        df: Frame containing the columns named in ``pc_cols``.
        pc_cols: PC column names; ``selected_dims`` indexes into this list.
        directions_dict: Maps a column name to its direction label.
        selected_dims: 0-based positions in ``pc_cols`` to use.

    Returns:
        ``(X_std, direction_vec, selected_cols, selected_dims)`` where ``X_std``
        is the ``StandardScaler`` output for the selected columns,
        ``direction_vec`` holds 1 for the label 'positive' and -1 for anything
        else, ``selected_cols`` are the chosen column names, and
        ``selected_dims`` is passed through unchanged.
    """
    # Resolve positions to column names first, then look up each direction.
    selected_cols = []
    for position in selected_dims:
        selected_cols.append(pc_cols[position])

    directions = []
    for name in selected_cols:
        directions.append(directions_dict[name])

    raw_values = df[selected_cols].values.astype(float)
    scaler = StandardScaler()
    X_std = scaler.fit_transform(raw_values)

    # Direction vector: positive→1, negative→-1
    sign_list = []
    for label in directions:
        sign_list.append(1 if label == 'positive' else -1)
    direction_vec = np.array(sign_list)

    print("\nSelected dimensions and their directions:")
    for number, (name, label) in enumerate(zip(selected_cols, directions), start=1):
        print(f"Dimension {number}: {name} - {label}")

    return X_std, direction_vec, selected_cols, selected_dims


def _rank_pareto_fronts(F, sorter):
    """Return the 1-based Pareto front rank of every row of ``F`` (1 = non-dominated)."""
    ranks = np.zeros(F.shape[0], dtype=int)
    for rank, members in enumerate(sorter.do(F, only_non_dominated_front=False), 1):
        ranks[members] = rank
    return ranks


def find_pareto_frontiers(X_std, direction_vec, selected_dims, min_dims=MIN_DIMS):
    """Rank rows into Pareto fronts, dropping trailing dimensions until >1 front appears.

    The search starts with all columns of ``X_std`` and removes the last column
    one at a time, stopping at the first attempt that yields more than one
    front. If every attempt yields a single front, the ranking for the smallest
    attempted dimension count is returned.

    Args:
        X_std: 2-D array, one row per item and one column per objective.
        direction_vec: Per-column sign, +1 or -1, aligned with ``X_std`` columns.
            NOTE(review): this assumes +1 ('positive') marks a higher-is-better
            indicator and -1 a lower-is-better one — confirm with the data owner.
        selected_dims: Dimension indices aligned with the columns of ``X_std``;
            only used to report which dimensions were kept.
        min_dims: Smallest number of leading dimensions to try. Values below 1
            are treated as 1; values above the column count are clamped to it.

    Returns:
        ``(fronts, used_dims_indices)``: ``fronts`` is an int array of 1-based
        front ranks per row; ``used_dims_indices`` is the leading slice of
        ``selected_dims`` that was actually used.

    Raises:
        ValueError: If ``X_std`` has no columns.
    """
    total_dims = X_std.shape[1]
    if total_dims == 0:
        raise ValueError("X_std must contain at least one objective column")

    # Clamp so the loop below always runs at least once; previously a column
    # count below min_dims skipped the loop and hit an unbound sorter.
    floor_dims = min(max(1, min_dims), total_dims)

    # pymoo's non-dominated sorting minimizes every objective. Multiplying by
    # the direction makes "larger = better" for every column, so the product is
    # negated to obtain the minimization form pymoo expects.
    signs = np.asarray(direction_vec)[:total_dims]
    F_all = -(X_std * signs[np.newaxis, :])

    sorter = NonDominatedSorting()

    # Attempt from maximum dimensions down to minimum
    for n in range(total_dims, floor_dims - 1, -1):
        fronts = _rank_pareto_fronts(F_all[:, :n], sorter)
        n_fronts = len(np.unique(fronts))
        if n_fronts > 1:
            used_dims_indices = selected_dims[:n]
            print(
                f"Found {n_fronts} Pareto fronts using {n} dimensions: {[i + 1 for i in used_dims_indices]}")
            return fronts, used_dims_indices

    # Fallback: every attempt produced a single front. ``fronts`` already holds
    # the ranking for ``floor_dims`` dimensions from the last loop iteration.
    used_dims_indices = selected_dims[:floor_dims]
    print(f"Fallback: Only 1 front found, using first {floor_dims} dimensions: {[i + 1 for i in used_dims_indices]}")
    return fronts, used_dims_indices


def print_frontier_countries(results):
    """Print, for each Pareto front level in ascending order, its countries.

    Args:
        results: Frame with a 'ParetoFront' column (front level per row) and a
            'COUNTRY' column (country name per row).

    Each front is printed as a header line with the member count, followed by
    the alphabetically sorted country names joined by ", ".
    """
    print("\n\n===== Countries Grouped by Pareto Front Levels =====")

    front_column = results['ParetoFront']
    for level in sorted(front_column.unique()):
        in_level = front_column == level
        names = sorted(results.loc[in_level, 'COUNTRY'].tolist())
        print(f"\nFront {level} ({len(names)} countries):")
        print(", ".join(names))


In [None]:
# 1. Load and sort data
df_pc, all_pc_cols = load_and_process_data(FILE_PATH)
print(f"\nOriginal data: {df_pc.shape[0]} countries, {len(all_pc_cols)} PC columns")

# 2. Reorder PC columns
df_reordered, pc_cols_reordered = reorder_columns(df_pc, all_pc_cols, NEW_PC_ORDER)
print(f"\nReordered PC columns: {', '.join(pc_cols_reordered)}")

# 3. Prepare optimization data (including dimension indices)
X_std, dir_vec, used_cols, selected_dims = prepare_optimization_data(
    df_reordered, pc_cols_reordered, INDICATOR_INFO, SELECTED_DIMS
)

# 4. Find Pareto frontiers (dynamic dimension attempt)
front_labels, final_used_dims = find_pareto_frontiers(
    X_std, dir_vec, selected_dims, min_dims=MIN_DIMS
)

# 5. Merge and display results
results = df_reordered.copy()
results['ParetoFront'] = front_labels
# Record the actual column names that were used. The indices in
# final_used_dims are positions within pc_cols_reordered, so printing them as
# "PC[1, 2, ...]" would mislabel the components whenever NEW_PC_ORDER skips or
# permutes columns (e.g. position 3 is PCA-L4, not PC3).
results['UsedDimensions'] = ", ".join(pc_cols_reordered[i] for i in final_used_dims)

print("\nNumber of countries in each Pareto front (sorted by level):")
# Count rows per front level, ordered by ascending level
for front, count in results['ParetoFront'].value_counts().sort_index().items():
    print(f"Front {front}: {count} countries")

# 6. Print country lists by front level
print_frontier_countries(results)

print("\nResult preview:")
preview_cols = ['COUNTRY'] + pc_cols_reordered + ['ParetoFront', 'UsedDimensions']
try:
    display_dataframe_to_user(
        "Multi-Objective Optimization Results",
        results[preview_cols]
    )
except Exception as e:
    # Best-effort rich display: fall back to a plain-text preview on any
    # failure of the display helper so the notebook still shows results.
    print(f"Display error: {e}")
    print(results.head()[preview_cols])