# Testing neural networks approach to numerical integration on Genz numerical integration testing package

## Library import

In [None]:
%run ../skuld/skuld.py      # skuld NNI library
%run ../utils/plots.py      # plotting functions
%run ../utils/integrate.py  # 'classic' numerical integration function

# 1D testing

### 1D testing function

In [None]:
def test_1d_integration(h, func, func_float, a, b, func_name):
    u: float = random.uniform(0, 1)
    cs = random.uniform(0, 1)
    c = h
  
    X_init, y_init = generate_data(func, a, b, 40000, 1, u, c)
    plot_1d_function(X_init, y_init, func_name)
    X, y = scale_data(X_init, y_init)   

    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.1)  
    model = MLP(input_size, hidden_size)
    model.compile(criterion=nn.MSELoss(), optimizer=optim.Adam(model.parameters(), lr=learning_rate))
    train_history = model.fit(x_train, y_train, num_epochs, verbose=False)
    test_loss = model.test(x_test, y_test)
    print(f"Test Loss: {test_loss:.10f}")
    
    nni_scaled = NeuralNumericalIntegration.integrate(model, a, b, n_dims=1)
    nni_result = descale_result(float(nni_scaled[0]), X_init, y_init, frange=(0, 1), n_dim=1)
    result_quad = integrate_1d_quad(func_float, a[0], b[0], u, c)
    result_trapz = np.trapz(y_init.squeeze().numpy(), X_init.squeeze().numpy())

    return nni_result, result_quad, result_trapz

## Hyperparams for 1D

In [None]:
input_size = 1
hidden_size = 25
learning_rate = 0.001
num_epochs = 5000

tests_num = 20

a_ = [0.0]
b_ = [1.0]

## 1. Oscillatory 1D

In [None]:
func_name = 'Oscillatory 1D'
h = 100
results = []

def osc_1d(X, u, c):
    return torch.cos(2 * math.pi * u + X * c)

def osc_1d_float(X, u, c):
    return math.cos(2 * math.pi * u + X * c)

for i in range(tests_num):
    print(f"\033[1mIteration {i+1} is running!\033[0m")
    results.append(test_1d_integration(h, osc_1d, osc_1d_float, a_, b_, func_name))

plot_test_results(results)

## 2. Product peak 1D

In [None]:
func_name = 'Product peak 1D'
h = 150
results2 = []

def prod_peak_1d(X, u, c):
    return 1 / (c**(-2) + (X - u)**2)

def prod_peak_1d_float(X, u, c):
    return 1 / (c**(-2) + (X - u)**2)

for i in range(tests_num):
    print(f"\033[1mIteration {i+1} is running!\033[0m")
    results2.append(test_1d_integration(h, prod_peak_1d, prod_peak_1d_float, a_, b_, func_name))

plot_test_results(results2)

**NOTE** Absolute error is bad because the values are scaled upwards approximately 100 times.

## 3. Corner peak 1D

In [None]:
func_name = 'Corner peak 1D'
h = 600
results3 = []

def corn_peek_1d(X, u, c):
        return (1 + c * X) ** (-2)

def corn_peek_float(X, u, c):
        return (1 + c * X) ** (-2)

for i in range(tests_num):
    print(f"\033[1mIteration {i+1} is running!\033[0m")
    results3.append(test_1d_integration(h, corn_peek_1d, corn_peek_float, a_, b_, func_name))

plot_test_results(results3)

## 4. Gaussian 1D

In [None]:
func_name = 'Gaussian 1D'
h = 150
results4 = []

def gauss_1d(X, u, c):
    return torch.exp(- c**2 * (X - u)**2)

def gauss_1d_float(X, u, c):
    return np.exp(- c**2 * (X - u)**2)

for i in range(tests_num):
    print(f"\033[1mIteration {i+1} is running!\033[0m")
    results4.append(test_1d_integration(h, gauss_1d, gauss_1d_float, a_, b_, func_name))

plot_test_results(results4)

## 5. Continuous 1D

In [None]:
func_name = 'Continuous 1D'
h = 100
results5 = []

def cont_1d(X, u, c):
    return torch.exp(- c * abs(X - u))

def cont_1d_float(X, u, c):
    return np.exp(- c * abs(X - u))

for i in range(tests_num):
    print(f"\033[1mIteration {i+1} is running!\033[0m")
    results5.append(test_1d_integration(h, cont_1d, cont_1d_float, a_, b_, func_name))

plot_test_results(results5)

## 6. Discontinuous 1D

In [None]:
func_name = 'Discontinuos 1D'
h = 16
results6 = []

def disk_1d(X, u, c):
    y_list = []

    for x in X:
        if x > u:
            y_list.append(0)
        else:
            y_list.append(math.exp(c * x))
        
    return torch.tensor(y_list)

def disk_1d_float(X, u, c):
    if X > u:
        return 0
    else:
        return math.exp(c * X)

for i in range(tests_num):
    print(f"\033[1mIteration {i+1} is running!\033[0m")
    results6.append(test_1d_integration(h, disk_1d, disk_1d_float, a_, b_, func_name))

plot_test_results(results6)

# 2D Testing

## 2D testing function

In [None]:
def test_2d_integration(h, ej, func, func_float, a, b, func_name, logplot=False):
    u = [random.uniform(0, 1) for _ in range(input_size)]
    cs = [random.uniform(0, 1) for _ in range(input_size)]
    fraction = h / (input_size ** ej * sum(cs))
    c = [fraction * value for value in cs]
  
    X_init, y_init = generate_data(func, a, b, 200, input_size, u, c)
    if logplot:
        plot_2d_function_heatmap_with_log(X_init, y_init, func_name)
    else:
        plot_2d_function_heatmap(X_init, y_init, func_name)
    X, y = scale_data(X_init, y_init, n_dim=2)
    
    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.1)  
    model = MLP(input_size, hidden_size)
    model.compile(criterion=nn.MSELoss(), optimizer=optim.Adam(model.parameters(), lr=learning_rate))
    train_history = model.fit(x_train, y_train, num_epochs, verbose=False)
    test_loss = model.test(x_test, y_test)
    print(f"Test Loss: {test_loss:.10f}")
    
    nni_scaled = NeuralNumericalIntegration.integrate(model, a, b, n_dims=2)
    nni_result = descale_result(float(nni_scaled[0]), X_init, y_init, frange=(0, 1), n_dim=2)
    result_quad = integrate_2d_nquad(func_float, a, b, u, c)
    result_trapz = integrate_2d_trapz(func, a, b, 100, u, c)
   
    return nni_result, result_quad, result_trapz

## Hyperparams for 2D

In [None]:
input_size = 2
hidden_size = 25
learning_rate = 0.001
num_epochs = 5000

tests_num = 20

a_ = [0.0, 0.0]
b_ = [1.0, 1.0]

## 7. Oscillatory 2D

In [None]:
func_name = 'Oscillatory 2D'
h = 100
ej = 1
results7 = []

def osc_2d(X, u, c):
    sum_ = 0
    for i in range(input_size):
        sum_ += c[i] * X[:, i]

    return torch.cos(2 * math.pi * u[0] + sum_)
    
def osc_2d_float(X, u, c):
    sum_ = 0
    for i in range(input_size):
        sum_ += c[i] * X[i]
    return math.cos(2 * math.pi * u[0] + sum_)

for i in range(tests_num):
    print(f"\033[1mIteration {i+1} is running!\033[0m")
    results7.append(test_2d_integration(h, ej, osc_2d, osc_2d_float, a_, b_, func_name))

plot_test_results(results7)

## 8. Product Peak 2D

In [None]:
func_name = 'Product Peak 2D'
h = 150
ej = 1
results8 = []

def prod_peek_2d(X, u, c):
    prod_ = 1
    for i in range(2):
        prod_ *= (c[i] ** (-2) + (X[:, i] - u[i])**2) ** (-1)
    return prod_

def prod_peek_2d_float(X, u, c):
    prod_ = 1
    for i in range(2):
        prod_ *= (c[i] ** (-2) + (X[i] - u[i])**2) ** (-1)
    return prod_

for i in range(tests_num):
    print(f"\033[1mIteration {i+1} is running!\033[0m")
    results8.append(test_2d_integration(h, ej, prod_peek_2d, prod_peek_2d_float, a_, b_, func_name, logplot=False))

plot_test_results(results8)

## 9. Corner Peak 2D

In [None]:
func_name = 'Corner Peak 2D'
h = 600
ej = 1
results9 = []

def corn_peek_2d(X, u, c):
    sum_ = 0
    for i in range(2):
        sum_ += c[i] * X[:, i]
    return (1 + sum_) ** (-3)

def corn_peek_2d_float(X, u, c):
    sum_ = 0
    for i in range(2):
        sum_ += c[i] * X[i]
    return (1 + sum_) ** (-3)

for i in range(tests_num):
    print(f"\033[1mIteration {i+1} is running!\033[0m")
    results9.append(test_2d_integration(h, ej, corn_peek_2d, corn_peek_2d_float, a_, b_, func_name, logplot=True))

plot_test_results(results9)

## 10. Gaussian 2D

In [None]:
func_name = 'Gaussian 2D'
h = 150
ej = 1
results10 = []

def gauss_2d(X, u, c):
    sum_ = 0
    for i in range(2):
        sum_ += (c[i] ** 2) * ((X[:, i] - u[i]) ** 2)
    return torch.exp(-sum_)

def gauss_2d_float(X, u, c):
    sum_ = 0
    for i in range(2):
        sum_ += (c[i] ** 2) * ((X[i] - u[i]) ** 2)
    return np.exp(-sum_)

for i in range(tests_num):
    print(f"\033[1mIteration {i+1} is running!\033[0m")
    results10.append(test_2d_integration(h, ej, gauss_2d, gauss_2d_float, a_, b_, func_name, logplot=True))

plot_test_results(results10)

## 11. Continuous 2D

In [None]:
func_name = 'Continuous 2D'
h = 100
ej = 1
results11 = []

def cont_2d(X, u, c):
    sum_ = 0
    for i in range(2):
        sum_ += c[i]  * abs(X[:, i] - u[i])
    return torch.exp(-sum_)

def cont_2d_float(X, u, c):
    sum_ = 0
    for i in range(2):
        sum_ += c[i] ** 2 * abs(X[i] - u[i]) ** 2
    return np.exp(-sum_)

for i in range(tests_num):
    print(f"\033[1mIteration {i+1} is running!\033[0m")
    results11.append(test_2d_integration(h, ej, cont_2d, cont_2d_float, a_, b_, func_name, logplot=False))

plot_test_results(results11)

## 12. Discontinuous 2D

In [None]:
func_name = 'Discontinuous 2D'
h = 16
ej = 1
results12 = []

def disco_2d(X, u, c):
    result = torch.zeros(X.shape[0])
    for i in range(X.shape[0]):
        x = X[i, 0].item()
        
        if x > u[0]:
            result[i] = 0.0
        else:
            result[i] = math.exp(c[0] * x)

    return result

def disco_1d_float(X, u, c):
    x1, x2 = X

    if x1 > u[0] or x2 > u[1]:
        return 0.0
    else:
        return math.exp(c[0] * x1 + c[1] * x2)

for i in range(tests_num):
    print(f"\033[1mIteration {i+1} is running!\033[0m")
    results12.append(test_2d_integration(h, ej, disco_2d, disco_1d_float, a_, b_, func_name, logplot=False))

plot_test_results(results12)

# Calculation of MAEs for the research

In [None]:
def calculate_mae(results):
    absolute_errors = [abs(a - b) for a, b, _ in results]
    mae = sum(absolute_errors) / len(absolute_errors)
    
    return mae

In [None]:
results_list = [results, results2, results3, results4, results5, results6, results7, results8, results9, results10, results11, results12]
maes = [calculate_mae(results) for results in results_list]

In [None]:
for i, mae in enumerate(maes, start=1):
    print(f"{mae:.8f}")