In [1]:
from itertools import product
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from scipy.fftpack import dct, idct
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from skimage.measure import block_reduce

In [2]:
# MNIST train/test splits stored under ./data. Only the training split requests
# a download; both wrap every image with ToTensor so samples come back as tensors.
train_data = datasets.MNIST(root='data', train=True, transform=ToTensor(), download=True)
test_data = datasets.MNIST(root='data', train=False, transform=ToTensor())

# Report how many samples each split holds.
data_len = dict(train=len(train_data), test=len(test_data))
print(data_len)

{'train': 60000, 'test': 10000}


In [27]:
def apply_dynamics(sample, iters, f_act, vel, v_o, v_m, pool=0):
    """Repeatedly rescale a sample by a Doppler ratio and pass it through an activation.

    For every step ``i`` in ``range(iters)`` the current array is multiplied by
    ``(v_m + v_o) / (v_m + vel(i, iters))`` and then fed through ``f_act``.

    Parameters
    ----------
    sample : array-like
        Input data; it is copied first, so the caller's object is never modified.
    iters : int
        Number of Doppler/activation steps.
    f_act : callable
        Activation applied to the whole array after each rescaling.
    vel : callable
        ``vel(i, iters)`` gives the source velocity ``v_s`` used at step ``i``.
    v_o, v_m : number
        Remaining terms of the Doppler ratio (numerator offset and shared base).
    pool : int, optional
        When ``pool > 0``, the array is passed through
        ``block_reduce(..., (2, 2), cval=0.5)`` at every step ``i > 0`` for which
        ``int(i % (iters / pool)) == 0``. With ``pool == 1`` that condition is
        never met for ``0 < i < iters``, so no reduction takes place.

    Returns
    -------
    numpy.ndarray
        The transformed copy of ``sample``.
    """
    state = np.copy(sample)
    pooling_enabled = pool > 0

    for step in range(iters):
        # Short-circuit keeps iters / pool from being evaluated when pool == 0.
        if pooling_enabled and step > 0 and int(step % (iters / pool)) == 0:
            state = block_reduce(state, (2, 2), cval=0.5)

        source_velocity = vel(step, iters)
        doppler_ratio = (v_m + v_o) / (v_m + source_velocity)
        state = f_act(doppler_ratio * state)

    return state

In [4]:
# Source-velocity profiles: vel_s is negative if moving towards observer
def v1(x, n):
    """Approaching source whose speed grows with the step index.

    Returns ``-(x + 1) / (n / 3)``: always negative for ``x >= 0`` and ``n > 0``,
    reaching -3 at ``x == n - 1``.
    """
    numerator = -(x + 1)
    denominator = n / 3
    return numerator / denominator
def v2(x, n):
    """Velocity profile equal to the step index ``x``; ``n`` is accepted but unused."""
    return x
def v3(x, n):
    """Constant velocity profile: always 1.5, regardless of ``x`` and ``n``."""
    return 1.5

def relu(x):
    """Element-wise rectified linear unit: ``max(0, x)``.

    ``np.maximum`` already broadcasts over arrays, so no ``np.vectorize``
    wrapper is used: that wrapper runs a Python-level call per element and
    rejects size-0 inputs. Array inputs give the same values as before;
    scalar inputs now return a NumPy scalar rather than a 0-d array.
    """
    return np.maximum(0, x)
# sigmoid = np.vectorize(lambda x: 1 / (1 + np.exp(-x)))
# np.tanh is already an element-wise ufunc; wrapping it in np.vectorize only adds a
# per-element Python call and makes size-0 inputs fail. Its __name__ is still 'tanh'.
tanh = np.tanh

v_o = 0  # observer velocity in the Doppler ratio (v_m + v_o) / (v_m + v_s); positive if moving towards source
v_m = 5.022  # shared base term of the Doppler ratio (presumably the wave speed in the medium); small tail to avoid division errors, i.e. keeps v_m + v_s off exactly zero for round-valued v_s

In [30]:
# ---- Run configuration ----
iters = 15  # number of Doppler/activation steps applied to every sample
pool = 1  # Pooling is applied to sample after iters/pool iterations. Use 0 if no pooling is to be applied
# NOTE(review): with pool = 1 the trigger inside apply_dynamics never fires for i < iters,
# so no 2x2 reduction happens (features stay at 784); only the idct branch below is affected.
f_act = relu  # relu, tanh
amount_data = 60000  # number of training samples to process
pca_components = 15  # number of principal components kept

# ---- Symbol dynamics ----
# Rows and labels are gathered in Python lists and combined once after the loop.
# Growing an array with np.concat per sample re-copies every earlier row each time.
dyn_rows = []
labels = []
progress_step = max(1, amount_data // 40)  # 40-tick progress bar; never 0 for small runs
print("Applying symbol dynamics:\n|0%" + " " * 16 + "|50%" + " " * 16 + "|100%")
for i in range(amount_data):
    if i % progress_step == 0:
        print("#", end='')

    image, label = train_data[i]  # a single dataset access yields both image and label
    freq_sample = dct(dct(image[0].numpy().T, norm='ortho').T, norm='ortho')  # decompose sample
    dyn_sample = apply_dynamics(freq_sample, iters, f_act, v2, v_o, v_m, pool)

    # if used pooling, convert freq back to image space
    if pool > 0:
        dyn_sample = idct(idct(dyn_sample.T, norm='ortho').T, norm='ortho')

    dyn_rows.append(dyn_sample.reshape(-1))
    labels.append(label)

arr_dyn = np.stack(dyn_rows)  # shape: (amount_data, features per sample)
print('')
print(arr_dyn.shape)
df_dyn = pd.DataFrame(arr_dyn, columns=[f'x{g}' for g in range(arr_dyn.shape[1])])

# ---- PCA ----
print("Standardising Results")
scaling = StandardScaler()
scaled_data = scaling.fit_transform(df_dyn)

print(f'Performing PCA on {"images" if pool != 0 else "frequencies"}')
model = PCA(n_components=pca_components)
# fit and transform are kept as separate calls so the projected values are computed
# exactly as before.
model.fit(scaled_data)
results = model.transform(scaled_data)
print(results.shape)

# ---- Save data ----
# The frame is built in one go; concatenating one row at a time is quadratic in the row count.
# The first 85% of the processed samples are flagged as the training portion.
df_results = pd.DataFrame({
    'data': [np.array2string(r, separator=',') for r in results],
    'label': labels,
    'train': [row < amount_data * 0.85 for row in range(len(results))],
})

df_results.to_csv(f'freq_RedData/freq_pca{pca_components}_RedData_{iters}i_{f_act.__name__}{"_pool" + str(pool) if pool != 0 else ""}_{amount_data}.csv', index=False)
print("Finished!")

Applying symbol dynamics:
|0%                |50%                |100%
########################################
(60000, 784)
Standardising Results
Performing PCA on images
(60000, 15)
Finished!
