# Data generation of Molenkamp test

Generates data for the Molenkamp test. It is a parametric, time-dependent problem. Data are stored with dimensions [B, T, C, x_1, x_2], where B is the batch size, T is the number of time steps, C is the number of channels, x_1 is the size of the first spatial dimension, and x_2 is the size of the second spatial dimension. Parameters are stored with dimensions [B, n_p+1], where B is the batch size and n_p is the dimension of the parameter vector; the +1 is because the last entry is the dt of the whole time series. Of course, this works only for time series with a fixed time step; otherwise modification is required. One can choose to use the Molenkamp test with or without decay.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch as tc
import torch.nn.functional as F
import os

# Functions

In [None]:
def molenkeamp(X, Y, param, t):
    """Evaluate the decaying Molenkamp field on the grid (X, Y) at time t.

    ``param`` is read by index:
      param[0] -- overall amplitude factor
      param[1] -- factor multiplying h**2 in the exponent of 0.01
      param[2] -- rate of the exponential decay in time
      param[3] -- offset subtracted from X
      param[4] -- offset subtracted from Y

    Returns ``param[0] * 0.01**(param[1] * h**2) * exp(-param[2] * t)``, where
    h is the distance of each grid point from the moving centre.
    """
    # The centre moves on a circle of radius 0.5, one revolution per unit time.
    angle = 2*np.pi*t
    dx = X-param[3]+0.5*np.cos(angle)
    dy = Y-param[4]+0.5*np.sin(angle)
    h = np.sqrt(dx**2+dy**2)

    bump = 0.01**(param[1]*h**2)
    return param[0]*bump*np.exp(-param[2]*t)

def molenkeamp_no_decay(X, Y, param, t):
    """Evaluate the Molenkamp field without temporal decay on the grid (X, Y).

    ``param`` is read by index:
      param[0] -- overall amplitude factor
      param[1] -- factor multiplying h**2 in the exponent of 0.01
      param[2] -- offset subtracted from X
      param[3] -- offset subtracted from Y

    Returns ``param[0] * 0.01**(param[1] * h**2)``, where h is the distance of
    each grid point from the moving centre at time t.
    """
    # The centre moves on a circle of radius 0.5, one revolution per unit time.
    angle = 2*np.pi*t
    dx = X-param[2]+0.5*np.cos(angle)
    dy = Y-param[3]+0.5*np.sin(angle)
    h = np.sqrt(dx**2+dy**2)

    bump = 0.01**(param[1]*h**2)
    return param[0]*bump

def generate_param():
    """Draw one random parameter vector for the decaying Molenkamp field.

    Returns a list of five floats, each drawn with ``np.random.rand()`` and
    mapped to the intervals [1, 20), [2, 4), [1, 5), [-0.1, 0.1) and
    [-0.1, 0.1), in that order.
    """
    # (scale, shift) applied to each uniform draw, listed in output order.
    scale_shift = ((19, 1), (2, 2), (4, 1), (0.2, -0.1), (0.2, -0.1))

    return [np.random.rand() * scale + shift for scale, shift in scale_shift]
    
def generate_param_no_decay():
    """Draw one random parameter vector for the non-decaying Molenkamp field.

    Returns a list of four floats, each drawn with ``np.random.rand()`` and
    mapped to the intervals [1, 20), [2, 4), [-0.1, 0.1) and [-0.1, 0.1),
    in that order.
    """
    # (scale, shift) applied to each uniform draw, listed in output order.
    scale_shift = ((19, 1), (2, 2), (0.2, -0.1), (0.2, -0.1))

    return [np.random.rand() * scale + shift for scale, shift in scale_shift]

def get_data(samples, T_grid, grid):
    """Generate ``samples`` random decaying-Molenkamp time series.

    For every sample a fresh parameter vector is drawn with generate_param()
    and the field is evaluated with molenkeamp() on (grid[0], grid[1]) at each
    time in ``T_grid``.

    Returns a tuple ``(field_step, parameter)``:
      field_step -- list with one entry per sample; each entry is a list over
                    the times of ``T_grid`` whose items are one-element lists
                    holding the field (the extra nesting is the channel axis).
      parameter  -- list with one array per sample: the drawn parameters
                    followed by the time step ``T_grid[1] - T_grid[0]``.
    """
    fields = []
    params = []

    for _ in range(samples):
        drawn = generate_param()
        series = [[molenkeamp(grid[0], grid[1], drawn, t)] for t in T_grid]
        # The last parameter entry is dt, taken from the first two grid times.
        dt = T_grid[1] - T_grid[0]
        params.append(np.concatenate((drawn, [dt])))
        fields.append(series)

    return fields, params


# Get Data

In [None]:
# Output directory for the generated .npy files and number of samples per split.
path_data = '../../../../../scratch/aalelonghi/molenkamp_whole_time_series_2/'
samples_training, samples_validation = 1, 1

# Time grid starting at 0 with step 0.05 (the stop value 1.05 is exclusive).
T = np.arange(0, 1.05, 0.05)

# 128 x 128 spatial grid on [-1, 1] x [-1, 1]; b runs from 1 down to -1.
a, b = np.linspace(-1, 1, 128), np.linspace(1, -1, 128)
X, Y = np.meshgrid(a, b)

# The training split is drawn before the validation split.
field_step_training, parameter_training = get_data(samples_training, T, [X, Y])
field_step_validation, parameter_validation = get_data(samples_validation, T, [X, Y])

In [None]:
# Sanity check: show the nested-list layout of the training fields as a shape tuple.
print('field_step_training', np.asarray(field_step_training).shape)

In [None]:
# Write the training and validation splits to path_data as float32 .npy files.
os.makedirs(path_data, exist_ok=True)

for _name, _values in (
    ('field_step_training', field_step_training),
    ('parameter_training', parameter_training),
    ('field_step_validation', field_step_validation),
    ('parameter_validation', parameter_validation),
):
    # os.path.join avoids the doubled '/' that `path_data + '/...'` produced.
    # `fix_imports` is no longer passed: NumPy ignores it and newer releases
    # deprecate it. `allow_pickle=True` was the default and is irrelevant for
    # plain float32 arrays.
    np.save(os.path.join(path_data, _name), np.float32(_values))

# Make test data

In [None]:
# Generate the test set from the non-decaying Molenkamp field.

samples_test = 1

T_test = np.arange(0, 1.05, 0.05)
a, b = np.linspace(-1, 1, 128), np.linspace(1, -1, 128)
X, Y = np.meshgrid(a, b)

initial_condition, parameter, output = [], [], []

for _ in range(samples_test):
    P = generate_param_no_decay()
    # One [field] entry per time step; the inner list is the channel axis.
    output.append([[molenkeamp_no_decay(X, Y, P, t)] for t in T_test])
    parameter.append(P)
    # The initial condition is the same field evaluated at t = 0.
    initial_condition.append([molenkeamp_no_decay(X, Y, P, 0)])

    # Original author's reminder: remember to call the normalized

In [None]:
# Write the test set to path_data as float32 .npy files.
for _name, _values in (
    ('testing_initial_conditions', initial_condition),
    ('testing_parameter', parameter),
    ('testing_T', T_test),
    ('testing_output', output),
):
    # os.path.join avoids the doubled '/' that `path_data + '/...'` produced.
    # `fix_imports` is no longer passed: NumPy ignores it and newer releases
    # deprecate it. `allow_pickle=True` was the default and is irrelevant for
    # plain float32 arrays.
    np.save(os.path.join(path_data, _name), np.float32(_values))

# Get test at different time steps

In [None]:
# Re-evaluate previously saved test parameters on a non-uniform time grid.

params = np.load('steep/testing_parameter.npy')
# NOTE(review): the sample count is hard-coded, so the loaded file must hold at
# least 100 rows. molenkeamp() reads param[0]..param[4], so each row needs at
# least five entries -- confirm this matches how the file was generated.
samples_test = 100
# Same grid as the uniform 0.05 one, with an extra step inserted at t = 0.22.
T_test = np.array([0,0.05,0.1,0.15,0.2,0.22,0.25,0.3,0.35,0.4,0.45,0.5,0.55,0.6,0.65,0.7,0.75,0.8,0.85,0.9,0.95,1])
a, b = np.linspace(-1, 1, 128), np.linspace(1, -1, 128)
X, Y = np.meshgrid(a, b)

output = []

for i in range(samples_test):
    P = params[i]
    # One [field] entry per time step; the inner list is the channel axis.
    output.append([[molenkeamp(X, Y, P, t)] for t in T_test])

print(np.shape(output))
# Original author's reminder: remember to call the normalized

In [None]:
# Save the fields evaluated on the non-uniform time grid as float32.
# `fix_imports` is no longer passed: NumPy ignores it and newer releases
# deprecate it. `allow_pickle=True` was the default and is irrelevant for
# plain float32 arrays.
np.save('steep/testing_output_diff_time', np.float32(output))

# Generate data out of distribution

In [None]:
# Generate non-decaying test series with parameters outside the sampled ranges.

samples_test = 100

T_test = np.arange(0, 1.05, 0.05)
a = np.linspace(-1, 1, 128)
b = np.linspace(1, -1, 128)
X, Y = np.meshgrid(a, b)

initial_condition = []
parameter = []
output = []

for i in range(samples_test):
    # NOTE(review): the original line was the unfinished expression
    # `P = [np.random.rand()*]`, a SyntaxError that made the whole cell
    # unrunnable. molenkeamp_no_decay() reads param[0]..param[3], so four
    # entries are drawn. The ranges below are PLACEHOLDERS lying just outside
    # the ones used by generate_param_no_decay() -- replace them with the
    # intended out-of-distribution ranges.
    P = [
        np.random.rand() * 10 + 20,    # param[0] in [20, 30); in-distribution is [1, 20)
        np.random.rand() * 2 + 4,      # param[1] in [4, 6); in-distribution is [2, 4)
        np.random.rand() * 0.1 + 0.1,  # param[2] in [0.1, 0.2); in-distribution is [-0.1, 0.1)
        np.random.rand() * 0.1 + 0.1,  # param[3] in [0.1, 0.2); in-distribution is [-0.1, 0.1)
    ]
    # One [field] entry per time step; the inner list is the channel axis.
    out = [[molenkeamp_no_decay(X, Y, P, t)] for t in T_test]
    output.append(out)
    parameter.append(P)
    print(P)
    # The initial condition is the same field evaluated at t = 0.
    initial_condition.append([molenkeamp_no_decay(X, Y, P, 0)])

print(np.shape(output))

# Random

In [None]:
# Load the saved training / validation fields and report their shapes.
tr, val = (np.load(fname) for fname in ('field_step_training.npy', 'field_step_validation.npy'))
for loaded in (tr, val):
    print(np.shape(loaded))

In [None]:
# Turn every time step of every sample into its own single-image entry:
# all leading axes are merged into the first axis and the last two (spatial)
# axes are kept, giving shape (N, 1, 1, x_1, x_2).
# The leading size is inferred with -1 and the spatial sizes are read from the
# arrays, instead of the hard-coded 5000*21 / 200*21 and 128 x 128, which only
# worked for one specific dataset size.
tr = np.reshape(tr, (-1, 1, 1) + np.shape(tr)[-2:])
val = np.reshape(val, (-1, 1, 1) + np.shape(val)[-2:])
print(np.shape(tr))
print(np.shape(val))
# `fix_imports` is no longer passed: NumPy ignores it and newer releases
# deprecate it. `allow_pickle=True` was the default and is irrelevant for
# plain float32 arrays.
np.save('../molenkamp_whole_time_series/image_training', np.float32(tr))
np.save('../molenkamp_whole_time_series/image_validation', np.float32(val))