In [1]:
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
from sklearn.datasets import make_moons

RecursionError: maximum recursion depth exceeded

In [2]:
np.random.seed(12)

In [3]:
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from scipy.interpolate import make_smoothing_spline

class SMPA(BaseEstimator, ClassifierMixin):
    def __init__(self, learning_rate=0.005, epochs=50, random_state=7, verbose=False,
                 lambda_scaling='log', patience=5, decay_factor=0.5, min_learning_rate=1e-6,
                 n_control_points=5, smoothing_lambda=0.1):
        self.learning_rate = learning_rate
        self.initial_learning_rate = learning_rate
        self.epochs = epochs
        self.random_state = random_state
        self.verbose = verbose
        self.lambda_scaling = lambda_scaling
        self.patience = patience
        self.decay_factor = decay_factor
        self.min_learning_rate = min_learning_rate
        self.n_control_points = n_control_points
        self.smoothing_lambda = smoothing_lambda
        self.error_history_ = []
        self.learning_rate_history_ = []
        if lambda_scaling not in ['log', 'sqrt', 'none']:
            raise ValueError("lambda_scaling must be one of 'log', 'sqrt', or 'none'")
        np.random.seed(random_state)

    def _calculate_means(self, X, y):
        mask_1 = y == 1
        self.m1 = X[mask_1].mean(axis=0)
        self.m0 = X[~mask_1].mean(axis=0)

    def _initialize_control_points(self, X):
        x_min, x_max = X[:, 0].min(), X[:, 0].max()
        y_min, y_max = X[:, 1].min(), X[:, 1].max()
        self.control_x = np.linspace(x_min, x_max, self.n_control_points).copy()  # Make it adjustable
        y_mid = (self.m0[1] + self.m1[1]) / 2
        self.control_y = np.random.uniform(y_mid - (y_max - y_min) * 0.1,
                                          y_mid + (y_max - y_min) * 0.1,
                                          self.n_control_points)
        self.initial_control_x = self.control_x.copy()  # Store initial x too
        self.initial_control_y = self.control_y.copy()

    def _fit_spline(self):
        self.spline = make_smoothing_spline(self.control_x, self.control_y, lam=self.smoothing_lambda)

    def _calculate_displacement(self, X):
        spline_y = self.spline(X[:, 0])
        return X[:, 1] - spline_y

    def _update_pseudo_labels(self, X, y):
        m1_displacement = self._calculate_displacement(self.m1.reshape(1, -1))[0]
        self.class_1_pseudo = 1 if m1_displacement > 0 else -1
        self.class_0_pseudo = -self.class_1_pseudo
        return np.where(y == 1, self.class_1_pseudo, self.class_0_pseudo)

    def fit(self, X, y):
        self.classes_ = np.unique(y)
        if not set(self.classes_).issubset({0, 1}):
            raise ValueError("Labels must be 0 and 1")
        if X.shape[1] != 2:
            raise ValueError("This is a 2D-only algorithm for now!")
        X = np.asarray(X)
        y = np.asarray(y)

        self._calculate_means(X, y)
        self._initialize_control_points(X)
        self._fit_spline()

        best_error = float('inf')
        best_control_x = None
        best_control_y = None
        best_class_1_pseudo = None
        patience_counter = 0
        current_learning_rate = self.initial_learning_rate

        self.error_history_ = []
        self.learning_rate_history_ = []

        indices_class_0 = np.where(y == 0)[0]
        indices_class_1 = np.where(y == 1)[0]

        for epoch in range(self.epochs):
            self._fit_spline()
            pseudo_labels = self._update_pseudo_labels(X, y)
            displacements = self._calculate_displacement(X)
            errors = (displacements * pseudo_labels <= 0)
            error_count = np.sum(errors)

            self.error_history_.append(error_count)
            self.learning_rate_history_.append(current_learning_rate)

            if self.verbose and epoch % 10 == 0:
                print(f"Epoch {epoch}: Errors = {error_count}, LR = {current_learning_rate:.6f}")

            if error_count < best_error:
                best_error = error_count
                best_control_x = self.control_x.copy()
                best_control_y = self.control_y.copy()
                best_class_1_pseudo = self.class_1_pseudo
                patience_counter = 0
            else:
                patience_counter += 1
                if patience_counter >= self.patience:
                    current_learning_rate = max(current_learning_rate * self.decay_factor, self.min_learning_rate)
                    patience_counter = 0
                    if current_learning_rate == self.min_learning_rate:
                        if self.verbose:
                            print(f"Min LR reached at epoch {epoch}")
                        break

            if errors.any():
                error_indices = np.where(errors)[0]
                for idx in error_indices:
                    d = X[idx]
                    distances = np.abs(self.control_x - d[0])
                    nearest_idx = np.argmin(distances)
                    distance = distances[nearest_idx]
                    lmbda = (np.log1p(distance) if self.lambda_scaling == 'log' else
                            np.sqrt(distance) if self.lambda_scaling == 'sqrt' else distance)

                    # Determine opposite class's correctly classified points
                    if y[idx] == 1:  # Misclassified class 1 point
                        opp_indices = indices_class_0
                    else:  # Misclassified class 0 point
                        opp_indices = indices_class_1

                    # Find correctly classified points in opposite class
                    opp_displacements = displacements[opp_indices]
                    opp_labels = pseudo_labels[opp_indices]
                    correct_opp = opp_indices[opp_displacements * opp_labels > 0]

                    if len(correct_opp) > 0:
                        # Pick a random subset of correctly classified opposite class points
                        n_random = max(1, len(correct_opp) // 2)  # Take half, at least 1
                        random_correct = np.random.choice(correct_opp, size=n_random, replace=False)
                        # Average 2D position of the random subset
                        random_avg_opp = np.mean(X[random_correct], axis=0)  # [x, y]
                        # Calculate 2D difference to move toward this random avg
                        delta_x = random_avg_opp[0] - self.control_x[nearest_idx]
                        delta_y = random_avg_opp[1] - self.control_y[nearest_idx]
                        # Apply step in both directions
                        step_x = delta_x * lmbda * current_learning_rate
                        step_y = delta_y * lmbda * current_learning_rate

                        # Constrain step_x to maintain ascending order
                        if nearest_idx > 0:
                            min_x = self.control_x[nearest_idx - 1] + 1e-6  # Small buffer to avoid equality
                            step_x = max(step_x, min_x - self.control_x[nearest_idx])
                        if nearest_idx < len(self.control_x) - 1:
                            max_x = self.control_x[nearest_idx + 1] - 1e-6
                            step_x = min(step_x, max_x - self.control_x[nearest_idx])
                    else:
                        # Fallback: Use original direction (y-only) if no correct points
                        step_x = 0  # No x movement in fallback
                        step_y = -pseudo_labels[idx] * lmbda * current_learning_rate

                    self.control_x[nearest_idx] += step_x
                    self.control_y[nearest_idx] += step_y

        # Store the last control points from the final epoch
        self.last_control_x = self.control_x.copy()
        self.last_control_y = self.control_y.copy()
        # Restore best for prediction
        self.control_x = best_control_x
        self.control_y = best_control_y
        self._fit_spline()
        self.class_1_pseudo = best_class_1_pseudo
        return self

    def predict(self, X):
        X = np.asarray(X)
        displacements = self._calculate_displacement(X)
        return np.where(displacements > 0,
                        1 if self.class_1_pseudo > 0 else 0,
                        0 if self.class_1_pseudo > 0 else 1)

    def plot_convergence(self, figsize=(12, 5)):
        try:
            import matplotlib.pyplot as plt
            fig, (ax1, ax2) = plt.subplots(1, 2, figsize=figsize)
            ax1.plot(self.error_history_, 'b-', label='Errors')
            ax1.set_title('Error Convergence')
            ax1.set_xlabel('Epoch')
            ax1.set_ylabel('Number of Errors')
            ax1.grid(True)
            ax2.plot(self.learning_rate_history_, 'r-', label='Learning Rate')
            ax2.set_title('Learning Rate Decay')
            ax2.set_xlabel('Epoch')
            ax2.set_ylabel('Learning Rate')
            ax2.set_yscale('log')
            ax2.grid(True)
            plt.tight_layout()
            return fig
        except ImportError:
            print("Install matplotlib with 'pip install matplotlib'!")
            return None

    def plot_boundary(self, X, y, figsize=(8, 6)):
        try:
            import matplotlib.pyplot as plt
            fig = plt.figure(figsize=figsize)
            plt.scatter(X[:, 0], X[:, 1], c=y, cmap='bwr', alpha=0.5)

            # Plot initial spline
            initial_spline = make_smoothing_spline(self.initial_control_x, self.initial_control_y, lam=self.smoothing_lambda)
            x_range = np.linspace(X[:, 0].min(), 60, 100)                                                                     ## Need to hardcode x range because otherwise spline shoots up
            y_initial = initial_spline(x_range)                                                                               ## and we don't need such high y values
            plt.plot(x_range, y_initial, 'r--', label='Initial Boundary', alpha=0.7)

            # Plot last spline (from final epoch)
            last_spline = make_smoothing_spline(self.last_control_x, self.last_control_y, lam=self.smoothing_lambda)
            y_last = last_spline(x_range)
            plt.plot(x_range, y_last, 'g-', label='Last Boundary')

            # Plot control points
            plt.scatter(self.initial_control_x, self.initial_control_y, c='orange', marker='o', label='Initial Control Points', alpha=0.7)
            plt.scatter(self.last_control_x, self.last_control_y, c='k', marker='x', label='Last Control Points')

            plt.legend()
            plt.title('MPA2D_Spline Decision Boundary: Initial vs Last')
            return fig
        except ImportError:
            print("Install matplotlib with 'pip install matplotlib'!")
            return None

    def score(self, X, y):
        """Compute accuracy of predictions using sklearn's standard method."""
        return accuracy_score(y, self.predict(X))

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats

from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score, StratifiedKFold
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.utils import shuffle

# Your Spiral Dataset Generator
def generate_spiral_dataset(n_points=400, noise=0.2):
    X, y = make_moons(n_samples=300, random_state = 7, noise = 0.3)
    return X, y

# Comprehensive Classifier Comparison
class ClassifierComparison:
    def __init__(self, X, y, random_state=42):
        self.X = X
        self.y = y
        self.random_state = random_state
        self.smpa_grid = {'n_control_points': [5, 6, 7, 8, 9, 10],
                         'learning_rate': [0.1, 0.2, 0.5, 1.0, 2.0],
                         'epochs': [20, 50, 100, 200, 400]}
        self.svm_grid = {'C': [0.1, 1, 10, 50, 100],
                        'gamma': ['scale', 'auto', 0.1, 0.01],
                        'kernel': ['rbf']}

    def scale_data(self, X, scaling_method='smpa'):
        if scaling_method == 'smpa':
            scaler = MinMaxScaler(feature_range=(-100, 100))
        else:
            scaler = StandardScaler()
        return scaler.fit_transform(X)

    def grid_search(self, classifier_type='smpa'):
        X_scaled = self.scale_data(self.X, 'smpa' if classifier_type == 'smpa' else 'svm')

        if classifier_type == 'smpa':
            clf = SMPA(random_state=self.random_state)
            param_grid = self.smpa_grid
        else:
            clf = SVC(random_state=self.random_state)
            param_grid = self.svm_grid

        cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=self.random_state)
        grid_search = GridSearchCV(clf, param_grid, cv=cv, scoring='accuracy', n_jobs=-1, verbose=1)
        grid_search.fit(X_scaled, self.y)
        return grid_search

    def stability_test(self, classifier_type='smpa', n_runs=50):
        scores = []

        for seed in range(n_runs):
            np.random.seed(seed)
            X_shuffled, y_shuffled = shuffle(self.X, self.y, random_state=seed)

            X_train, X_test, y_train, y_test = train_test_split(
                X_shuffled, y_shuffled,
                test_size=0.2,
                random_state=seed,
                stratify=y_shuffled
            )

            # Scale train and test separately
            X_train_scaled = self.scale_data(X_train, 'smpa' if classifier_type == 'smpa' else 'svm')
            X_test_scaled = self.scale_data(X_test, 'smpa' if classifier_type == 'smpa' else 'svm')

            print(f"Run {seed}: Train Class Distribution - {np.bincount(y_train)}")
            print(f"Run {seed}: Test Class Distribution - {np.bincount(y_test)}")

            if classifier_type == 'smpa':
                clf = SMPA(random_state=seed, **smpa_grid_search.best_params_)
            else:
                clf = SVC(random_state=seed, **svm_grid_search.best_params_)


            try:
                clf.fit(X_train_scaled, y_train)
                y_pred = clf.predict(X_test_scaled)
                scores.append(accuracy_score(y_test, y_pred))
            except Exception as e:
                print(f"🚨 Error in run {seed}: {e}")
                print(f"Train data shape: {X_train_scaled.shape}")
                print(f"Train labels shape: {y_train.shape}")

        return {
            'mean_score': np.mean(scores),
            'std_score': np.std(scores),
            'scores': scores
        }

    def statistical_significance_test(self, smpa_scores, svm_scores):
        t_statistic, p_value = stats.ttest_ind(smpa_scores, svm_scores)

        print("\n🔬 Statistical Significance Test Results:")
        print(f"T-Statistic: {t_statistic}")
        print(f"P-Value: {p_value}")

        alpha = 0.05
        if p_value < alpha:
            print("🏆 There's a statistically significant difference between classifiers!")
        else:
            print("🤝 No statistically significant difference detected.")

        return t_statistic, p_value

# Main Execution
np.random.seed(42)
X, y = generate_spiral_dataset(n_points=400, noise=0.2)
comparison = ClassifierComparison(X, y)

# Grid Search
smpa_grid_search = comparison.grid_search('smpa')
svm_grid_search = comparison.grid_search('svm')

print("\n🚀 SMPA Best Parameters:", smpa_grid_search.best_params_)
print("🚀 SVM Best Parameters:", svm_grid_search.best_params_)

# Stability Test
smpa_stability = comparison.stability_test('smpa', n_runs=50)  # Reduced runs for faster testing
svm_stability = comparison.stability_test('svm', n_runs=50)

print("\n📊 SMPA Stability:")
print(f"Mean Score: {smpa_stability['mean_score']:.4f}")
print(f"Score Std Dev: {smpa_stability['std_score']:.4f}")

print("\n📊 SVM Stability:")
print(f"Mean Score: {svm_stability['mean_score']:.4f}")
print(f"Score Std Dev: {svm_stability['std_score']:.4f}")

# Statistical Significance
comparison.statistical_significance_test(
    smpa_stability['scores'],
    svm_stability['scores']
)