# Generate datasets with different data characteristics

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [103]:
def generate_var_data(B_matrices, T=1000,
                      is_linear=True, is_gaussian=True, is_stationary=True,
                      nonlinear_strength=0.3, error_scale=0.2, 
                      trend_probability=0.3, max_trend_strength=0.004,
                      fluctuation_scale=0.01, random_state=None):
    """
    Generate VAR data with various characteristics.

    The structural model x_t = B0 x_t + sum_i B_i x_{t-i} + e_t is simulated in
    its reduced form x_t = sum_i M_i x_{t-i} + (I - B0)^-1 e_t with
    M_i = (I - B0)^-1 B_i.

    Parameters:
    - B_matrices: List of coefficient matrices [B0, B1, B2, ..., Bp] where p is the maximum lag.
      The list and its arrays are not modified.
    - T: Number of time points returned (a short burn-in is simulated and discarded)
    - is_linear: If False, adds a tanh term of the previous observation
    - is_gaussian: If False, uses Student-t (df=5) noise instead of normal noise
    - is_stationary: If False, introduces linear trends and random-walk fluctuations
    - nonlinear_strength: Strength of nonlinear relationships
    - error_scale: Scale of the error terms
    - trend_probability: Probability of a variable having a trend (used if not stationary)
    - max_trend_strength: Upper bound of the trend slope (used if not stationary)
    - fluctuation_scale: Step std of the random-walk fluctuations (used if not stationary)
    - random_state: Seed for random number generation

    Returns:
    - data: Array of shape (n_vars, T), rounded to 4 decimals
    - B_matrices: New list holding the matrices actually used (B0 reduced to its
      strictly lower triangular part, lagged matrices as passed in)
    """
    np.random.seed(random_state)
    T_spurious = 20   # Number of spurious time points to discard
    total_steps = T + T_spurious

    # Shallow-copy the list so replacing B0 below does not leak into the caller's
    # list, which is typically shared between several config dictionaries.
    B_matrices = list(B_matrices)

    n_vars = B_matrices[0].shape[0]
    n_lags = len(B_matrices) - 1

    # Ensure B0 is strictly lower triangular (np.tril returns a new array)
    B_matrices[0] = np.tril(B_matrices[0], k=-1)

    # Reduced-form lag matrices M_i = (I - B0)^-1 B_i
    I_minus_B0_inv = np.linalg.inv(np.eye(n_vars) - B_matrices[0])
    M_matrices = [np.dot(I_minus_B0_inv, B) for B in B_matrices[1:]]

    # Generate error terms (Gaussian or t-distributed)
    if is_gaussian:
        ee = np.random.normal(size=(n_vars, total_steps))
    else:
        ee = np.random.standard_t(df=5, size=(n_vars, total_steps))

    # Standardise each series, then give every variable its own noise scale
    ee = ee - np.mean(ee, axis=1, keepdims=True)
    ee = ee / np.std(ee, axis=1, keepdims=True)
    std_e = (np.random.uniform(size=(n_vars,)) + 0.5) * error_scale
    nn = np.dot(I_minus_B0_inv, np.diag(std_e) @ ee)

    # Generate time series data, starting from random base levels
    xx = np.zeros((n_vars, total_steps))
    base_levels = np.random.uniform(1.0, 5.0, n_vars)
    xx[:, :n_lags] = base_levels[:, np.newaxis] + np.random.normal(0, 0.1, (n_vars, n_lags))

    for t in range(n_lags, total_steps):
        xx[:, t] = sum(np.dot(M, xx[:, t-i]) for i, M in enumerate(M_matrices, start=1))
        if not is_linear:
            xx[:, t] += nonlinear_strength * np.tanh(xx[:, t-1])
        xx[:, t] += nn[:, t]    # Add noise

    # Trends and random-walk fluctuations are non-stationary components and are
    # therefore only added when non-stationary data is requested.
    if not is_stationary:
        trend_vars = np.random.random(n_vars) < trend_probability
        for var in range(n_vars):
            if trend_vars[var]:
                trend_strength = np.random.uniform(0, max_trend_strength)
                xx[var, :] += np.arange(total_steps) * trend_strength

        # One independent random walk per variable
        steps = np.random.normal(0, fluctuation_scale, (n_vars, total_steps))
        xx += np.cumsum(steps, axis=1)

    # Remove the first T_spurious time points
    data = xx[:, T_spurious:]

    # Round all elements in data to 4 decimal places
    data = np.round(data, decimals=4)

    return data, B_matrices

In [123]:
def save_ground_truth(B_matrices, filepath):
    """
    Write the coefficient matrices to a single text file.

    Each matrix is written as comma-separated rows with three decimals;
    consecutive matrices are separated by one blank line.
    """
    with open(filepath, 'w') as out:
        for position, matrix in enumerate(B_matrices):
            # Blank separator line goes in front of every matrix but the first
            if position > 0:
                out.write('\n')
            np.savetxt(out, matrix, delimiter=',', fmt='%.3f')

def generate_and_save_dataset(config, data_type, dataset_index, output_dir):
    """
    Generate one dataset from `config` and write it, plus a plot, to `output_dir`.

    Parameters:
    - config: Keyword arguments forwarded to generate_var_data
    - data_type: Label used in the plot title
    - dataset_index: Index used in the output file names
    - output_dir: Directory receiving dataset_<i>.csv, plot_<i>.png and, for
      index 0, ground_truth.csv. It is created if it does not exist yet.
    """
    import os  # local import: only needed for creating the output directory

    # Writing the CSV/PNG fails with an opaque error when the directory is missing
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    data, B_matrices = generate_var_data(**config)
    
    # Save the data (one column per variable, one row per time point)
    df = pd.DataFrame(data.T, columns=[f"x{i}" for i in range(data.shape[0])])
    df.to_csv(f"{output_dir}/dataset_{dataset_index}.csv", index=False)
    
    # Save the ground truth (only for the first dataset of each type)
    if dataset_index == 0:
        save_ground_truth(B_matrices, f"{output_dir}/ground_truth.csv")
    
    # Plot the data and close the figure so figures do not accumulate across calls
    plot_time_series(data, f"{data_type} Dataset {dataset_index}")
    plt.savefig(f"{output_dir}/plot_{dataset_index}.png")
    plt.close()

def generate_B_matrices(n_vars=5, n_lags=1, sparsity=0.7, max_coef=0.5, diag_strength=(0.3, 0.8)):
    """
    Draw a random set of coefficient matrices [B0, B1, ..., B_n_lags].

    Parameters:
    - n_vars: Number of variables (matrices are n_vars x n_vars)
    - n_lags: Number of lagged matrices to generate
    - sparsity: A candidate entry is filled only when a uniform draw exceeds this value
    - max_coef: Off-diagonal and instantaneous coefficients are drawn from [-max_coef, max_coef]
    - diag_strength: (low, high) range for the lag-1 diagonal coefficients

    Returns:
    - The matrices after being passed through stabilize_var_matrices
    """
    def random_coefficient():
        # Coefficient for an instantaneous or off-diagonal entry, 3 decimals
        return np.round(np.random.uniform(-max_coef, max_coef), 3)

    # B0: instantaneous effects, only below the diagonal
    instantaneous = np.zeros((n_vars, n_vars))
    for row in range(1, n_vars):
        for col in range(row):
            if np.random.rand() > sparsity:
                instantaneous[row, col] = random_coefficient()
    matrices = [instantaneous]

    # B1 .. Bn: lagged effects
    for lag in range(1, n_lags + 1):
        lagged = np.zeros((n_vars, n_vars))
        for row in range(n_vars):
            # Only the first lag carries an autocorrelation term on the diagonal
            if lag == 1:
                lagged[row, row] = np.round(
                    np.random.uniform(diag_strength[0], diag_strength[1]), 3
                )

            for col in range(n_vars):
                if col == row:
                    continue
                if np.random.rand() > sparsity:
                    lagged[row, col] = random_coefficient()

        matrices.append(lagged)

    return stabilize_var_matrices(matrices)

import numpy as np

def _reduced_form_spectral_radius(B_matrices):
    """Return the largest absolute eigenvalue of the reduced-form companion matrix."""
    n_vars = B_matrices[0].shape[0]
    n_lags = len(B_matrices) - 1

    # The simulated process iterates M_i = (I - B0)^-1 B_i, with B0 restricted to
    # its strictly lower triangle, so stability has to be judged on those matrices.
    mixing = np.linalg.inv(np.eye(n_vars) - np.tril(B_matrices[0], k=-1))
    reduced = [mixing @ B for B in B_matrices[1:]]

    # Companion matrix: lag matrices in the first block row, identity blocks on
    # the block sub-diagonal (np.eye with a negative offset of n_vars).
    companion = np.eye(n_vars * n_lags, k=-n_vars)
    companion[:n_vars, :] = np.hstack(reduced)

    return np.max(np.abs(np.linalg.eigvals(companion)))


def stabilize_var_matrices(B_matrices, threshold=0.99, max_iterations=100):
    """
    Stabilize VAR matrices by scaling them until all eigenvalues are below the threshold.

    Stability is evaluated on the reduced-form lag matrices (I - B0)^-1 B_i,
    because those are the matrices that drive the simulated dynamics; checking
    the structural B_i alone can accept an explosive process when B0 is non-zero.

    Parameters:
    - B_matrices: List of coefficient matrices [B0, B1, B2, ..., Bp]. Not modified.
    - threshold: Maximum allowed absolute eigenvalue (default: 0.99)
    - max_iterations: Maximum number of scaling iterations (default: 100)

    Returns:
    - Stabilized B_matrices as a new list of float arrays (B0 is returned unscaled)
    """
    # Float copies: keeps the caller's arrays intact and makes the in-place
    # scaling below valid for integer-typed input as well.
    matrices = [np.array(B, dtype=float) for B in B_matrices]

    # Without lagged matrices there are no dynamics to stabilize
    if len(matrices) < 2:
        return matrices

    for iteration in range(max_iterations):
        max_abs_eigenvalue = _reduced_form_spectral_radius(matrices)

        # Check if all eigenvalues are below the threshold
        if max_abs_eigenvalue < threshold:
            print(f"Stabilization complete after {iteration + 1} iterations.")
            return matrices

        # Scale all matrices except B0
        scaling_factor = 0.95 / max_abs_eigenvalue
        for lagged in matrices[1:]:
            lagged *= scaling_factor

    # Report the eigenvalue of the matrices actually returned, not the value
    # measured before the last scaling step.
    max_abs_eigenvalue = _reduced_form_spectral_radius(matrices)
    print(f"Warning: Maximum iterations reached. Final max eigenvalue: {max_abs_eigenvalue}")
    return matrices

def plot_time_series(data, title="Time Series Plot"):
    """
    Draw every row of `data` as one line on a new 12x8 figure.

    Rows are labelled x0, x1, ...; the figure is left open so the caller can
    save or show it.
    """
    plt.figure(figsize=(12, 8))

    labels = [f'x{row}' for row in range(data.shape[0])]
    for row, label in enumerate(labels):
        plt.plot(data[row, :], label=label)

    plt.title(title)
    plt.xlabel('Time')
    plt.ylabel('Value')
    plt.legend()
    plt.grid(True)
In [None]:
# # Rename plots
# data_type = 'scale_50_var'
# output_dir = f'../../data/synthetic/{data_type}'

# for i in range(10):
#     data = pd.read_csv(f'../../data/synthetic/scale_50_var/dataset_{i}.csv')
#     columns = data.columns.tolist()
#     if "Date" in columns:
#         data = data.drop(['Date'], axis=1).values
#         columns.remove('Date')
#     else:
#         data = data.values
#     plot_time_series(data.T, f"scale_20_var Dataset {i}")
#     plt.savefig(f"{output_dir}/plot_{i}.png")

### Generate multiple datasets for each type

In [129]:
# Hand-written 5-variable example: B0 (instantaneous, strictly lower triangular)
# followed by B1 (lag 1). It is replaced by the random draw right below.
B_matrices = [
    np.array([
        [0, 0, 0, 0, 0],
        [0.4, 0, 0, 0, 0],
        [-0.2, 0.3, 0, 0, 0],
        [0, 0.2, -0.1, 0, 0],
        [0.1, 0, 0.3, 0.2, 0],
    ]),
    np.array([
        [0.4, 0, -0.2, 0, 0.1],
        [0, 0.3, 0, 0.15, 0],
        [-0.1, 0.2, 0.5, 0, 0],
        [0.2, 0, 0, 0.6, -0.1],
        [0, 0, -0.15, 0.2, 0.4],
    ]),
]
# NOTE(review): 20 variables are generated here while data_type below is
# 'scale_50_var' -- confirm which of the two is intended.
B_matrices = generate_B_matrices(n_vars=20, n_lags=1, sparsity=0.7)

# Keyword arguments for generate_var_data
config = dict(
    B_matrices=B_matrices,
    T=1000,
    is_linear=True,
    is_gaussian=False,
    is_stationary=True,
    trend_probability=0.3,
    max_trend_strength=0.005,
    fluctuation_scale=0.04,
)

data_type = 'scale_50_var'
output_dir = f'../../data/synthetic/{data_type}'

# Ten datasets are written for this data type
for i in range(10):
    generate_and_save_dataset(config, data_type, i, output_dir)


Stabilization complete after 2 iterations.


### Generate multiple datasets for all types

In [8]:
# Define configurations for different types of data

# Coefficient sets shared by the configurations below (drawn in this order)
B_matrices_dense = generate_B_matrices(n_vars=5, n_lags=1, sparsity=0.3)
B_matrices_sparse = generate_B_matrices(n_vars=5, n_lags=1, sparsity=0.7)
B_matrices_15 = generate_B_matrices(n_vars=15, n_lags=1, sparsity=0.7)

# Settings common to every data type: sparse 5-variable coefficients, linear,
# non-Gaussian, stationary.
_base_config = {
    'B_matrices': B_matrices_sparse,
    'T': 1000,
    'is_linear': True,
    'is_gaussian': False,
    'is_stationary': True,
    'trend_probability': 0.3,
    'max_trend_strength': 0.005,
    'fluctuation_scale': 0.04,
}

# Per data type, only the settings that differ from the base configuration
_overrides = {
    'scale_5_var': {},
    'scale_15_var': {'B_matrices': B_matrices_15},
    'linear': {},
    'non_linear': {'is_linear': False},
    'gaussian': {'is_gaussian': True},
    'non_gaussian': {},
    'stationary': {},
    'non_stationary': {'is_stationary': False},
    'sparse': {'is_stationary': False},
    'dense': {'B_matrices': B_matrices_dense, 'is_stationary': False},
}

configs = {
    name: {**_base_config, **changes}
    for name, changes in _overrides.items()
}

# Ten datasets per configuration, each family in its own directory
for config_name, config in configs.items():
    output_dir = f'../../data/synthetic/{config_name}'

    for i in range(10):
        generate_and_save_dataset(config, config_name, i, output_dir)

print("All datasets generated successfully.")

All datasets generated successfully.
