In [8]:
import numpy as np
import pandas as pd

def generate_time_series_data(num_timesteps, num_samples, seed=None):
    """Simulate a linear panel of covariates, a binary treatment and an outcome.

    Eight variables are generated, each as an array of shape
    ``(num_timesteps, num_samples)``: ``Z`` (a common driver that enters every
    other equation), ``X1``..``X4``, ``Ti``, ``Td`` (binarised at the
    cross-sectional median, so roughly half of the samples are 1 at each step)
    and ``Y``. For ``t >= 1`` each continuous variable is built from a Gaussian
    resample centred on its own previous value, scaled by ``alpha``, plus
    contributions from the variables generated before it in the same step.

    Parameters
    ----------
    num_timesteps : int
        Number of time steps (rows). Must be at least 1, because step 0 is
        always initialised.
    num_samples : int
        Number of independent samples (columns).
    seed : int or None, optional
        When given, all draws come from a private ``np.random.RandomState(seed)``
        so the output is reproducible. When ``None`` (default) the global
        ``np.random`` stream is used, so ``np.random.seed`` still applies.

    Returns
    -------
    data : dict[str, numpy.ndarray]
        Maps each variable name to its ``(num_timesteps, num_samples)`` array.
    treatment_effect : numpy.ndarray
        Ground-truth additive effect of ``Td`` on ``Y``; equals
        ``beta['Y'] * Td`` at every step.
    """
    # Both the module and a RandomState instance expose .normal(loc, scale, size).
    rng = np.random if seed is None else np.random.RandomState(seed)

    alpha = 0.8  # weight applied to the resampled previous value
    beta = {
        'Z': 2.5,  # not referenced by any equation below
        'X1': 0.3, 'X2': 0.4, 'X3': 0.2, 'X4': 0.3,
        'Ti': 0.6, 'Td': 0.7, 'Y': 0.8
    }

    # One scale per variable, used both for the additive noise and for the
    # spread of the resample around the previous value.
    std_dev = {
        'Z': 0.1,
        'X1': 0.05, 'X2': 0.05, 'X3': 0.05, 'X4': 0.05,
        'Ti': 0.1, 'Td': 0.1, 'Y': 0.2
    }

    variables = ['Z', 'X1', 'X2', 'X3', 'X4', 'Ti', 'Td', 'Y']
    x_keys = ['X1', 'X2', 'X3', 'X4']

    data = {var: np.zeros((num_timesteps, num_samples)) for var in variables}

    # Ground-truth treatment effect, stored alongside the data.
    treatment_effect = np.zeros((num_timesteps, num_samples))

    def noise(var):
        """Zero-mean Gaussian noise for ``var``, one value per sample."""
        return rng.normal(0, std_dev[var], num_samples)

    def resample(var, t):
        """Gaussian draw centred on the value of ``var`` at step ``t - 1``."""
        return rng.normal(data[var][t - 1], std_dev[var], num_samples)

    def binarize(values):
        """1 where the value exceeds the cross-sectional median, else 0."""
        return (values > np.median(values)).astype(int)

    # ---- t = 0: everything is derived from a standard-normal Z ----
    z0 = rng.normal(0, 1, num_samples)
    data['Z'][0] = z0
    data['X1'][0] = 0.5 * z0 + noise('X1')
    data['X2'][0] = 0.4 * z0 + noise('X2')
    data['X3'][0] = 0.3 * z0 + noise('X3')
    data['X4'][0] = 0.6 * z0 + noise('X4') + data['X2'][0]  # X4 also loads on X2
    data['Ti'][0] = 0.2 * z0 + noise('Ti')
    data['Td'][0] = binarize(0.3 * data['Ti'][0] + 0.4 * z0 + noise('Td'))

    treatment_effect[0] = beta['Y'] * data['Td'][0]
    data['Y'][0] = treatment_effect[0] + 3.0 * z0 + noise('Y')

    # ---- t >= 1: autoregressive updates ----
    for t in range(1, num_timesteps):
        z_prev = resample('Z', t)
        data['Z'][t] = alpha * z_prev + (1 - alpha) * noise('Z')
        z_t = data['Z'][t]

        for x_key in x_keys:
            x_prev = resample(x_key, t)
            data[x_key][t] = alpha * x_prev + beta[x_key] * z_t + noise(x_key)

        ti_prev = resample('Ti', t)
        # Per-sample mean of the four covariates. axis=0 keeps one value per
        # sample; without it the mean collapses to a single scalar shared by
        # all samples, which couples otherwise independent units.
        x_mean = np.mean([data[key][t] for key in x_keys], axis=0)
        data['Ti'][t] = alpha * ti_prev + beta['Ti'] * z_t + 0.1 * x_mean + noise('Ti')

        data['Td'][t] = binarize(
            alpha * data['Td'][t - 1] + beta['Td'] * data['Ti'][t] + 0.2 * z_t + noise('Td')
        )

        y_prev = resample('Y', t)
        # Td is 0/1, so squaring it would not change the value.
        treatment_effect[t] = beta['Y'] * data['Td'][t]
        data['Y'][t] = alpha * y_prev + treatment_effect[t] + 0.1 * z_t + noise('Y')

    return data, treatment_effect

# Simulation size: 20 time steps for each of 100 samples.
num_timesteps, num_samples = 20, 100
generated_data, treatment_effect = generate_time_series_data(num_timesteps, num_samples)

# Long-format table: each (num_timesteps, num_samples) array is flattened
# row-major, so rows are ordered by time step first and by sample second.
df = pd.DataFrame(
    {name: values.flatten() for name, values in generated_data.items()}
).assign(
    time=np.arange(num_timesteps).repeat(num_samples),
    sample=np.tile(np.arange(num_samples), num_timesteps),
    treatment_effect=treatment_effect.flatten(),
)

df.to_csv('./syn_ts_data_with_treatment_effect.csv', index=False)

In [11]:
import numpy as np
import pandas as pd

def generate_time_series_data(num_timesteps, num_samples, seed=None):
    """Simulate a nonlinear panel of covariates, a binary treatment and an outcome.

    Eight variables are generated, each as an array of shape
    ``(num_timesteps, num_samples)``: ``Z``, ``X1``..``X4`` (passed through
    ``tanh``), ``Ti`` (passed through a logistic sigmoid), ``Td`` (binarised at
    the cross-sectional median of a sigmoid score) and ``Y`` (``tanh`` of its
    linear predictor for ``t >= 1``). For ``t >= 1`` each continuous variable
    starts from a clipped random step away from its own previous value.

    Parameters
    ----------
    num_timesteps : int
        Number of time steps (rows). Must be at least 1, because step 0 is
        always initialised.
    num_samples : int
        Number of independent samples (columns).
    seed : int or None, optional
        When given, all draws come from a private ``np.random.RandomState(seed)``
        so the output is reproducible. When ``None`` (default) the global
        ``np.random`` stream is used, so ``np.random.seed`` still applies.

    Returns
    -------
    data : dict[str, numpy.ndarray]
        Maps each variable name to its ``(num_timesteps, num_samples)`` array.
    treatment_effect : numpy.ndarray
        Ground-truth effect term that is added to the predictor of ``Y``; it is
        zero wherever ``Td`` is zero.
    """
    # Both the module and a RandomState instance expose .normal(loc, scale, size).
    rng = np.random if seed is None else np.random.RandomState(seed)

    def nonlinear_activation(x):
        return np.tanh(x)

    def sigmoid(x):
        return 1 / (1 + np.exp(-x))

    alpha = 0.8  # weight applied to the stepped previous value
    beta = {
        'Z': 2.5,  # not referenced by any equation below
        'X1': 0.3, 'X2': 0.4, 'X3': 0.2, 'X4': 0.3,
        'Ti': 0.6, 'Td': 0.7, 'Y': 0.8
    }

    # Standard deviation of the additive noise of each variable.
    noise_std = {
        'Z': 0.1,
        'X1': 0.05, 'X2': 0.05, 'X3': 0.05, 'X4': 0.05,
        'Ti': 0.1, 'Td': 0.1, 'Y': 0.2
    }

    def noise(var):
        """Zero-mean Gaussian noise for ``var``, one value per sample."""
        return rng.normal(0, noise_std[var], num_samples)

    def binarize(score):
        """1 where the score exceeds the cross-sectional median, else 0."""
        return (score > np.median(score)).astype(int)

    def sample_from_previous(prev_value, std, size, t):
        """Move ``prev_value`` by a random, periodically biased, clipped step."""
        # Superposition of two periodic components.
        time_factor = np.sin(t * np.pi / 10) * 0.2 + np.cos(t * np.pi / 20) * 0.1
        # Drift whose scale grows with log(t + 1).
        drift = rng.normal(0, 0.01, size) * np.log(t + 1)

        # Largest allowed absolute step; also grows with log(t + 1).
        max_change = 0.5 * np.log(t + 1)
        change = rng.normal(0, std, size) + time_factor + drift
        change = np.clip(change, -max_change, max_change)

        return prev_value + change

    x_keys = ['X1', 'X2', 'X3', 'X4']
    data = {var: np.zeros((num_timesteps, num_samples))
            for var in ['Z', 'X1', 'X2', 'X3', 'X4', 'Ti', 'Td', 'Y']}

    # Ground-truth treatment effect.
    treatment_effect = np.zeros((num_timesteps, num_samples))

    # ---- t = 0 ----
    data['Z'][0] = rng.normal(0, 1, num_samples)
    z0 = data['Z'][0]
    data['X1'][0] = nonlinear_activation(3.5 * z0 + noise('X1'))
    data['X2'][0] = nonlinear_activation(3.4 * z0 + noise('X2'))
    data['X3'][0] = nonlinear_activation(3.3 * z0 + noise('X3'))
    data['X4'][0] = nonlinear_activation(3.6 * z0 + noise('X4'))
    data['Ti'][0] = sigmoid(3.2 * z0 + noise('Ti'))

    data['Td'][0] = binarize(sigmoid(3.3 * data['Ti'][0] + 3.4 * z0 + noise('Td')))

    # Nonlinear treatment effect: modulated by sin(Z).
    treatment_effect[0] = beta['Y'] * data['Td'][0] * (1 + 0.2 * np.sin(z0))
    data['Y'][0] = treatment_effect[0] + nonlinear_activation(3.3 * z0) + noise('Y')

    # ---- t >= 1 ----
    for t in range(1, num_timesteps):
        z_sample = sample_from_previous(data['Z'][t - 1], 0.1, num_samples, t)
        data['Z'][t] = alpha * z_sample + (1 - alpha) * noise('Z')
        z_t = data['Z'][t]

        for i in range(1, 5):
            x_key = f'X{i}'
            x_sample = sample_from_previous(data[x_key][t - 1], 0.05, num_samples, t)
            # Interaction with the next covariate in cyclic order
            # (X1->X2, ..., X4->X1), taken at the previous step.
            interaction = 0.1 * z_t * data[f'X{(i % 4) + 1}'][t - 1]
            data[x_key][t] = nonlinear_activation(
                alpha * x_sample + beta[x_key] * z_t + interaction + noise(x_key)
            )

        ti_sample = sample_from_previous(data['Ti'][t - 1], 0.1, num_samples, t)
        # Per-sample mean over the four covariates.
        x_mean = np.mean([data[key][t] for key in x_keys], axis=0)
        data['Ti'][t] = sigmoid(
            alpha * ti_sample + beta['Ti'] * z_t + 0.2 * np.sin(x_mean) + noise('Ti')
        )

        data['Td'][t] = binarize(sigmoid(
            alpha * data['Td'][t - 1] + beta['Td'] * data['Ti'][t]
            + 0.2 * np.cos(z_t) + noise('Td')
        ))

        y_sample = sample_from_previous(data['Y'][t - 1], 0.2, num_samples, t)
        treatment_effect[t] = beta['Y'] * data['Td'][t] * (
            1 + 0.2 * np.sin(z_t) + 0.1 * np.cos(data['Ti'][t])
        )
        data['Y'][t] = nonlinear_activation(
            alpha * y_sample + treatment_effect[t] + 0.1 * np.sin(z_t)
            + 0.05 * np.cos(x_mean) + noise('Y')
        )

    return data, treatment_effect

# Simulation size: 20 time steps for each of 100 samples.
num_timesteps, num_samples = 20, 100
generated_data, treatment_effect = generate_time_series_data(num_timesteps, num_samples)

# Long-format table: each (num_timesteps, num_samples) array is flattened
# row-major, so rows are ordered by time step first and by sample second.
df = pd.DataFrame(
    {name: values.flatten() for name, values in generated_data.items()}
).assign(
    time=np.arange(num_timesteps).repeat(num_samples),
    sample=np.tile(np.arange(num_samples), num_timesteps),
    treatment_effect=treatment_effect.flatten(),
)

df.to_csv('./syn_ts_data_with_treatment_effect.csv', index=False)

In [5]:
import numpy as np
import pandas as pd

def generate_time_series_data(num_timesteps, num_samples, seed=None):
    """Simulate a nonlinear panel driven by an extra variable ``U``.

    Nine variables are generated, each as an array of shape
    ``(num_timesteps, num_samples)``: ``U`` (enters the equations of all other
    variables), ``Z``, ``X1``..``X4``, ``Ti``, ``Td`` (binary, 1 where a score
    exceeds a per-sample threshold derived from ``Z`` and ``U``) and ``Y``.
    Noise scales vary periodically with the time step.

    Parameters
    ----------
    num_timesteps : int
        Number of time steps (rows). Must be at least 1, because step 0 is
        always initialised.
    num_samples : int
        Number of independent samples (columns).
    seed : int or None, optional
        When given, all draws come from a private ``np.random.RandomState(seed)``
        so the output is reproducible. When ``None`` (default) the global
        ``np.random`` stream is used, so ``np.random.seed`` still applies.

    Returns
    -------
    data : dict[str, numpy.ndarray]
        Maps each variable name, including ``'U'``, to its
        ``(num_timesteps, num_samples)`` array.
    treatment_effect : numpy.ndarray
        ``base_effect + time_effect`` per step and sample, the term added to
        the predictor of ``Y``.
    """
    # Both the module and a RandomState instance expose .normal(loc, scale, size).
    rng = np.random if seed is None else np.random.RandomState(seed)

    def nonlinear_activation(x):
        return np.tanh(x) + 0.1 * np.sin(5 * x)

    def complex_sigmoid(x):
        return 1 / (1 + np.exp(-x)) + 0.05 * np.sin(10 * x)

    alpha = 0.9  # weight applied to the stepped previous value
    beta = {
        'Z': 2.5,  # not referenced by any equation below
        'X1': 0.3, 'X2': 0.4, 'X3': 0.2, 'X4': 0.3,
        'Ti': 0.6, 'Td': 0.7, 'Y': 0.8
    }

    def time_varying_noise(std, t):
        """Gaussian noise whose scale oscillates by +/-10% with period 20 steps."""
        return rng.normal(0, std * (1 + 0.1 * np.sin(t * np.pi / 10)), num_samples)

    def complex_sample_from_previous(prev_value, std, size, t):
        """Move ``prev_value`` by a random, periodically biased, clipped step."""
        time_factor = (np.sin(t * np.pi / 10) * 0.2
                       + np.cos(t * np.pi / 20) * 0.1
                       + np.sin(t * np.pi / 5) * 0.05)
        long_term_trend = 0.01 * np.log(t + 1)
        drift = rng.normal(0, 0.01, size) * np.log(t + 1) * np.sin(t * np.pi / 15)

        # Largest allowed absolute step for this time step.
        max_change = 0.5 * np.log(t + 1) * (1 + 0.1 * np.sin(t * np.pi / 10))
        change = rng.normal(0, std, size) + time_factor + drift + long_term_trend
        change = np.clip(change, -max_change, max_change)

        return prev_value + change

    x_keys = ['X1', 'X2', 'X3', 'X4']
    data = {var: np.zeros((num_timesteps, num_samples))
            for var in ['Z', 'X1', 'X2', 'X3', 'X4', 'Ti', 'Td', 'Y']}
    data['U'] = np.zeros((num_timesteps, num_samples))

    treatment_effect = np.zeros((num_timesteps, num_samples))

    # ---- t = 0 ----
    data['U'][0] = rng.normal(0, 1, num_samples)
    u0 = data['U'][0]
    data['Z'][0] = nonlinear_activation(2 * u0 + rng.normal(0, 0.5, num_samples))
    z0 = data['Z'][0]
    data['X1'][0] = nonlinear_activation(0.5 * z0 + 0.5 * u0 + time_varying_noise(0.05, 0))
    data['X2'][0] = nonlinear_activation(3.4 * z0 - 0.3 * u0 + time_varying_noise(0.05, 0))
    data['X3'][0] = nonlinear_activation(0.3 * z0 + 0.4 * u0 + time_varying_noise(0.05, 0))
    data['X4'][0] = nonlinear_activation(3.6 * z0 - 0.2 * u0 + time_varying_noise(0.05, 0))
    data['Ti'][0] = complex_sigmoid(3.2 * z0 + 0.5 * u0 + time_varying_noise(0.1, 0))

    td_continuous = complex_sigmoid(
        3.3 * data['Ti'][0] + 3.4 * z0 + 0.6 * u0 + time_varying_noise(0.1, 0)
    )
    # Threshold differs per sample: it is a function of that sample's Z.
    data['Td'][0] = (td_continuous > complex_sigmoid(z0)).astype(int)

    base_effect = beta['Y'] * data['Td'][0] * (1 + 0.2 * np.sin(z0) + 0.3 * np.cos(u0))
    time_effect = 0.1 * np.sin(2 * np.pi * 0 / num_timesteps)  # evaluates to 0 at t = 0
    treatment_effect[0] = base_effect + time_effect
    data['Y'][0] = (treatment_effect[0]
                    + nonlinear_activation(3.3 * z0 + 0.7 * u0)
                    + time_varying_noise(0.2, 0))

    # ---- t >= 1 ----
    for t in range(1, num_timesteps):
        data['U'][t] = complex_sample_from_previous(data['U'][t - 1], 0.1, num_samples, t)
        u_t = data['U'][t]

        z_sample = complex_sample_from_previous(data['Z'][t - 1], 0.1, num_samples, t)
        data['Z'][t] = nonlinear_activation(
            alpha * z_sample + (1 - alpha) * u_t + time_varying_noise(0.1, t)
        )
        z_t = data['Z'][t]

        for i in range(1, 5):
            x_key = f'X{i}'
            x_sample = complex_sample_from_previous(data[x_key][t - 1], 0.05, num_samples, t)
            # Interaction with the next covariate in cyclic order
            # (X1->X2, ..., X4->X1), taken at the previous step.
            interaction = 0.1 * np.sin(z_t * data[f'X{(i % 4) + 1}'][t - 1])
            data[x_key][t] = nonlinear_activation(
                alpha * x_sample +
                beta[x_key] * z_t +
                5.2 * u_t +
                interaction +
                0.05 * np.sin(t * np.pi / 10) +
                time_varying_noise(0.05, t)
            )

        ti_sample = complex_sample_from_previous(data['Ti'][t - 1], 0.1, num_samples, t)
        # Per-sample mean over the four covariates.
        x_mean = np.mean([data[key][t] for key in x_keys], axis=0)
        data['Ti'][t] = complex_sigmoid(
            alpha * ti_sample +
            beta['Ti'] * z_t +
            0.3 * u_t +
            5.2 * np.sin(x_mean) +
            0.1 * np.cos(t * np.pi / 15) +
            time_varying_noise(0.1, t)
        )

        td_continuous = complex_sigmoid(
            alpha * data['Td'][t - 1] +
            beta['Td'] * data['Ti'][t] +
            0.3 * u_t +
            0.2 * np.cos(z_t) +
            0.1 * np.sin(t * np.pi / 20) +  # periodic effect over time
            time_varying_noise(0.1, t)
        )
        data['Td'][t] = (td_continuous > complex_sigmoid(z_t + 0.1 * u_t)).astype(int)

        y_sample = complex_sample_from_previous(data['Y'][t - 1], 0.2, num_samples, t)
        base_effect = beta['Y'] * data['Td'][t]**2 * (
            1 + 5.2 * np.sin(z_t) +
            0.3 * np.cos(u_t) +
            0.1 * np.sin(data['Ti'][t])
        )
        time_effect = (0.1 * np.sin(2 * np.pi * t / num_timesteps)
                       + 0.05 * np.cos(4 * np.pi * t / num_timesteps))
        # NOTE(review): time_effect is added for every sample, so
        # treatment_effect is non-zero even where Td == 0 -- confirm intended.
        treatment_effect[t] = base_effect + time_effect
        data['Y'][t] = nonlinear_activation(
            alpha * y_sample +
            treatment_effect[t] +
            0.1 * np.sin(z_t) +
            5.2 * np.cos(u_t) +
            0.05 * np.cos(x_mean) +
            0.1 * np.sin(t * np.pi / 25) +
            time_varying_noise(0.2, t)
        )

    return data, treatment_effect

# Simulation size: 5 time steps for each of 100 samples.
num_timesteps, num_samples = 5, 100
generated_data, treatment_effect = generate_time_series_data(num_timesteps, num_samples)

# Long-format table: each (num_timesteps, num_samples) array is flattened
# row-major, so rows are ordered by time step first and by sample second.
# U is excluded from the exported columns.
df = pd.DataFrame(
    {name: values.flatten() for name, values in generated_data.items() if name != 'U'}
).assign(
    time=np.arange(num_timesteps).repeat(num_samples),
    sample=np.tile(np.arange(num_samples), num_timesteps),
    treatment_effect=treatment_effect.flatten(),
)

df.to_csv('./syn_ts_data_with_treatment_effect.csv', index=False)