# In [1]:
import logging
import sys
from itertools import chain
from multiprocessing import Pool
from pprint import pprint

import matplotlib.pyplot as plt
import numpy as np
from torch import as_tensor, cuda, float32, nn, no_grad, optim, split, sqrt, tensor

class NonLinearModel:
    """Gradient-descent fitter for a five-parameter smile curve.

    The curve is the one evaluated by :meth:`functional_form`::

        sqrt(a + b * (rho * (x - m) + sqrt((x - m)**2 + sigma**2))) / sqrt(maturity)

    Attributes:
        device: Device the parameters and training data live on.
        initial_params: Leaf tensor holding the parameters. It is optimized
            in place by :meth:`fit`, so after fitting it holds the fitted
            values and is what :meth:`predict` evaluates.
        fitted_params: NumPy copy of the parameters after the last
            :meth:`fit` call, or ``None`` before any fit.
    """

    def __init__(self, initial_params=(0, 0, 0, 0, 0), device="cuda"):
        """Create the trainable parameter tensor.

        Args:
            initial_params: Starting values ``(a, b, rho, m, sigma)``. Extra
                trailing entries are stored but ignored by the curve. Note
                that the all-zero default evaluates ``sqrt(0)``, where the
                gradient is not finite; pass a realistic starting point
                before calling :meth:`fit`.
            device: Target device. When a CUDA device is requested but CUDA
                is unavailable, the model falls back to ``"cpu"``.
        """
        if str(device).startswith("cuda") and not cuda.is_available():
            logging.warning("CUDA is not available. Falling back to CPU.")
            device = "cpu"
        self.device = device
        # Create the tensor directly on the target device: calling .to() on a
        # requires_grad tensor returns a non-leaf copy the optimizer rejects.
        self.initial_params = tensor(
            initial_params, dtype=float32, device=device, requires_grad=True
        )
        self.fitted_params = None

    def fit(self, x_train, y_train, maturity, epochs=100, lr=0.1):
        """Minimize the mean squared error between the curve and ``y_train``.

        Args:
            x_train: Inputs (tensor or array-like) at which the curve is evaluated.
            y_train: Targets, same length as ``x_train``.
            maturity: Scalar maturity passed to :meth:`functional_form`.
            epochs: Number of Adam steps.
            lr: Adam learning rate.

        Returns:
            The fitted parameters as a NumPy array (also stored in
            ``self.fitted_params``).
        """
        if not str(self.device).startswith("cuda"):
            logging.info("Device is not GPU. Fitting on %s.", self.device)
        # Match the parameters' dtype and device; MSELoss backward fails when
        # prediction and target dtypes differ.
        x_train = as_tensor(x_train, dtype=float32, device=self.device)
        y_train = as_tensor(y_train, dtype=float32, device=self.device)
        optimizer = optim.Adam([self.initial_params], lr=lr)
        loss_fn = nn.MSELoss()
        for epoch in range(epochs):
            optimizer.zero_grad()
            y_pred = self.functional_form(x_train, maturity, self.initial_params)
            loss = loss_fn(y_pred, y_train)

            # Backpropagation
            loss.backward()
            optimizer.step()
            if (epoch + 1) % 100 == 0:
                print(f"Epoch {epoch + 1}: loss {loss.item():.4f}")

        # Copy so the returned array does not alias the live parameter tensor
        # (on CPU, .cpu().numpy() would otherwise share its memory).
        self.fitted_params = self.initial_params.detach().cpu().numpy().copy()
        print("fitted model")
        return self.fitted_params

    def predict(self, x_test, maturity):
        """
        Predict output for the given input data.

        Evaluates the curve at ``x_test`` with the current parameters and
        returns the result as a CPU tensor.
        """
        x_test = as_tensor(x_test, dtype=float32, device=self.device)
        with no_grad():
            y_pred = self.functional_form(x_test, maturity, params=self.initial_params)
        return y_pred.cpu()

    @staticmethod
    def functional_form(x, maturity, params):
        """Evaluate the curve at ``x`` for the given maturity and parameters.

        Args:
            x: Tensor, NumPy array or scalar of evaluation points.
            maturity: Scalar maturity; the result is divided by its square root.
            params: Sequence or tensor whose first five entries are
                ``a, b, rho, m, sigma``; any further entries are ignored.

        Returns:
            Tensor with the curve values, one per entry of ``x``.
        """
        a, b, rho, m, sigma, *_ = params
        # as_tensor leaves existing tensors untouched, so gradients keep
        # flowing to the parameters; tensor() would detach them.
        a, b, rho, m, sigma = (as_tensor(p) for p in (a, b, rho, m, sigma))
        x = as_tensor(x)
        maturity = as_tensor(maturity)
        shifted = x - m
        inner = a + b * (rho * shifted + sqrt(shifted ** 2 + sigma ** 2))
        return sqrt(inner) / sqrt(maturity)



def tweak_params(params, tweak):
    """Return the values of ``params`` with Gaussian noise added to each.

    One independent ``np.random.normal(0, tweak)`` draw is added per entry,
    in key order. The result is a list; the keys are dropped.
    """
    return [params[key] + np.random.normal(0, tweak) for key in params.keys()]

def generate_params(usual_params, number_of_maturities = 10):
    """Build one noisy parameter row per synthetic maturity.

    Each row is the output of ``tweak_params(usual_params, 0.00001)`` with
    the maturity appended as the last element.
    """
    # Generate artificial maturities: a linear grid damped by 1 - exp(-0.1 * t),
    # shifted up by 0.1.
    linear_grid = np.linspace(0.0001, 10, number_of_maturities)
    damping = 1 - np.exp(-0.1 * np.linspace(0, 10, number_of_maturities))
    maturities = linear_grid * damping + 0.1

    rows = []
    for maturity in maturities:
        rows.append([*tweak_params(usual_params, 0.00001), maturity])
    return rows

def generate_data(generated_params):
    """Evaluate the model curve on a fixed grid for every parameter row.

    For each row, the curve is evaluated at 10 evenly spaced points in
    [-0.20, 0.20], using the row's last element as the maturity. Returns the
    flattened inputs and outputs as two tensors, rows concatenated in order.
    """
    x_grids = [np.linspace(-0.20, 0.20, 10) for _ in generated_params]
    y_curves = [
        NonLinearModel.functional_form(grid, row[-1], row)
        for grid, row in zip(x_grids, generated_params)
    ]

    flat_inputs = list(chain.from_iterable(x_grids))
    flat_outputs = list(chain.from_iterable(y_curves))
    return tensor(flat_inputs), tensor(flat_outputs)

def fit_model(x_train, y_train, maturity):
    """Fit a default-initialized NonLinearModel and return its fitted parameters."""
    # fit() stores the result in fitted_params and returns that same value.
    return NonLinearModel().fit(x_train, y_train, maturity)

def split_list(a_list, number):
    """Split ``a_list`` into consecutive slices of length ``number``.

    The final slice is shorter when the length is not a multiple of ``number``.
    """
    chunks = []
    for start in range(0, len(a_list), number):
        chunks.append(a_list[start:start + number])
    return chunks

def fit_model_worker(args):
    """Fit one curve inside a worker process.

    Args:
        args: Tuple ``(x, y, params)``. ``params`` is one row as produced by
            ``generate_params``: the curve parameters followed by the
            maturity as the last element.

    Returns:
        The fitted parameters returned by ``NonLinearModel.fit``.
    """
    x, y, params = args
    print(f"Worker received: x={x[:5]}, y={y[:5]}, params={params}", flush=True)
    # The maturity is the trailing entry of the row; it is a fixed input to
    # the fit, not a trainable parameter, so split it off the starting values.
    *start_params, maturity = params
    model = NonLinearModel(initial_params=start_params)
    print("Fitting model...")
    result = model.fit(x, y, maturity)
    # Flush so output from the worker process is not lost in a buffer.
    print(f"Worker finished: result={result}", flush=True)
    return result

def main():
    """Generate synthetic curves and fit each one in a worker pool.

    Builds 10 noisy parameter rows around a base parameter set, evaluates
    the curve for each, splits the flattened data back into per-maturity
    chunks of 10 points, and maps ``fit_model_worker`` over them.
    """
    svi_params = {
        "a": 0.04,    # Vertical offset (minimum implied variance)
        "b": 0.1,     # Slope (controls curvature, must be >= 0)
        "rho": -0.3,  # Skew/asymmetry (-1 <= rho <= 1)
        "m": 0.0,     # Log-moneyness shift (horizontal position of the smile)
        "sigma": 0.2, # Width/curvature (must be > 0)
    }
    generated_params = generate_params(svi_params, 10)
    x_all, y_all = generate_data(generated_params)
    # Chunk size 10 matches the 10 grid points generate_data emits per row.
    x_chunks = split(x_all, 10)
    y_chunks = split(y_all, 10)
    jobs = list(zip(x_chunks, y_chunks, generated_params))
    with Pool(1) as pool:
        results = pool.map(fit_model_worker, jobs)
    print(results)


# In [None]:
# Run the demo only when executed as a script. main() starts a
# multiprocessing Pool, so this guard also keeps worker processes that
# re-import the module from launching main() again.
if __name__ == "__main__":
    main()