# 一、作业要求

基本要求：
● 利用numpy实现Fisher判别
● 对比sklearn的结果（应该完全一致）
● 数据要求：与神经网络一致


In [6]:
import random
import numpy as np
from numpy.random import randn
from numpy.linalg import pinv, eig
from scipy.linalg import orth
from sklearn.mixture import GaussianMixture
from sklearn.linear_model import LinearRegression


class MyLinearDiscriminantAnalysis(object):
    def __init__(self, n_components=None, n_classes=None,
                 predict_reduction=True, eps=0.001, random_state=None,
                 covariance_type='spherical'):
        self.n_components = n_components
        self.n_classes = n_classes
        self.predict_reduction = predict_reduction
        self.eps = eps
        self.random_state = random_state
        self.covariance_type = covariance_type
        self.mu_ = None
        self.mu_c_ = None
        self.S_within_ = None
        self.S_between_ = None
        self.W_ = None
        self.W_inverse_ = None
        self.eig_pairs_ = None
        self.predictor_ = None
        self.clusters_ = None
        self.dimensionality_ = None
        # Random seed:
        np.random.seed(random_state)
        random.seed(random_state)

    def fit(self, X, y=None, n_clusters=None, min_clusters=2, max_clusters=40,class_clustering=True, verbose=False):
        n_samples = X.shape[0]
        n_features = X.shape[1]

        assert 0 < min_clusters <= max_clusters < n_samples * n_features
        assert 0 <= self.eps

        if y is None:
            self.clusters_ = self._clustering(X=X, min_clusters=min_clusters,
                                              max_clusters=max_clusters,
                                              n_clusters=n_clusters,
                                              verbose=verbose)
        elif class_clustering:
            self.clusters_ = - np.ones(y.shape, dtype=int)
            for target in np.unique(y):
                y_ = self._clustering(X=X[y == target], y=target,
                                      min_clusters=min_clusters,
                                      max_clusters=max_clusters,
                                      n_clusters=n_clusters,
                                      verbose=verbose)
                y_ += np.max(self.clusters_) + 1
                self.clusters_[y == target] = y_

            if verbose:
                print("Targets were provided: using the labeled data and "
                      "intra-class clustering.\n")
        else:
            self.clusters_ = y
            if verbose:
                print("Targets were provided: using the labeled data.\n")

        self.mu_, self.mu_c_ = self._means(X=X, y=self.clusters_,
                                           verbose=verbose)

        N_c, self.S_within_ = self._scatter_within(X=X, y=self.clusters_,
                                                   mu_c=self.mu_c_,
                                                   verbose=verbose)

        self.S_between_ = self._scatter_between(mu=self.mu_, mu_c=self.mu_c_,
                                                N_c=N_c, verbose=verbose)

        self.eig_pairs_ = self._eig_pairs(S_within=self.S_within_,
                                          S_between=self.S_between_)

        P = self._filter_eig_pairs(eig_pairs=self.eig_pairs_, eps=self.eps,
                                   verbose=verbose)

        self.W_ = self._fill_rank(P=P, verbose=verbose)

    def transform(self, X):
        X_new = X @ self.W_

        if self.n_components is not None:
            n_features = X.shape[1]
            assert 0 < self.n_components < n_features

            if self.predict_reduction:
                self.predictor_ = LinearRegression()
                self.predictor_.fit(X_new[:, :self.n_components], X)
            X_new = X_new[:, :self.n_components]

        return X_new

    def fit_transform(self, X, y=None, n_clusters=None,min_clusters=2, max_clusters=40,class_clustering=True,verbose=False):
        self.fit(X=X, y=y, n_clusters=n_clusters,
                 min_clusters=min_clusters,
                 max_clusters=max_clusters,
                 class_clustering=class_clustering,
                 verbose=verbose)
        return self.transform(X)

    def inverse_transform(self, X, predict_reduction=True, verbose=False):
        n_samples = X.shape[0]
        n_features = X.shape[1]

        if self.W_inverse_ is None:
            self.W_inverse_ = pinv(self.W_)

        if n_features != self.W_.shape[1]:
            assert n_features == self.n_components

            if verbose:
                print(f"Reverse tranformation after dimensionality reduction "
                      f"may yield unexpected results: "
                      f"{n_features} dim. expanded to {self.W_.shape[1]} dim.")

            if self.predict_reduction and predict_reduction:
                return self.predictor_.predict(X)
            else:
                mean = self.mu_.reshape(1, -1) @ self.W_
                X_ = np.repeat(mean.reshape(1, -1), n_samples, 0)
                X_[:, :n_features] = X[:, :n_features]

            X = X_

        return X @ self.W_inverse_

    def _clustering(self, X, min_clusters, max_clusters, n_clusters=None,y=None, verbose=False):
        n_samples = X.shape[0]
        n_features = X.shape[1]

        if verbose:
            print("No target is provided: using unsupervised clustering.\n")

        if self.n_classes is not None and y is None:
            assert 0 < self.n_classes < n_samples * n_features

            if verbose:
                print(f"Using the provided number of classes "
                      f"({self.n_classes}) for unsupervised clustering.\n")
            model = GaussianMixture(self.n_classes, covariance_type='full',
                                    random_state=self.random_state).fit(X)
        else:
            if verbose:
                if n_clusters is not None:
                    min_clusters = max_clusters = n_clusters
                    print(f"Using the provided number of clusters "
                          f"({n_clusters}) for unsupervised clustering.\n")
                elif y is not None:
                    print(f"Searching for an optimal number of clusters within"
                          f" class {y} for values between {min_clusters} and "
                          f"{max_clusters}...\n")
                else:
                    print("Predicting the classes from the clusters...\n")
                    print(f"Searching for an optimal number of clusters "
                          f"between {min_clusters} and {max_clusters}...\n")

            range_n = np.arange(min_clusters, max_clusters + 1)
            models = []
            for n in range_n:
                gmm = GaussianMixture(n, covariance_type=self.covariance_type,
                                      random_state=self.random_state).fit(X)
                models.append(gmm)
                if verbose:
                    print(f"{n} clusters: "
                          f"\tAIC: {gmm.aic(X):.3f}, "
                          f"\tBIC: {gmm.bic(X):.3f}  ")
            index = np.argmin(np.array([m.aic(X) for m in models]))
            model = models[index]
            self.n_classes = index + min_clusters
            if verbose and n_clusters is None:
                a = f" for class {y}" if y is not None else ""
                print(f"Optimal number of clusters{a} found: "
                      f"{self.n_classes}\n")

        return model.predict(X)

    @staticmethod
    def _means(X, y, verbose=False):
        n_features = X.shape[1]
        n_classes = len(np.unique(y))

        mu = np.mean(X, axis=0).reshape(1, -1)
        if verbose:
            print(f"Mu:\n{mu}\n")

        mu_c = np.zeros((n_classes, n_features))
        for i, target in enumerate(np.unique(y)):
            mu_c[i] = np.mean(X[y == target], axis=0)
            if verbose:
                print(f"Mu_c[{i}]:\n{mu_c[i]}\n")

        return mu, mu_c

    @staticmethod
    def _scatter_within(X, y, mu_c, verbose=False):
        n_classes = len(np.unique(y))

        data = []
        N_c = np.zeros(n_classes)
        for i, target in enumerate(np.unique(y)):
            delta = X[y == target] - mu_c[i]
            data.append(delta.T @ delta)
            N_c[i] = np.sum(y == target)

        S_within = np.sum(data, axis=0)
        if verbose:
            print(f"S_intra:\n{S_within}\n")

        return N_c, S_within

    @staticmethod
    def _scatter_between(mu, mu_c, N_c, verbose=False):
        delta = np.array(mu_c - mu)
        S_between = N_c * delta.T @ delta
        if verbose:
            print(f"S_inter:\n{S_between}\n")

        return S_between

    @staticmethod
    def _eig_pairs(S_within, S_between):
        A = pinv(S_within) @ S_between
        eig_val, eig_vec = eig(A)
        eig_val = np.abs(eig_val)

        return sorted(zip(eig_val, eig_vec.T), key=lambda k: k[0],
                      reverse=True)

    def _filter_eig_pairs(self, eig_pairs, eps, verbose=False):
        eig_vals, eig_vecs = zip(*eig_pairs)
        total = sum(eig_vals)
        eigenvectors = []

        if verbose:
            print("Singular values:")
        for i, v in enumerate(eig_pairs):
            if v[0] / total >= eps / 100:
                verdict = 'Accepted'
                eigenvectors.append(v[1])
            else:
                verdict = 'Rejected'
            if verbose:
                percentage = v[0] / total
                eigenvalue = v[0]
                print(f"Singular value {i + 1:}: "
                      f"\t{percentage:<8.2%} "
                      f"\t{eigenvalue:<8.6f} \t {verdict}")
        if verbose:
            print()

        self.dimensionality_ = len(eigenvectors)

        return np.vstack(eigenvectors).T.real

    @staticmethod
    def _fill_rank(P, verbose=False):
        n_features = P.shape[0]

        while True:
            W = randn(n_features, n_features)
            W = orth(W)
            W[:P.shape[0], :P.shape[1]] = P
            if orth(W).shape == (n_features, n_features):
                break

        if verbose:
            print(f"W:\n{W}\n")

        return W

In [7]:
from sklearn.datasets import make_classification
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
X,Y = make_classification(n_classes=2)
mylda = MyLinearDiscriminantAnalysis()
sklda = LinearDiscriminantAnalysis()
mylda.fit(X,Y)
sklda.fit(X,Y)
print(sklda.means_)
print(mylda._means(X,Y))
print(mylda.transform(X[1]))
print(sklda.transform(X[1].reshape(-1,1)))

[[ 2.62208741e-01 -1.10986194e-01  6.43303975e-02  1.83907311e-01
  -1.53005051e-01 -1.57368237e-01  4.01472835e-01  6.93121652e-02
  -6.52883630e-01 -4.78158438e-02 -1.08525509e-03 -2.29735242e-02
  -1.20552058e+00 -3.42389223e-02 -1.54300807e-01 -7.77544587e-02
   5.88824132e-02  1.14091928e-01  2.63931747e-01  5.70821244e-02]
 [ 1.03443923e-01 -2.00860438e-02  3.38658371e-01 -1.76872404e-01
  -2.11760508e-01  4.75502912e-03 -3.85601175e-01 -4.91112340e-02
   5.05596923e-01  9.47820013e-02  2.53815082e-01 -2.42088682e-01
   8.21704333e-01  1.39189183e-01  1.94155797e-02  1.64083752e-01
  -4.78974252e-02 -4.14699437e-02 -1.21318579e-01  1.79793647e-01]]
(array([[ 0.18441398, -0.06644512,  0.1987511 ,  0.00712525, -0.18179523,
        -0.07792784,  0.01580657,  0.0112847 , -0.08522816,  0.0220571 ,
         0.12381591, -0.13033995, -0.21218037,  0.05074085, -0.06917978,
         0.04074626,  0.00656029,  0.03786661,  0.07515909,  0.11721077]]), array([[ 2.62208741e-01, -1.10986194e-01,

