In [None]:
# ALGORYTM Z "Approximation of piecewise Hölder functions from inexact information"

# PARAMETRY ALGORYTMU:
# h = T/m, gdzie m >= 2r + 1   <--- initial resolution of the mesh
# 0 < w < (r + 1) * h

In [None]:
# Kroki algorytmu:
# 1. aproksymacja punktu osobliwego s_f
#   1a. siatka regularna (o długości h) oraz różnice dzielne użyte są do lokalizacji s_f na przedziale [u_1, v_1] (o długości (r+1) * h)
#   1b. zawęzenie przedziałów z 1a do [u_2, v_2] przy użyciu wielomianów ekstrpolacyjnych p_neg i p_pos
#   1c. wyprodukowanie przedziału [u_3, v_3] zawierającego się (lub równego) [u_2, v_2], w którym różnica |p_pos - p_neg| jest nierosnąca na [u_3, ksi] i niemalejąca na [ksi, v_3], gdzie ksi jest finalną aproksymacją s_f
# 2.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import math
from scipy.optimize import fminbound
from scipy import interpolate
import multiprocessing as mp


In [None]:
# EXAMPLE FUNCTION
f1__a = 0
f1__b = 2 * np.pi + 0.5
f1__r = 3
f1__rho = 10e-4

def f1(x):
    def raw_f(x):
        if 0 <= x < np.pi:
            return np.sin(x - np.pi)
        if np.pi <= x <= 2 * np.pi + 0.5:
            return np.sin(x - np.pi - 0.5)

    if isinstance(x, (list, np.ndarray)):
        return [raw_f(elem) for elem in x]
    elif isinstance(x, (float, np.float64)):
        return raw_f(x)
    
    raise Exception("x has to be list or float")

In [None]:
mesh = np.linspace(f1__a, f1__b, 50, dtype='float64')
plt.scatter(mesh, f1(mesh))

In [None]:
# EXAMPLE FUNCTION
f2__a = 0
f2__b = 3 * np.pi
f2__r = 3
f2__rho = 10e-4

def f2(x):
    def raw_f(x):
        if 0 <= x < np.pi:
            return np.sin(x)
        if np.pi <= x <= 3 * np.pi:
            return np.sin(x - np.pi)

    if isinstance(x, (list, np.ndarray)):
        return [raw_f(elem) for elem in x]
    elif isinstance(x, (float, np.float64)):
        return raw_f(x)
    
    raise Exception("x has to be list or float")

In [None]:
mesh = np.linspace(f2__a, f2__b, 50, dtype='float64')
plt.scatter(mesh, f2(mesh))

In [None]:
def devided_diff(t, y, r, i):
    '''
    t : array of data points t_i, ... , t_{i+r+1}
    y : array of y_j = f(t_j) + e_j
    '''
    
    products = []
    for j in range(i, i+r+1):
        product = 1.0
        for k in range(i, i+r+1):
            if k == j:
                continue
            product *= 1 / (t[k] - t[j])
        products.append(y[j] * product)

    products.sort()
    return math.fsum(products)

In [None]:
# Step 1
def step1(t, y, m, r):
    i_max = np.argmax([devided_diff(t, y, r, i) for i in range(0, m-r)])
    u_1 = t[i_max]
    v_1 = t[i_max + r +1]
    return u_1, v_1, i_max

In [None]:
# Step 2
def step2(f, t, y, r, d, u_1, v_1, i_max, noise=None):
    p_neg = interpolate.interp1d(
        t[i_max - r:i_max + 1],
        y[i_max - r:i_max + 1],
        fill_value="extrapolate"
    )
    p_pos = interpolate.interp1d(
        t[i_max + r + 1: i_max + 2 * r + 1 + 1],
        y[i_max + r + 1: i_max + 2 * r + 1 + 1],
        fill_value="extrapolate"
    )

    u = u_1
    v = v_1
    while v - u > d:
        z = [u + j * (v - u) / (r + 2) for j in range(1, r + 2)]
        dif = np.abs(p_pos(z) - p_neg(z))
        j_max = np.argmax(dif)

        if noise is None:
            f_value = f(z[j_max])
        else:
            f_value = f(z[j_max]) + np.random.uniform(-noise, noise)

        if np.abs(f_value - p_neg(z[j_max])) <= np.abs(f_value - p_pos(z[j_max])):
            u = z[j_max]
        else:
            v = z[j_max]
    
    u_2 = u.item()
    v_2 = v.item()
    return u_2, v_2, p_neg, p_pos, z


In [None]:
# Step 3
def step3(f, t, u_2, v_2, p_neg, p_pos, z, noise=None):
    u = u_2
    v = v_2

    def fun(x):
        return -1 * np.abs(p_neg(x) - p_pos(x)) # "-" beacuse we are looking for maximum, not minimum
        
    while True:
        z_max = fminbound(func=fun, x1=u, x2=v)
        
        if np.isclose(z_max, u) or np.isclose(z_max, v): # => no local maximum
            break


        if noise is None:
            f_value = f(z_max)
        else:
            f_value = f(z_max) + np.random.uniform(-noise, noise)

        if np.abs(f_value - p_neg(z_max)) <= np.abs(f_value - p_pos(z_max)): # NOISE HANDLING???
            u = z_max
        else:
            v = z_max

    u_3 = u
    v_3 = v
    
    ksi = fminbound(
        func=lambda x: np.abs(p_neg(x) - p_pos(x)),
        x1=u_3, x2=v_3)
    
    return u_3, v_3, ksi

In [None]:
# =====ALGORITHM=====

def Alg2015(f, f__a, f__b, r, rho, m, noise=None):
    '''
    f - funkcja, którą aproksymujemy
    r, rho - stałe regularności funkcji
    f__a, f__b - aproksymowany przedzial
    m - początkowa gęstość siatki
    noise - None lub wartosc losowego szumu
    '''
    # Step 0 (initialization of algorithm's constants and function evaluations)
    h = (f__b - f__a) / m
    d = h ** (r + rho)  # d = -1.0 * np.random.uniform(-(r + 1) * h, 0) # multiply by -1 because we need interval (0, (r + 1) * h]
    t = np.linspace(f__a, f__b, m+1, dtype='float64') # czyli mamy m+1 wezlów postaci t[0], ... , t[m]
    
    if noise is not None:
        e = np.random.uniform(-noise, noise, m+1)
        y = [f(t[j]) + e[j] for j in range(0, m+1)]
    else:
        y = np.array(f(t))

    # kroki algorytmu
    u_1, v_1, i_max = step1(t, y, m, r)
    u_2, v_2, p_neg, p_pos, z = step2(f, t, y, r, d, u_1, v_1, i_max, noise=noise)
    u_3, v_3, ksi = step3(f, t, u_2, v_2, p_neg, p_pos, z, noise=noise)

    # finalna funkcja approksymująca podaną funkcję
    polynomial1 = interpolate.interp1d(
                t[:i_max + 1],
                y[:i_max + 1])

    polynomial2 = interpolate.interp1d(
                t[i_max + r + 1:],
                y[i_max + r + 1:])

    def final_approximation(x):
        def raw_f(x):
            if t[0] <= x < u_1:
                return polynomial1(x)
            if u_1 <= x < ksi:
                return p_neg(x)
            if ksi <= x < v_1:
                return p_pos(x)
            if v_1 <= x <= t[m]:
                return polynomial2(x)
            print("WATCH OUT!!!")
            return -1

        if isinstance(x, (list, np.ndarray)):
            return [raw_f(elem) for elem in x]
        return raw_f(x)

    return t, final_approximation

In [None]:
# delta = 10e-4
# m = 100
# t, f_approximation = Alg2015(f1, f1__a, f1__b, f1__r, f1__rho, m, delta)

# plt.scatter(t,f_approximation(t), s=1)
# plt.plot(t, f_approximation(t))

In [None]:
import types
import functools

def copy_func(f):
    g = types.FunctionType(f.__code__, f.__globals__, name=f.__name__,
                           argdefs=f.__defaults__,
                           closure=f.__closure__)
    g = functools.update_wrapper(g, f)
    g.__kwdefaults__ = f.__kwdefaults__
    return g

In [None]:
import scipy.integrate as integrate

def worst_case_error(fun, fun_data, algorithm, alg_data, num, p=2):
    '''
    calculate worst case error with respect to noise for function fun
    '''
    f__a, f__b, r, rho = fun_data
    m, noise = alg_data
    
    norms = []
    errors = [] # Errors of intergation

    fun_copy = copy_func(fun)
    algorithm_copy = copy_func(algorithm)

    for i in range(num):
        f_approximation = algorithm_copy(fun_copy, f__a, f__b, r, rho, m, noise)[1]
        result, error = integrate.quad(lambda x: abs(fun(x) - f_approximation(x))**p, f__a, f__b)
        norm = result**(1/p)
        norms.append(norm)
        errors.append(error) 

    result = np.max(norms)
    print("worst case error for {} alg runs with m={} equals: {}".format(num, m, result))

    return result

In [None]:
log10_m_array = np.linspace(1.5, 4.5, num=20)
m_array = np.array(np.power(10, log10_m_array), dtype='int')
deltas = [None, 10e-12, 10e-8, 10e-4]

In [None]:
dataset = [(f1, (f1__a, f1__b, 2, f1__rho), Alg2015,(m.item(), None), 10, 2) for m in np.nditer(m_array)]

with mp.Pool(processes=3) as pool:
    results = pool.starmap(worst_case_error, dataset)

    print(results)

In [None]:
for m in np.nditer(m_array):
    error = worst_case_error(
        fun=f1,
        fun_data=(f1__a, f1__b, 2, f1__rho),
        algorithm=Alg2015,
        alg_data=(m.item(), None),
        num=10,
        p=2)

In [None]:
# errors_for_deltas_f1 = []
# for delta in deltas:
#     errors = []
#     for m in np.nditer(m_array):
#         error = worst_case_error(
#             fun=f1,
#             fun_data=(f1__a, f1__b, 4, f1__rho),
#             algorithm=Alg2015,
#             alg_data=(m, delta),
#             num=1000,
#             p=2)

#         errors.append(error)
#         print("delta: {} ---   m: {} -> error: {:.12f}".format(delta, m, error))
    
#     errors_for_deltas_f1.append(errors)


# errors_for_deltas_f2 = []
# for delta in deltas:
#     errors = []
#     for m in np.nditer(m_array):
#         error = worst_case_error(
#             fun=f2,
#             fun_data=(f2__a, f2__b, 4, f2__rho),
#             algorithm=Alg2015,
#             alg_data=(m, delta),
#             num=1000,
#             p=2)

#         errors.append(error)
#         print("delta: {} ---   m: {} -> error: {:.12f}".format(delta, m, error))
    
#     errors_for_deltas_f2.append(errors)


In [None]:
log10errors_for_deltas_f1 = np.log10(errors_for_deltas_f1)
log10errors_for_deltas_f2 = np.log10(errors_for_deltas_f2)

In [None]:
log10errors_for_deltas_f1[1]

In [None]:
colors = ['g', 'red', 'b', 'orange']
markers = [',', 'x', 'o', '+', '.', 'v', '^', '<', '>', 's', 'd']

for i in range(4):
    plt.scatter(log10_m_array, -log10errors_for_deltas_f1[i], c=colors[i], marker=markers[i])

plt.show()

In [None]:
print(log10_m_array)
print(deltas)
print(log10errors_for_deltas_f1)
print(log10errors_for_deltas_f2)