# setting 

In [3]:
import numpy as np
import tensorflow as tf

In [4]:
# Let TensorFlow allocate GPU memory on demand instead of reserving it all up front.
gpu_devices = tf.config.experimental.list_physical_devices('GPU')
if gpu_devices:
    print('Using gpu')
    # Memory growth has to be configured identically on every visible GPU,
    # so enable it for all of them rather than only for the first one.
    for gpu_device in gpu_devices:
        tf.config.experimental.set_memory_growth(gpu_device, True)
else:
    print('Using cpu')

Using gpu


# data

In [5]:
# Load MNIST and scale the pixel values by 1/255
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
input_shape = x_train.shape[1:]  # 28x28 here
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
# Flatten every image into one feature vector. The feature count is derived
# from the data itself (28*28 = 784 here) instead of being hard-coded.
x_train = x_train.reshape(len(x_train), -1)
x_test = x_test.reshape(len(x_test), -1)

# scikit-learn PCA

In [8]:
from sklearn.decomposition import PCA

In [6]:
# Reference projection with scikit-learn.
# random_state pins the result in case svd_solver='auto' selects the
# randomized solver, which would otherwise vary slightly between runs.
sklearn_pca = PCA(n_components=10, random_state=0)
sklearn_pca.fit(x_train)
x_train_sklearn = sklearn_pca.transform(x_train)
x_train_sklearn.shape

(60000, 10)

In [7]:
# First training sample expressed in the 10 scikit-learn principal components
x_train_sklearn[0]

array([ 0.4859638 , -1.2261759 , -0.09615013, -2.1794016 , -0.1071211 ,
       -0.9116693 ,  0.9175417 ,  0.62658876, -1.425468  ,  0.77740115],
      dtype=float32)

# dynamic PCA

In [9]:
class PCALayer(tf.keras.layers.Layer):
    """
    Keras layer that applies a PCA projection, or its inverse, to its input.

    The projection is not learned by backpropagation: `fit` has to be called
    with the training data first. Until then the layer passes its input
    through unchanged.
    """
    n_components : int = None   # No dimension reduction with None

    def __init__(self, n_components : int, inverse_layer : bool, **options):
        """
        n_components:  Number of principal components to keep, None keeps all.
        inverse_layer: If True the layer maps from the reduced space back to
                       the original space instead of reducing the dimension.
        options:       Forwarded to tf.keras.layers.Layer; 'dynamic' and
                       'trainable' are always overwritten.
        """
        options['dynamic']   = True  # Is dynamic
        options['trainable'] = False # but not trainable
        # Initialise the Keras base class before any attribute is assigned
        super().__init__(**options)
        self.inverse_layer = inverse_layer
        self._fitted = False
        self.n_components = n_components
        self.mean = None
        self.singular_values = None
        self.U = None   # Left singular vectors are not kept by fit()
        self.V = None

    def fit(self, x_train : tf.Tensor) -> None:
        """
        Calculates the SVD, afterwards the PCA can be applied.
        x_train is expected to have shape (n_samples, n_features).
        """
        self.mean = tf.reduce_mean(x_train, axis=0)
        x_centered = tf.subtract(x_train, self.mean)  # Centering of data
        # Sidenote, tf, numpy and scipys svds can be different.
        # tf.linalg.svd returns (singular values, left singular vectors,
        # right singular vectors); the right singular vectors are the
        # columns of the third result.
        singular_values, _, V = tf.linalg.svd(x_centered, full_matrices=False)
        # Store singular values and transformation matrix V. After the
        # transpose each row of V is one principal direction:
        # shape (min(n_samples, n_features), n_features).
        self.singular_values = singular_values  # TODO: Should be done as tf weights
        self.V = tf.transpose(V)
        self._fitted = True

    def transform(self, X : tf.Tensor) -> tf.Tensor:
        """
        Applies the PCA transformation with chosen dimension
        which is set by the attribute n_components.
        Output shape: (X.shape[0], self.n_components), or
        (X.shape[0], self.V.shape[0]) if n_components is None.
        """
        X = tf.subtract(X, self.mean) # Center data
        # Transform with submatrix of V to chosen dimension
        Xk = tf.matmul(X, tf.transpose(self.V[:self.n_components, :]))
        return Xk

    def inverse_transform(self, Xk : tf.Tensor) -> tf.Tensor:
        """
        Reverse the PCA transformation from Xk with dimension k
        to original dimension n.
        Output shape: (Xk.shape[0], self.V.shape[1])
        """
        X = tf.matmul(Xk, self.V[:Xk.shape[-1], :]) # Always returns to full dimension of V
        X = X + self.mean                           # Reverse the centering
        return X

    def call(self, input : tf.Tensor) -> tf.Tensor:
        """
        This function is called when data passes the layer
        """
        if not self._fitted :   # No action if not fitted
            return input
        if self.inverse_layer:  # Reverse transformation
            return self.inverse_transform(input)
        return self.transform(input) # reduce dimension with PCA

    def compute_output_shape(self, input_shape : tf.TensorShape) -> tf.TensorShape:
        """
        This method is not needed if TensorFlow runs eagerly.
        Returns the shape call() produces for an input of shape input_shape.
        """
        if not self._fitted:
            return input_shape
        n_directions, n_features = self.V.shape
        if not self.inverse_layer:
            # Mirror the slice V[:n_components] used by transform(): None keeps
            # every direction and a too large value is truncated.
            n_kept = len(range(n_directions)[:self.n_components])
            return tf.TensorShape([input_shape[0], n_kept])
        # The inverse always maps back to the full feature dimension of V
        return tf.TensorShape([None, n_features])

        

In [10]:
# Forward (dimension reducing) PCA layer with 10 components
dynamic_pca = PCALayer(n_components=10, inverse_layer=False)
# Compute the SVD of the centered training data
dynamic_pca.fit(x_train)
# Calling the fitted layer runs call(), which applies transform() here
x_train_dynamic = dynamic_pca(x_train)
x_train_dynamic.shape

TensorShape([60000, 10])

In [11]:
# First training sample as projected by the TensorFlow PCA layer
x_train_dynamic[0].numpy()

array([-0.4859962 ,  1.2259454 , -0.09619759, -2.1795943 , -0.10713483,
        0.9117173 , -0.9177433 , -0.6267426 ,  1.4255993 ,  0.778152  ],
      dtype=float32)

# difference

In [12]:
# Compare both projections of one sample (index i) component by component.
# NOTE: in the recorded output the components either agree to within ~1e-3
# or have the same magnitude with opposite sign, in which case the difference
# is about twice the value. Presumably this is the usual sign ambiguity of
# singular vectors between SVD implementations, not a numerical error.
i = 0
print(x_train_sklearn[i])
print(x_train_dynamic[i].numpy())
print(x_train_sklearn[i] - x_train_dynamic[i].numpy())

[ 0.4859638  -1.2261759  -0.09615013 -2.1794016  -0.1071211  -0.9116693
  0.9175417   0.62658876 -1.425468    0.77740115]
[-0.4859962   1.2259454  -0.09619759 -2.1795943  -0.10713483  0.9117173
 -0.9177433  -0.6267426   1.4255993   0.778152  ]
[ 9.7195995e-01 -2.4521213e+00  4.7460198e-05  1.9264221e-04
  1.3731420e-05 -1.8233867e+00  1.8352849e+00  1.2533314e+00
 -2.8510673e+00 -7.5083971e-04]
