In [248]:
import numpy as np


class PhasorAverageModel:
	horizon = 300
	Likelihood = [0] * horizon
	Coherence = [0] * horizon
	waves = np.empty((0, 3), dtype=float)
	likelihood = 0
	angular_freq = np.nan
	buffer = [None]
	phase_angle = 0
	amplitudeFunc = None
	envelopeFunc = None
	omegaFunc = None
	model_statistics = None

	@staticmethod
	def processWaves(waves):
		return waves[2:]

	@staticmethod
	def phasorValues(waves):
		omega = waves[:, 0]
		T = waves[:, 1]	# phase angle defined from common start referene t=0
		amplitude = np.ones_like(T) if waves.shape[-1] == 2 else waves[:, 2]
		t = T[-1]

		phi = t - T
		return omega, phi, amplitude

	@staticmethod
	def calculateHorizon(horizon, phi):
		if phi.shape[0] > 2:
			return horizon
		if phi.shape[0] > 6:
			d = np.diff(phi)
			statistic = max(d)
		else:
			statistic = np.mean(phi)

		if 3 * statistic > horizon:
			return int(2 * statistic)	# just to be safe
		return horizon

	@staticmethod
	def getPhasors(amplitude, envelope, omega):
		return amplitude * np.exp(-envelope) * np.exp(1j * omega)

	@staticmethod
	def phasorStatistics(phasor):

		s_sum = np.sum(phasor, axis=1)
		m_sum = np.abs(phasor).sum(axis=1)
		psi = s_sum / m_sum
		coherence = np.abs(psi)
		return psi, coherence

	@staticmethod
	def getLikelihood(psi, coherence, mode="normalised", func=None, **kwargs):
		if mode == "normalised":
			return (coherence + psi.real) / 2
		if mode == "max":
			return np.maximum(0, psi.real)
		if mode == "idk":
			return np.abs(psi)
		if func is not None:
			return func(psi, coherence, **kwargss)
		raise ValueError(f"{mode} is not defined")

	@staticmethod
	def predictPhasorAverage(
		waves,
		*,
		horizon=2000,
		amplitudeFunc=None,
		envelopeFunc=None,
		omegaFunc=None,
		**kwargs,
	):
		"""
		waves=[(angular_freq,phase_angle_k,amplitude),...]
		"""
		waves = PhasorAverageModel.processWaves(waves)

		if len(waves) == 0:
			return np.zeros(horizon), np.zeros(horizon)

		omega, phi, amplitude = PhasorAverageModel.phasorValues(waves)
		horizon = PhasorAverageModel.calculateHorizon(horizon, phi)

		Delta_t = np.arange(0, horizon + 1)[:, None]

		def amplitudeFunc(amplitude, **kwargs):
			return amplitude	# np.ones_like(amplitude)

		def envelopeFunc(phi, **kwargs):
			return np.zeros_like(phi)

		def omegaFunc(omega, phi, **kwargs):
			return phi * omega

		Envelope_t = envelopeFunc(phi)
		Omega_t = omegaFunc(omega, phi)
		Amplitude_t = amplitudeFunc(amplitude)

		Envelope_rot = envelopeFunc(Delta_t, **kwargs)
		Omega_rot = omegaFunc(omega, Delta_t, **kwargs)
		Amplitude_rot = np.ones_like(Delta_t, **kwargs)

		x_now = PhasorAverageModel.getPhasors(Amplitude_t, Envelope_t, Omega_t)
		x_rot = PhasorAverageModel.getPhasors(Amplitude_rot, Envelope_rot, Omega_rot)
		x_project = x_rot * x_now

		psi, coherence = PhasorAverageModel.phasorStatistics(x_project)
		likelihood = PhasorAverageModel.getLikelihood(**locals())

		return [likelihood, coherence]

	@staticmethod
	def construct_wave(angular_freq, phase_angle, amplitude):
		return np.array((angular_freq, phase_angle, amplitude))

	@staticmethod
	def valid_angular_frequency(waves):
		return len(waves) > 1

	@staticmethod
	def update_angular_frequency(x, phase_angle):

		return 2 * np.pi / (x - phase_angle)

	@staticmethod
	def update_angular_phase_angle(x, phase_angle):

		return x - phase_angle

	@staticmethod
	def get_current_likelihood(x, Likelihood, phase_angle):
		return Likelihood[x - phase_angle]

	@staticmethod
	def get_current_coherence(x, Coherence, phase_angle):

		return Coherence[x - phase_angle]

	@staticmethod
	def construct_waves(wave, waves):
		return np.concat([waves, [wave]])

	def __init__(self):
		pass

	def __call__(self, x):
		if self.valid_angular_frequency(self.waves):
			self.angular_freq = self.update_angular_frequency(x, self.phase_angle)

		time_delta = int(x - self.phase_angle)
		current_likelihood = self.Likelihood[time_delta]
		current_coherence = self.Coherence[time_delta]

		return current_likelihood, current_coherence

	def event(self, x, trigger):

		self.phase_angle = x
		amplitude = trigger

		wave = self.construct_wave(self.angular_freq, self.phase_angle, amplitude)
		self.waves = self.construct_waves(wave, self.waves)

		self.model_statistics = self.predictPhasorAverage(
			waves=self.waves,
			horizon=self.horizon,
			amplitudeFunc=self.amplitudeFunc,
			envelopeFunc=self.envelopeFunc,
			omegaFunc=self.omegaFunc,
		)

		self.Likelihood = self.model_statistics[0]
		self.Coherence = self.model_statistics[1]

		return self.Likelihood, self.Coherence

In [253]:
def getRegimeIndices(n_events, periods, start_at_origin=False, **kwargs):
	sub_signals = np.vectorize(np.arange, otypes=[object])(n_events) * periods
	last_index = np.array([0, *[item[-1] for item in sub_signals[:-1]]])
	period_off = np.cumsum(periods) - periods[0] * int(start_at_origin)
	offset = period_off + np.cumsum(last_index)
	return np.concat((sub_signals + np.array(offset)))


def getRealSignal(regimes, start_at_origin=False, **kwargs):
	n_events, periods = np.array(regimes)[:, :2].T
	indices = getRegimeIndices(n_events, periods, **kwargs)
	true_signal = np.zeros((n_events @ periods) + 1)
	true_signal[indices] = 1
	return true_signal


SIGNAL_REGIMES = [
	(25, 30),	# 25 events with period 30
	(20, 70),	# 20 events with period 70
	(30, 45),	# 30 events with period 45
]
true_signal = getRealSignal(SIGNAL_REGIMES, start_at_origin=True)

sig_len = len(true_signal)

timestamps = np.arange(sig_len)

real_pulse_times = timestamps[true_signal == 1]


def generate_truncated_cauchy(n_samples, location=0.0, scale=0.001, low=-1.0, high=1.0):

	def cdf(x):
		return 0.5 + (1 / np.pi) * np.arctan((x - location) / scale)

	def inverse_cdf(p):
		return location + scale * np.tan(np.pi * (p - 0.5))

	cdf_low = cdf(low)
	cdf_high = cdf(high)

	uniform_samples = np.random.uniform(0.0, 1.0, n_samples)

	prob_samples = cdf_low + uniform_samples * (cdf_high - cdf_low)

	final_samples = inverse_cdf(prob_samples)

	return final_samples


noise = generate_truncated_cauchy(sig_len)


m = PhasorAverageModel()
for x, y in enumerate(true_signal):
	m(x)
	if y == 1:
		m.event(x, 1 + (noise[x]))

# h = len(m.Likelihood)
# print(m.Likelihood)


sig_len = len(true_signal)
h = np.arange(len(m.Likelihood)) + sig_len - 1	# - SIGNAL_REGIMES[0][-1]
fig = go.Figure()
for t in real_pulse_times:
	fig.add_shape(
		type="line",
		x0=t,
		y0=0,
		x1=t,
		y1=1 + (noise[t]),
		line=dict(color="rgba(0, 0, 0, 0.5)", width=1.5),
	)

fig.add_trace(
	go.Scatter(
		x=h,
		y=(m.Likelihood),
		mode="lines",
		# mode="markers",
		name="Likelihoods",
		# line=dict(color="rgb(255, 0, 0, 0.5)", width=4),
	)
)
fig.add_trace(
	go.Scatter(
		x=h,
		y=m.Coherence,
		mode="lines",
		# mode="markers",
		name="Coherence",
		# line=dict(color="rgb(255, 0, 0, 0.5)", width=4),
	)
)
fig.show()

In [None]:
from scipy.stats import gennorm


def generate_truncated_gennorm(
	n_samples, beta=1, alpha=0.7, location=0.0, low=-1.0, high=1.0
):

	cdf_low = gennorm.cdf(low, beta, loc=location, scale=alpha)
	cdf_high = gennorm.cdf(high, beta, loc=location, scale=alpha)

	uniform_samples = np.random.uniform(0.0, 1.0, n_samples)

	prob_samples = cdf_low + uniform_samples * (cdf_high - cdf_low)

	final_samples = gennorm.ppf(prob_samples, beta, loc=location, scale=alpha)

	return final_samples

ValueError: operands could not be broadcast together with shapes (3500,) (3501,) 

In [None]:
import numpy as np
import matplotlib.pyplot as plt


def generate_truncated_cauchy(n_samples, location=0.0, scale=0.05, low=-1.0, high=1.0):

	def cdf(x):
		return 0.5 + (1 / np.pi) * np.arctan((x - location) / scale)

	def inverse_cdf(p):
		return location + scale * np.tan(np.pi * (p - 0.5))

	cdf_low = cdf(low)
	cdf_high = cdf(high)

	uniform_samples = np.random.uniform(0.0, 1.0, n_samples)

	prob_samples = cdf_low + uniform_samples * (cdf_high - cdf_low)

	final_samples = inverse_cdf(prob_samples)

	return final_samples

In [None]:
def getRegimeIndices(n_events, periods, start_at_origin=False, **kwargs):
	# array_r = np.array(regimes)
	# n_events, periods = array_r[:, :2].T
	# total_time = n_events @ periods
	sub_signals = np.vectorize(np.arange, otypes=[object])(n_events) * periods
	last_index = np.array([0, *[item[-1] for item in sub_signals[:-1]]])
	period_off = np.cumsum(periods) - periods[0] * int(start_at_origin)
	offset = period_off + np.cumsum(last_index)
	return np.concat((sub_signals + np.array(offset)))


def getRealSignal(regimes, **kwargs):
	array_r = np.array(regimes)
	n_events, periods = array_r[:, :2].T
	total_time = n_events @ periods
	true_signal = np.zeros(total_time)
	indices = getRegimeIndices(n_events, periods, **kwargs)
	true_signal[indices] = 1
	print(true_signal)
	pass


SIGNAL_REGIMES = [
	(25, 30),	# 25 events with period 30
	(20, 70),	# 20 events with period 70
	(30, 45),	# 30 events with period 45
]

getRealSignal(SIGNAL_REGIMES, start_at_origin=True)

# np.arange(25) * 3

[1. 0. 0. ... 0. 0. 0.]


In [None]:
a = np.zeros(10, dtype=int)
idx = np.array([2, 4, 7])

a[idx] = 1
a

array([0, 0, 1, 0, 1, 0, 0, 1, 0, 0])