In [1]:
import pydpf
import torch
from torch import Tensor
from typing import Tuple, Union
from pydpf.datautils import simulate_and_save

from pydpf.distributions.Gaussian import MultivariateGaussian, LinearGaussian
from pathlib import Path
import shutil
import os
import numpy as np

# Prefer the GPU when CUDA is available; otherwise everything runs on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [2]:
# Seed counter: every call to new_gen() bumps it by one, so the first generator is seeded with 10.
gen_num = 9


def new_gen():
    """Return a new ``torch.Generator`` on ``device`` seeded with the next value of ``gen_num``.

    Side effect: increments the module-level ``gen_num`` counter, so successive
    calls yield generators with consecutive, distinct seeds.
    """
    global gen_num
    gen_num = gen_num + 1
    fresh_generator = torch.Generator(device=device)
    # manual_seed seeds the generator in place (and returns the same object).
    fresh_generator.manual_seed(gen_num)
    return fresh_generator

def get_spectral_radius(M):
    """Return the spectral radius of ``M``: the largest modulus among its eigenvalues.

    The result is a 0-dimensional real tensor.
    """
    eigenvalue_moduli = torch.linalg.eigvals(M).abs()
    return eigenvalue_moduli.max()

def make_random(size, range, sparsity, device, generator):
    """Draw a tensor of shape ``size`` with entries uniform on ``[range[0], range[1])``, randomly zeroed.

    Each entry is kept when an independent uniform draw on [0, 1) is below
    ``sparsity`` and set to 0 otherwise, so ``sparsity`` is the probability of an
    entry being *kept* (1.0 keeps everything, 0.0 zeroes everything).

    Two draws are taken from ``generator``, in this order: the keep-mask, then
    the values.
    """
    low, high = range[0], range[1]
    # Draw order matters for reproducibility: mask first, values second.
    keep_mask = torch.rand(size, device=device, generator=generator) < sparsity
    uniform_draw = torch.rand(size, device=device, generator=generator)
    scaled_values = uniform_draw * (high - low) + low
    return torch.where(keep_mask, scaled_values, 0)

def make_random_matrix(*, data:Tensor=None, device:torch.device=None, force_diagonal:bool=False, size:Union[Tuple[int,int], int]=None, range:Tuple[float,float]=(0.,1.), diag_range:Tuple[float,float]=(0.,1.), off_diag_range:Tuple[float,float]=(0., 1.), max_radius:float=None, generator:torch.Generator=None, positive_definite:bool=False, requires_grad:bool = True, sparsity:float=1.):
    """Build a ``torch.nn.Parameter`` from given data or from random draws.

    Behaviour depends on the arguments, checked in this order:

    * ``data`` given: it is wrapped as-is; every other option except
      ``requires_grad`` is ignored.
    * ``size`` is an ``int``: a random vector of that length drawn from ``range``.
    * ``size`` is square ``(n, n)``: the diagonal is drawn from ``diag_range`` and,
      unless ``force_diagonal`` is set, the off-diagonal entries from
      ``off_diag_range``. ``range`` is not used in this case.
    * ``size`` is non-square: every entry is drawn from ``range``.

    Parameters
    ----------
    data : Tensor, optional
        Existing values to wrap instead of sampling.
    device : torch.device, optional
        Device for the sampled tensors and for the default generator.
    force_diagonal : bool
        Square case only: leave all off-diagonal entries at zero.
    size : int or (int, int), optional
        Shape to sample. Required when ``data`` is not given.
    range, diag_range, off_diag_range : (float, float)
        ``(low, high)`` bounds of the uniform draws (see above for which applies).
    max_radius : float, optional
        Square case only: largest spectral radius accepted for the result.
    generator : torch.Generator, optional
        Source of randomness; a new ``torch.Generator(device=device)`` is created
        when omitted.
    positive_definite : bool
        Square case only: requires ``diag_range[0] >= 0``; when ``force_diagonal``
        is False the sampled matrix ``M`` is replaced by ``M.T @ M``.
    requires_grad : bool
        Passed to ``torch.nn.Parameter``.
    sparsity : float
        Probability that each sampled entry is kept rather than set to zero
        (1.0 keeps every entry); forwarded to ``make_random``.

    Raises
    ------
    ValueError
        If neither ``data`` nor ``size`` is given, if ``positive_definite`` is
        requested with a negative lower diagonal bound, or if the spectral radius
        of the sampled square matrix exceeds ``max_radius``.
    """
    if data is not None:
        return torch.nn.Parameter(data, requires_grad)
    if size is None:
        # Fail with a clear message instead of "'NoneType' object is not subscriptable".
        raise ValueError("Either data or size must be provided")
    if generator is None:
        generator = torch.Generator(device=device)

    if isinstance(size, int):
        vec = make_random(size, range, sparsity, device, generator)
        return torch.nn.Parameter(vec, requires_grad)

    if size[0] == size[1]:
        if positive_definite and diag_range[0] < 0:
            raise ValueError("Diagonal range must be positive for positive definite matrices")

        diag = make_random(size[0], diag_range, sparsity, device, generator)
        matrix = torch.diag(diag)
        if not force_diagonal:
            # Mask out the diagonal of the second draw so the diagonal keeps the diag_range values.
            off_diag = make_random(size, off_diag_range, sparsity, device, generator) * (1 - torch.eye(size[0], device=device))
            matrix += off_diag
            if positive_definite:
                # M^T M is symmetric positive semi-definite; the entry bounds above
                # therefore apply to the factor M, not to the returned matrix.
                matrix = matrix.T @ matrix
        if max_radius is not None:
            radius = get_spectral_radius(matrix)
            if radius > max_radius:
                raise ValueError(f'Spectral radius {radius} exceeds maximum {max_radius}, consider decreasing the range or trying a different seed')
        return torch.nn.Parameter(matrix, requires_grad)

    matrix = make_random(size, range, sparsity, device, generator)
    return torch.nn.Parameter(matrix, requires_grad)
        

In [21]:
# --- Experiment configuration -------------------------------------------------
# NOTE(review): series_name is not referenced by any visible cell; the output
# directory used later is hard-coded to './test/' — confirm whether it should
# be derived from this name.
series_name = 'test'

# Passed to simulate_and_save as time_extent and n_trajectories respectively.
trajectory_length = 100
number_of_trajectories = 1000

state_dimension = 5
observation_dimension = 5

# Prior covariance: bounds for the uniform draws of the diagonal (0 .. max) and
# off-diagonal (-max .. max) entries. The covariance is generated with
# positive_definite=True, i.e. as M^T M, so these bound the entries of the
# factor M rather than of the saved covariance itself.
max_prior_variance = 1.
max_prior_covariance = 0.3

# Dynamic (state transition) model: symmetric bounds (-max .. max) for the
# matrix diagonal / off-diagonal entries and for the bias vector.
# dynamic_sparsity is the probability that each matrix entry is kept non-zero.
max_dynamic_matrix_diag = 0.7
max_dynamic_matrix_off_diag = 0.8
max_dynamic_bias = 0.5
dynamic_sparsity = 0.8

# Observation model: same conventions as the dynamic model above.
max_observation_matrix_diag = 2.
max_observation_matrix_off_diag = 1.
max_observation_bias = 1.
observation_sparsity = 0.6

# Noise covariance of the dynamic model (same M^T M caveat as the prior).
max_dynamic_variance = 0.3
max_dynamic_covariance = 0.1

# Noise covariance of the observation model (same M^T M caveat as the prior).
max_observation_variance = 0.4
max_observation_covariance = 0.2

# One seeded generator shared by every parameter draw in the next cell, so the
# generated parameters depend on the order in which they are drawn.
generator = new_gen()

In [22]:
# Start from an empty output directory so files from an earlier run never mix
# with the parameters generated below.
# NOTE(review): the path is hard-coded although the configuration cell defines
# series_name = 'test' — confirm whether it should be derived from that name.
data_dir = Path('./test/')
if data_dir.is_dir():
    shutil.rmtree(data_dir)
data_dir.mkdir()

# All parameters are drawn from the shared `generator`, so the order of the
# make_random_matrix calls below determines the values; do not reorder them.
# Each parameter is also written to a CSV file next to the simulated data.

# Prior covariance, built as M^T M from a random factor M.
prior_covariance = make_random_matrix(
    size=(state_dimension, state_dimension),
    diag_range=(0, max_prior_variance),
    off_diag_range=(-max_prior_covariance, max_prior_covariance),
    generator=generator,
    device=device,
    requires_grad=False,
    positive_definite=True,
)
prior_covariance_np = prior_covariance.cpu().numpy()
np.savetxt(data_dir / 'prior_covariance.csv', prior_covariance_np, delimiter=',')

# State transition matrix; make_random_matrix raises if its spectral radius exceeds 1.
dynamic_matrix = make_random_matrix(
    size=(state_dimension, state_dimension),
    diag_range=(-max_dynamic_matrix_diag, max_dynamic_matrix_diag),
    off_diag_range=(-max_dynamic_matrix_off_diag, max_dynamic_matrix_off_diag),
    generator=generator,
    max_radius=1,
    device=device,
    sparsity=dynamic_sparsity,
    requires_grad=False,
)
dynamic_matrix_np = dynamic_matrix.cpu().numpy()
np.savetxt(data_dir / 'dynamic_matrix.csv', dynamic_matrix_np, delimiter=',')

dynamic_bias = make_random_matrix(
    size=state_dimension,
    range=(-max_dynamic_bias, max_dynamic_bias),
    generator=generator,
    device=device,
    requires_grad=False,
)
dynamic_bias_np = dynamic_bias.cpu().numpy()
np.savetxt(data_dir / 'dynamic_bias.csv', dynamic_bias_np, delimiter=',')

dynamic_covariance = make_random_matrix(
    size=(state_dimension, state_dimension),
    diag_range=(0, max_dynamic_variance),
    off_diag_range=(-max_dynamic_covariance, max_dynamic_covariance),
    generator=generator,
    device=device,
    requires_grad=False,
    positive_definite=True,
)
dynamic_covariance_np = dynamic_covariance.cpu().numpy()
np.savetxt(data_dir / 'dynamic_covariance.csv', dynamic_covariance_np, delimiter=',')

# make_random_matrix only honours diag_range/off_diag_range for square shapes;
# a non-square observation matrix draws every entry from a single range.
if state_dimension == observation_dimension:
    observation_matrix = make_random_matrix(
        size=(observation_dimension, state_dimension),
        diag_range=(-max_observation_matrix_diag, max_observation_matrix_diag),
        off_diag_range=(-max_observation_matrix_off_diag, max_observation_matrix_off_diag),
        generator=generator,
        device=device,
        sparsity=observation_sparsity,
        requires_grad=False,
    )
else:
    observation_matrix = make_random_matrix(
        size=(observation_dimension, state_dimension),
        range=(-max_observation_matrix_off_diag, max_observation_matrix_off_diag),
        generator=generator,
        device=device,
        sparsity=observation_sparsity,
        requires_grad=False,
    )
observation_matrix_np = observation_matrix.cpu().numpy()
np.savetxt(data_dir / 'observation_matrix.csv', observation_matrix_np, delimiter=',')

# The bias accompanies observation_matrix, whose output dimension is
# observation_dimension, so it is sized by that (not by state_dimension).
observation_bias = make_random_matrix(
    size=observation_dimension,
    range=(-max_observation_bias, max_observation_bias),
    generator=generator,
    device=device,
    requires_grad=False,
)
observation_bias_np = observation_bias.cpu().numpy()
np.savetxt(data_dir / 'observation_bias.csv', observation_bias_np, delimiter=',')

# Uses the observation-noise bounds (max_observation_variance), not the dynamic ones.
observation_covariance = make_random_matrix(
    size=(observation_dimension, observation_dimension),
    diag_range=(0, max_observation_variance),
    off_diag_range=(-max_observation_covariance, max_observation_covariance),
    generator=generator,
    device=device,
    requires_grad=False,
    positive_definite=True,
)
observation_covariance_np = observation_covariance.cpu().numpy()
np.savetxt(data_dir / 'observation_covariance.csv', observation_covariance_np, delimiter=',')

In [23]:
# Assemble the state-space model from the generated parameters. Each
# distribution receives its own freshly seeded generator; new_gen() hands out
# consecutive seeds, so the three calls below must stay in this order to
# reproduce the same simulation.
prior_dist = MultivariateGaussian(
    mean=torch.zeros(state_dimension, device=device),
    cholesky_covariance=torch.linalg.cholesky(prior_covariance),
    generator=new_gen(),
)
# NOTE(review): constrain_spectral_radius=0.99 is a pydpf option not visible
# here; the matrix was generated with a max_radius=1 check — confirm the two
# limits are meant to differ.
dynamic_dist = LinearGaussian(
    weight=dynamic_matrix,
    bias=dynamic_bias,
    cholesky_covariance=torch.linalg.cholesky(dynamic_covariance),
    generator=new_gen(),
    constrain_spectral_radius=0.99,
)
observation_dist = LinearGaussian(
    weight=observation_matrix,
    bias=observation_bias,
    cholesky_covariance=torch.linalg.cholesky(observation_covariance),
    generator=new_gen(),
)
SSM = pydpf.FilteringModel(
    dynamic_model=dynamic_dist,
    observation_model=observation_dist,
    prior_model=prior_dist,
)

In [24]:
# Simulate trajectories from the model and write them to data.csv alongside
# the parameter files.
# NOTE(review): batch_size=100 and n_processes=-1 are passed straight to pydpf;
# presumably -1 means "use all available processes" — confirm against pydpf.
simulate_and_save(
    data_dir / 'data.csv',
    SSM=SSM,
    time_extent=trajectory_length,
    n_trajectories=number_of_trajectories,
    batch_size=100,
    device=device,
    n_processes=-1,
)

Done                  

