In [1]:
import numpy as np

import matplotlib.pyplot as plt

In [2]:
import numpy as np
import scipy
import numbers
from scipy.sparse.linalg import svds
import warnings


class PCA:

    def __init__(self, n_components=None,svd_solver='auto'):
        # assert n_components >= 1, "n_components must be valid"

        self.n_components = n_components
        self.n_components_ = None
        self.svd_solver = svd_solver

    def svd_flip(self,u,v,u_based_decision=True):
        if u_based_decision:
            # columns of u, rows of v
            max_abs_cols = np.argmax(np.abs(u), axis=0)
            signs = np.sign(u[max_abs_cols, range(u.shape[1])])
            u *= signs
            v *= signs[:, np.newaxis]
        else:
            # rows of v, columns of u
            max_abs_rows = np.argmax(np.abs(v), axis=1)
            signs = np.sign(v[range(v.shape[0]), max_abs_rows])
            u *= signs
            v *= signs[:, np.newaxis]
        return u, v

    def stable_cumsum(self,arr,axis=None,rtol=1e-05,atol=1e-8):
        # if np_version < (1, 9):
        #     return np.cumsum(arr, axis=axis, dtype=np.float64)

        out = np.cumsum(arr, axis=axis, dtype=np.float64)
        expected = np.sum(arr, axis=axis, dtype=np.float64)
        if not np.all(np.isclose(out.take(-1, axis=axis), expected, rtol=rtol,
                                 atol=atol, equal_nan=True)):
            warnings.warn('cumsum was found to be unstable: '
                          'its last element does not correspond to sum',
                          RuntimeWarning)
        return out

    def fit(self,X,y=None):

        self._fit(X)
        return self

    def fit_transform(self, X, y=None):

        U, S, V = self._fit(X)
        U = U[:,:self.n_components_]

        return U

    def _fit(self,X):
        if self.n_components is None:
            if self.svd_solver != 'arpack':
                n_components = min(X.shape)
            else:
                n_components = min(X.shape) - 1
        else:
            n_components = self.n_components

        svd_solver = self.svd_solver

        if svd_solver == 'auto':
            # Small problem or n_components == 'mle', just call full PCA
            if max(X.shape) <= 500 :
                svd_solver = 'full'
            elif n_components >= 1 and n_components < .8 * min(X.shape):
                svd_solver = 'randomized'
            # This is also the case of n_components in (0,1)
            else:
                svd_solver = 'full'

        # Call different fits for either full or truncated SVD
        if svd_solver == 'full':
            return self._fit_full(X, n_components)
        elif svd_solver in ['arpack', 'randomized']:
            return self._fit_truncated(X, n_components, svd_solver)
        else:
            raise ValueError("Unrecognized svd_solver='{0}'"
                             "".format(svd_solver))

    def _fit_full(self,X,n_components):
        n_samples, n_features = X.shape


        if not 0 <= n_components <= min(n_samples, n_features):
            raise ValueError("n_components=%r must be between 0 and "
                             "min(n_samples, n_features)=%r with "
                             "svd_solver='full'"
                             % (n_components, min(n_samples, n_features)))
        elif n_components >= 1:
            if not isinstance(n_components, (numbers.Integral, np.integer)):
                raise ValueError("n_components=%r must be of type int "
                                 "when greater than or equal to 1, "
                                 "was of type=%r"
                                 % (n_components, type(n_components)))

        self.mean_ = np.mean(X,axis=0)
        X_demean = X - self.mean_

        U,S,V = np.linalg.svd(X_demean, full_matrices=False)
#         U,V = self.svd_flip(U,V)

        components_ = V

        explained_variance_ = (S**2)/(n_samples-1)
        total_var = explained_variance_.sum()
        explained_variance_ratio_ = explained_variance_/total_var
        singular_values_ = S.copy()

        if 0 < n_components < 1.0:
            # number of components for which the cumulated explained
            # variance percentage is superior to the desired threshold
            ratio_cumsum = self.stable_cumsum(explained_variance_ratio_)
            n_components = np.searchsorted(ratio_cumsum, n_components) + 1

            # Compute noise covariance using Probabilistic PCA model
            # The sigma2 maximum likelihood (cf. eq. 12.46)
        if n_components < min(n_features, n_samples):
            self.noise_variance_ = explained_variance_[n_components:].mean()
        else:
            self.noise_variance_ = 0.

        self.n_samples_, self.n_features_ = n_samples, n_features
        self.components_ = components_[:n_components]
        self.n_components_ = n_components
        self.explained_variance_ = explained_variance_[:n_components]
        self.explained_variance_ratio_ = \
            explained_variance_ratio_[:n_components]
        self.singular_values_ = singular_values_[:n_components]

        return U, S, V

    def _fit_truncated(self,X,n_components,svd_solver):
        n_samples, n_features = X.shape

        self.mean_ = np.mean(X, axis=0)
        X_demean = X - self.mean_

#         if svd_solver == 'arpack':
            # random init solution, as ARPACK does it internally
        v0 = np.random.uniform(-1, 1, size=min(X_demean.shape))
        U, S, V = svds(X_demean, k=n_components, tol=0.0, v0=v0)
            # svds doesn't abide by scipy.linalg.svd/randomized_svd
            # conventions, so reverse its outputs.
        S = S[::-1]
            # flip eigenvectors' sign to enforce deterministic output
#             U, V = self.svd_flip(U[:, ::-1], V[::-1])

        self.n_samples_, self.n_features_ = n_samples, n_features
        self.components_ = V
        self.n_components_ = n_components

        # Get variance explained by singular values
        self.explained_variance_ = (S ** 2) / (n_samples - 1)
        total_var = np.var(X_demean, ddof=1, axis=0)
        self.explained_variance_ratio_ = \
            self.explained_variance_ / total_var.sum()
        self.singular_values_ = S.copy()  # Store the singular values.

        if self.n_components_ < min(n_features, n_samples):
            self.noise_variance_ = (total_var.sum() -
                                    self.explained_variance_.sum())
            self.noise_variance_ /= min(n_features, n_samples) - n_components
        else:
            self.noise_variance_ = 0.

        return U, S, V

    def transform(self,X):
        if self.mean_ is not None:
            X_demean = X - self.mean_
        X_transformed = np.dot(X_demean, self.components_.T)
        return X_transformed

    def inverse_transform(self,X):
        return np.dot(X,self.components_)+self.mean_

In [5]:
%%time
from PIL import Image
data1 = []
for j in range(0, 10):
    for i in range(0,100):
        im = Image.open('C:\\Users\\Magneto_Wang\\Documents\\GitHub\\机器学习个人笔记\\机器学习实战\\PCA算法实现\\灰度图片\\%s.jpg' %(100*j+i) )
        mtr = np.array(im)
#         if mtr.shape[0] < mtr.shape[1]:
#             mtr = mtr.T
#         print(mtr.shape)
        
        s = mtr.reshape(1, 98304)
        data1.append(s)
     #   print(s.shape)
    
print('end')

end
Wall time: 4.2 s


In [6]:
data1_np = np.array(data1, dtype=int)
data1_np.shape

(1000, 1, 98304)

In [7]:
data_np=data1_np.reshape((1000,98304))
data_np.shape

(1000, 98304)

In [8]:
%%time
pca = PCA(80)
pca.fit(data_np)

Wall time: 1min 6s


In [9]:
data_reduction = pca.transform(data_np)
data_reduction.shape

(1000, 80)

In [10]:
from math import *
import numpy as np
def knn(X,k,number):
    distances = [sqrt(np.sum((x - X[number])**2)) for x in X]
    nearest = np.argsort(distances)
    return nearest[:k]

In [11]:
%%time
knnn = knn(data_reduction, 8, 729)

Wall time: 16 ms


In [12]:
knnn

array([729, 773, 797, 772, 186, 749, 978, 697], dtype=int64)

In [13]:
from collections import Counter
from math import *
import numpy as np
def knn(X,y,k,number):
    distances = [sqrt(np.sum((x - X[number])**2)) for x in X]
    nearest = np.argsort(distances)
    topK_y = [y[i] for i in nearest[1:k]]
    votes = Counter(topK_y)
    
    return nearest[1:k],votes.most_common(1)[0][0]

In [14]:
y0 = np.zeros(100,dtype=np.uint8)
y1 = np.ones(100,dtype=np.uint8)
y2 = y1+1
y3 = y2+1
y4 = y3+1
y5 = y4+1
y6 = y5+1
y7 = y6+1
y8 = y7+1
y9 = y8+1
y = np.concatenate((y0, y1,y2,y3,y4,y5,y6,y7,y8,y9),axis=0)

In [15]:
near,predict_y = knn(data_reduction,y,8,30)

In [16]:
predict_y
near

array([147, 795, 819, 785, 718, 151, 214], dtype=int64)

In [17]:
%run knn.py

In [18]:
knn_clf=KNNClassifier(k=6)

In [19]:
knn_clf.fit(data_reduction, y)

KNN(K=6)

In [20]:
# X_predict = np.array([23,45,123,234])

In [21]:
# y_predict = knn_clf.predict(729)
# print(y_predict.shape)

In [22]:
# y_predict

In [23]:
data_reduction.shape

(1000, 80)

In [24]:
y.shape

(1000,)

### train_test_split

In [25]:
shuffle_indexes = np.random.permutation(len(data_reduction))
# shuffle_indexes

In [26]:
test_ratio = 0.2
test_size = int(len(data_reduction) * test_ratio)
test_size

200

In [27]:
test_indexes = shuffle_indexes[:test_size]
train_indexes = shuffle_indexes[test_size:]

In [28]:
X_train = data_reduction[train_indexes]
y_train = y[train_indexes]

X_test = data_reduction[test_indexes]
y_test = y[test_indexes]

In [29]:
X_train.shape

(800, 80)

In [30]:
my_knn = KNNClassifier(k=6)

In [31]:
my_knn.fit(X_train, y_train)

KNN(K=6)

In [32]:
y_predict = my_knn.predict(X_test)

In [33]:
y_predict

array([4, 4, 5, 1, 5, 5, 7, 8, 7, 7, 5, 7, 7, 8, 7, 5, 1, 8, 7, 6, 7, 1, 3,
       7, 4, 8, 9, 7, 1, 6, 6, 8, 5, 1, 1, 9, 4, 5, 6, 6, 1, 7, 7, 7, 4, 4,
       6, 6, 5, 7, 7, 2, 6, 5, 6, 1, 4, 7, 7, 7, 7, 6, 1, 1, 4, 7, 7, 7, 7,
       7, 7, 1, 6, 8, 6, 4, 6, 7, 7, 7, 6, 6, 1, 6, 7, 7, 1, 7, 3, 6, 7, 1,
       7, 1, 7, 7, 8, 8, 9, 6, 2, 8, 7, 2, 4, 4, 1, 7, 1, 8, 7, 6, 3, 4, 7,
       7, 6, 7, 7, 8, 1, 8, 8, 7, 7, 1, 7, 7, 8, 6, 1, 1, 7, 1, 5, 4, 6, 7,
       7, 4, 1, 6, 6, 6, 1, 1, 1, 5, 4, 6, 8, 3, 5, 1, 8, 8, 4, 4, 7, 6, 5,
       7, 1, 5, 7, 4, 7, 8, 5, 8, 7, 6, 7, 7, 7, 7, 8, 1, 7, 8, 7, 4, 8, 6,
       7, 6, 1, 8, 7, 1, 5, 7, 2, 5, 6, 5, 7, 4, 4, 7], dtype=uint8)

In [34]:
sum(y_predict == y_test)/len(y_test)

0.435

In [35]:
for k in range(3,50):
    my_knn = KNNClassifier(k)
    my_knn.fit(X_train, y_train)
    y_predict = my_knn.predict(X_test)
    print(k, sum(y_predict == y_test)/len(y_test))  

3 0.46
4 0.465
5 0.45
6 0.435
7 0.425
8 0.455
9 0.445
10 0.45
11 0.455
12 0.455
13 0.44
14 0.435
15 0.43
16 0.43
17 0.43
18 0.425
19 0.44
20 0.45
21 0.445
22 0.44
23 0.43
24 0.435
25 0.43
26 0.43
27 0.425
28 0.43
29 0.43
30 0.43
31 0.43
32 0.425
33 0.435
34 0.43
35 0.44
36 0.44
37 0.44
38 0.43
39 0.43
40 0.43
41 0.43
42 0.425
43 0.425
44 0.42
45 0.42
46 0.41
47 0.405
48 0.41
49 0.405
