In [None]:
import numpy as np
import spectrally_constrained_LVMs as scLVMs

from matplotlib import pyplot as plt
import matplotlib
from axs_fixer import fix

MEDIUM_SIZE = 30
BIGGER_SIZE = 40

plt.rc("font", size=MEDIUM_SIZE)  # controls default text sizes
plt.rc("axes", titlesize=MEDIUM_SIZE)  # fontsize of the axes title
plt.rc("axes", labelsize=MEDIUM_SIZE)  # fontsize of the x and y labels
plt.rc("xtick", labelsize=MEDIUM_SIZE)  # fontsize of the tick labels
plt.rc("ytick", labelsize=MEDIUM_SIZE)  # fontsize of the tick labels
plt.rc("legend", fontsize=MEDIUM_SIZE)  # legend fontsize
plt.rc("figure", titlesize=BIGGER_SIZE)  # fontsize of the figure title
plt.rc("figure", figsize=(16, 10), dpi=600)
plt.rc("savefig", dpi=600, format="pdf")
plt.rc("grid", linestyle="--")

matplotlib.rcParams.update(
    {  # Use mathtext, not LaTeX
        "text.usetex": False,
        # Use the Computer modern font
        "font.family": "serif",
        "font.serif": "cmr10",
        "mathtext.fontset": "cm",
        # Use ASCII minus
        "axes.unicode_minus": False,
    }
)

plt.ioff()

scLVMs.__version__

# Example one

In the first example, the negentropy-based objective function common to ICA is used to estimate the model parameters.

## Perform all model estimation steps

In [None]:
np.random.seed(0)

## Step 1: Load in the time series signal
Fs = 20480

data_dict = np.loadtxt("./2004.02.16.04.32.39")
signal_data = data_dict[:, 0]

## Step 2: Perform pre-processing
Lw = 256
Lsft = 1
X = scLVMs.hankel_matrix(signal_data, Lw, Lsft) # Hankelise the vibration data

## Define the cost function
cost_inst = scLVMs.NegentropyCost("exp", {"alpha":1}) # negentropy objective

## Define the model
model_inst = scLVMs.LinearModel(n_sources = 5,
                                cost_instance = cost_inst, 
                                organise_by_kurt = True,
                                alpha_reg = 1.0, 
                                verbose = True)

## Step 3: Estimate the model parameters
model_inst.fit(X, n_iters = 500, learning_rate = 1, tol = 1e-4, Fs = Fs)

## Step 4: Obtain the latent representation Z
Z = model_inst.transform(X)

## Step 5: Obtain the recovered representation of X
X_recon = model_inst.inverse_transform(Z)

## Visualisation - Figure one

In [None]:
colors = ["#00b0f9", "#bfacdc", "#d0bdd5", "#ffa59e", "#dd4c65","#93003a"]
fig, ax = plt.subplots(figsize = (12, 10))

n = len(signal_data)
fft_freq = np.fft.fftfreq(n, 1/Fs)[:n//2]
fft_val = 2/n * np.abs(np.fft.fft(signal_data)[:n//2])
ax.plot(fft_freq, fft_val, lw = 0.5, color = "k", label = "Record 540")
ax.set_xlabel("Frequency (Hz)")
ax.set_ylabel("Magnitude")
ax.grid()
ax.legend()

fig.tight_layout()

# plt.savefig("./signal.pdf")
# plt.savefig("./signal.png")
plt.show()

## Visualisation - Figure two

In [None]:
fig, ax = plt.subplots(5, 2, figsize = (20, 24))

n = Z.shape[0]
fft_freq = np.fft.fftfreq(n, 1/Fs)[:n//2]

for cnt in range(5):
    fft_val = 2/n * np.abs(np.fft.fft(Z[:, cnt])[:n//2])
    se_fft_val = 2/n * np.abs(np.fft.fft(Z[:, cnt]**2)[:n//2])
    ax[cnt, 0].plot(fft_freq, fft_val, color = "b", linewidth = 0.5, label = f"Source {cnt + 1}: Spectrum")
    ax[cnt, 1].plot(fft_freq, se_fft_val, color = "r", linewidth = 0.5, label = f"Source {cnt + 1}: SES")
    ax[cnt, 1].set_xlim(-10, 2000)
    
fig.tight_layout()

for cnt, axs in enumerate(ax.flatten()):
    axs.set_xlabel("Frequency (Hz)")
    axs.set_ylabel("Magnitude")
    axs.grid()
    axs.legend()
    fix(axs, minor_flag=True, flag_3d=False)
    
# plt.savefig("./sources.pdf")
# plt.savefig("./sources.png")
plt.show()

# Example two

In this example, different methods of cost function implementation are demonstrated.

In [None]:
import spectrally_constrained_LVMs as scLVMs
import numpy as np
import sympy as sp
np.random.seed(0)

# Setup general X matrix
X_ = np.random.randn(1000, 16)
w_ = np.random.randn(16, 1)
y_ = X_ @ w_

## Cost function: implementation one

In [None]:
# Method 1 - user defined cost function

## Initialise the cost function instance
cost_inst = scLVMs.UserCost(use_hessian=True)

## Implement the objective function, gradient and Hessian
def loss(X, w, y):
    return -1 * np.mean((X @ w) ** 2, axis=0)

def grad(X, w, y):
    return -2 * np.mean(y * X, axis=0, keepdims=True).T

def hess(X, w, y):
    return -2 * np.cov(X, rowvar=False)

# Set the properties
cost_inst.set_cost(loss)
cost_inst.set_gradient(grad)
cost_inst.set_hessian(hess)

# Check that the gradient and Hessian make sense
res_grad = cost_inst.check_gradient(X_, w_, y_, 1e-4)
res_hess = cost_inst.check_hessian(X_, w_, y_, 1e-4)

## Cost function: implementation two

In [None]:
# Method 2 - Sympy
## Set the feature shapes (necessary for SymPy version)
n_samples = X_.shape[0]
n_features = w_.shape[0]

## Initialise the cost function instance
cost_inst = scLVMs.SympyCost(n_samples, n_features, use_hessian=True)

## Get the SymPy representations of the model parameters
X_sp, w_sp, iter_params = cost_inst.get_model_parameters()
i, j = iter_params

## Calculate the objective function
loss_i = sp.Sum(w_sp[j, 0] * X_sp[i, j], (j, 0, n_features - 1))
loss = -1 / n_samples * sp.Sum((loss_i) ** 2, (i, 0, n_samples - 1))

## Set the properties within the instance
cost_inst.set_cost(loss)

## Ask Sympy to derive the first and second order derivatives
cost_inst.implement_methods()

# Check that the gradient and Hessian make sense
res_grad = cost_inst.check_gradient(X_, w_, y_, 1e-4)
res_hess = cost_inst.check_hessian(X_, w_, y_, 1e-4)

## Cost function: implementation three

In [None]:
# Method 3 - negentropy loss

## Initialise the cost function instance
cost_inst = scLVMs.NegentropyCost(source_name="exp", 
                                  source_params={"alpha": 1})

## Check that the gradient and Hessian make sense
res_grad = cost_inst.check_gradient(X_, w_, y_, 1e-4)
res_hess = cost_inst.check_hessian(X_, w_, y_, 1e-4)

## Cost function: implementation four

In [None]:
# Method 4 - variance loss

## Initialise the cost function instance
cost_inst = scLVMs.VarianceCost(use_hessian=True, verbose=True)

## Check that the gradient and Hessian make sense
res_grad = cost_inst.check_gradient(X_, w_, y_, 1e-4)
res_hess = cost_inst.check_hessian(X_, w_, y_, 1e-4)