In [1]:
import random

import numpy as np
import matplotlib.pyplot as plt
import math
import torch
from sklearn.linear_model import Lasso
import plotly.graph_objects as go

#### Construct dataset

In [None]:
class ConstructChebyshev():
    """Build Chebyshev-polynomial feature levels and datasets drawn from them.

    ``get_polys`` evaluates the polynomials T_1 .. T_{n_poly} (recurrence
    T_k = 2*x*T_{k-1} - T_{k-2} with T_0 = 1, T_1 = x) on ``n_levels`` evenly
    spaced interior points of (0, 1). ``meshgrid`` and ``sample`` then build a
    dataset whose i-th column only takes values from the i-th polynomial.

    Parameters
    ----------
    n_poly : int
        Number of polynomial columns kept (the constant T_0 is dropped).
    n_levels : int
        Number of grid points, i.e. distinct values available per column.
    n_samples : int
        Number of rows drawn by ``sample`` (before duplicate removal).
    """

    # Column indices plotted together by get_polys(plot=True).
    _DESCRIPTOR_SETS = ((7, 2, 3), (6, 0, 4), (8, 1, 5))

    def __init__(self, n_poly, n_levels, n_samples):
        self.n_poly = n_poly
        self.polys = None
        self.n_levels = n_levels
        self.n_samples = n_samples
        self.dataset  = None

    def _level_grid(self):
        """Return the n_levels interior points of (0, 1) shared by all methods."""
        # Two extra points are generated and the end points dropped so that the
        # weight 1/sqrt(1 - x^2) stays finite at every grid point.
        return np.linspace(0, 1, self.n_levels + 2)[1:-1]

    def check_othogonality(self):
        """Print the weighted discrete inner product of every pair of polynomials.

        Raises
        ------
        ValueError
            If ``get_polys`` has not been called yet.
        """
        if self.polys is None:
            raise ValueError("Create polys first by calling get_polys() method.")
        # Must be the same grid the polynomials were evaluated on, otherwise the
        # element-wise products below do not line up.
        x = self._level_grid()
        dx = x[1] - x[0]
        # Chebyshev weight function w(x) = 1/sqrt(1 - x^2).
        w = (1 - x**2)**(-0.5)
        # NOTE(review): the grid covers (0, 1) only, while the usual Chebyshev
        # inner product integrates over [-1, 1] - confirm the intended domain.
        for i in range(self.n_poly):
            for j in range(i+1, self.n_poly):
                # Riemann-sum approximation of <f, g> = \int f(x) g(x) w(x) dx.
                result = np.sum(self.polys[:, i] * self.polys[:, j] * w * dx, axis=0)
                print(f"Inner product of polys {i} and {j}: {round(result, 2)}")

    def _plot_descriptor_set(self, x, descriptors, title):
        """Plot the requested polynomial columns against x in one figure."""
        # Skip indices that do not exist when fewer polynomials were requested.
        available = [desc for desc in descriptors if desc < self.polys.shape[1]]
        if not available:
            return
        for desc in available:
            plt.plot(x, self.polys[:, desc], '.-', label=f'Poly {desc}')
        plt.title(title)
        plt.xlabel('x')
        plt.ylabel('t(x)')
        plt.legend()
        plt.show()

    def get_polys(self, plot=False):
        """Evaluate the polynomials on the level grid and cache them.

        Parameters
        ----------
        plot : bool
            When True, plot the three predefined descriptor sets.

        Returns
        -------
        numpy.ndarray
            float32 array of shape (n_levels, n_poly); column k holds T_{k+1}.
        """
        x = self._level_grid()
        # Recurrence relation; every column is stored as float32, matching the
        # precision of the values used for each subsequent step.
        columns = [np.ones(self.n_levels, dtype=np.float32)]
        if self.n_poly >= 1:
            columns.append(x.astype(np.float32))
        for _ in range(2, self.n_poly + 1):
            columns.append((2 * x * columns[-1] - columns[-2]).astype(np.float32))
        polys = np.column_stack(columns)
        # Drop first factor since it is just an array of ones
        self.polys = polys[:, 1:]
        if plot:
            for set_number, descriptors in enumerate(self._DESCRIPTOR_SETS, start=1):
                self._plot_descriptor_set(x, descriptors, f'Descriptor Set {set_number}')
        return self.polys

    def meshgrid(self):
        """Build the full Cartesian product of the per-column levels.

        Column i cycles through its levels with a run length of n_levels**i, so
        the result has n_levels**n_poly rows - this grows very quickly.

        Returns
        -------
        numpy.ndarray
            Array of shape (n_levels**n_poly, n_poly), also stored in ``self.dataset``.

        Raises
        ------
        ValueError
            If ``get_polys`` has not been called, or if there are no columns.
        """
        if self.polys is None:
            raise ValueError("Run get_polys() method first")
        if self.n_poly < 1:
            raise ValueError("n_poly must be at least 1 to build a dataset")
        columns = [
            np.tile(np.repeat(self.polys[:, i], self.n_levels**i),
                    self.n_levels**(self.n_poly - i - 1))
            for i in range(self.n_poly)
        ]
        self.dataset = np.column_stack(columns)
        return self.dataset

    def gaussian_pmf(self, x , mu, sigma):
        """Return the Gaussian density N(mu, sigma^2) evaluated at scalar x."""
        return (1/(sigma * math.sqrt(2 * math.pi))) * math.exp(-((x - mu)**2)/(2 * sigma**2))

    def sample(self, dist='uniform', seed=None):
        """Draw n_samples rows, one level per column, and drop duplicate rows.

        Parameters
        ----------
        dist : {'uniform', 'normal'}
            'uniform' picks every level with equal probability. 'normal' weights
            the levels by a two-component Gaussian mixture evaluated on the
            first polynomial's levels; the same weights are reused, by position,
            for every column.
        seed : int, optional
            When given, seeds numpy, ``random`` and torch before sampling. When
            omitted the current global RNG state is used (the method no longer
            depends on a notebook-level ``seed`` variable).

        Returns
        -------
        numpy.ndarray
            Unique sampled rows (sorted by ``np.unique``), also stored in
            ``self.dataset``.

        Raises
        ------
        ValueError
            If ``get_polys`` has not been called, if there are no columns, or
            if ``dist`` is not recognised.
        """
        if self.polys is None:
            raise ValueError("Run get_polys() method first")
        if self.n_poly < 1:
            raise ValueError("n_poly must be at least 1 to build a dataset")
        if seed is not None:
            np.random.seed(seed)
            random.seed(seed)
            torch.manual_seed(seed)
        if dist == 'uniform':
            p = [1/self.n_levels]*self.n_levels
        elif dist == 'normal':
            # Note: the Gaussian mixture probabilities are kept the same for every polynomial.
            means = [0.2, 0.6]
            sigmas = [2, 1.5]
            wts = [0.8, 0.2]
            density = np.array([
                sum(wt * self.gaussian_pmf(float(level), mean, sigma)
                    for mean, sigma, wt in zip(means, sigmas, wts))
                for level in self.polys[:, 0]
            ], dtype=np.float64)
            # np.random.choice requires probabilities that sum to 1; the raw
            # density values evaluated on a finite grid do not.
            p = density / density.sum()
        else:
            raise ValueError("Invalid distribution type. Use 'uniform' or 'normal'.")
        columns = [
            np.random.choice(self.polys[:, i], size=self.n_samples, replace=True, p=p)
            for i in range(self.n_poly)
        ]
        # Remove duplicates
        dataset = np.unique(np.column_stack(columns), axis=0)
        self.dataset = dataset
        return dataset

# # num_samples = [int((0.00000001/100)*(8**20)), int((0.0000001/100)**(8**20)), int((0.000001/100)**(8**20))]
# num_samples = [1000, 10000, 100000]
# create_orthogonal_polynomials = ConstructChebyshev(9, 9, num_samples[0])
# create_orthogonal_polynomials.get_polys(plot=True)
# # create_orthogonal_polynomials.check_othogonality()
# # # create_orthogonal_polynomials.meshgrid().shape
# create_orthogonal_polynomials.sample().shape
# dataset = create_orthogonal_polynomials.dataset

num_desc_scale_1 = 8
num_targets_scale_1 = 1
num_desc_scale_2 = 5
num_targets_scale_2 = 1
num_desc_scale_3 = 2
num_targets_scale_3 = 1
num_samples = 1000 
noise_scale = 1.0
noise_label = '1'
seed_for_noise_variables = 42
seed_for_features = 100
round_off = 3

np.random.seed(seed_for_noise_variables)
random.seed(seed_for_noise_variables)
torch.manual_seed(seed_for_noise_variables)
zetas = np.round(np.random.normal(loc=0, scale=noise_scale, size=(num_samples, num_desc_scale_1 + num_targets_scale_1 + num_desc_scale_2 + num_targets_scale_2 + num_desc_scale_3 + num_targets_scale_3)), round_off)
print(f'Zetas shape : {zetas.shape}')
print(f'Zetas head : \n')
print(zetas[0:10, :])
zetas_scale_1 = zetas[:, :num_desc_scale_1 + num_targets_scale_1]
zetas_scale_2 = zetas[:, num_desc_scale_1 + num_targets_scale_1:num_desc_scale_1 + num_targets_scale_1 + num_desc_scale_2 + num_targets_scale_2]
zetas_scale_3 = zetas[:, num_desc_scale_1 + num_targets_scale_1 + num_desc_scale_2 + num_targets_scale_2:]
print(zetas_scale_1.shape, zetas_scale_2.shape, zetas_scale_3.shape)
print('\n')

np.random.seed(seed_for_features)
random.seed(seed_for_features)
torch.manual_seed(seed_for_features)
dataset_scale_1 = np.round(np.random.uniform(low=-1, high=1, size=(num_samples, num_desc_scale_1)), round_off)
print(f'dataset_scale_1 shape : {dataset_scale_1.shape}')
print(f'dataset_scale_1 head : \n')
print(dataset_scale_1[0:10, :])

#### Add the dependent variables to the dataset

In [None]:
# Cell switches: `save` gates the np.save calls for the generated datasets and
# noise arrays; `plot_scale_2` / `plot_scale_3` gate the diagnostic plots of the
# scale-2 and scale-3 variables built in this cell.
save = True
plot_scale_2 = True
plot_scale_3 = True

# Create 3D surface plots
def plot_surface(data, plot_title, axis_labels):
    """Show an interactive plotly 3-D scatter of the first three columns of ``data``.

    Parameters
    ----------
    data : array indexed as ``data[:, k]``
        Columns 0, 1 and 2 give the x, y and z coordinates; the last column
        drives the marker colour.
    plot_title : str
        Figure title.
    axis_labels : sequence of three str
        Titles for the x, y and z axes, in that order.
    """
    x_vals, y_vals, z_vals = data[:, 0], data[:, 1], data[:, 2]
    # Small borderless markers coloured by the last column.
    marker_style = dict(
        size=2,
        color=data[:, -1],
        colorscale='Viridis',
        opacity=0.8,
        line=dict(width=0),
    )
    points = go.Scatter3d(x=x_vals, y=y_vals, z=z_vals, mode='markers', marker=marker_style)
    scene_layout = dict(
        xaxis_title=axis_labels[0],
        yaxis_title=axis_labels[1],
        zaxis_title=axis_labels[2],
        aspectmode='cube',
    )
    fig = go.Figure(data=[points])
    fig.update_layout(
        title=plot_title,
        autosize=False,
        scene=scene_layout,
        margin=dict(l=0, r=0, b=0, t=0),
        width=400,
        height=400,
    )
    fig.show()
    
# ---------------------------------------------------------------------------
# Scale 1: target from the noisy descriptors 0 and 7.
# ---------------------------------------------------------------------------
# Work from the descriptor columns only: `dataset_scale_1` is re-assigned below
# with the target appended, so slicing here keeps this cell idempotent when it
# is re-run (otherwise every run would append another target column).
features_scale_1 = dataset_scale_1[:, :num_desc_scale_1]
noisy_scale_1 = features_scale_1 + zetas_scale_1[:, :num_desc_scale_1]

y0_1 = noisy_scale_1[:, 0]**2 + noisy_scale_1[:, 7]**2
dataset_scale_1 = np.column_stack((features_scale_1, y0_1))
print(f'dataset_scale_1 shape : {dataset_scale_1.shape}')
if save:
    np.save(f'dataset_scale_1_{num_samples}_{noise_label}.npy', dataset_scale_1)
    np.save(f'zetas_scale_1_{num_samples}_{noise_label}.npy', zetas_scale_1)

# ---------------------------------------------------------------------------
# Scale 2: descriptors derived from the noisy scale-1 descriptors.
# ---------------------------------------------------------------------------
# Linear dependent variables with noise
x0_2 = 2*noisy_scale_1[:, 0] + 3*noisy_scale_1[:, 1]
if plot_scale_2:
    plot_surface(np.column_stack((features_scale_1[:, 0], features_scale_1[:, 1], x0_2)),
                 'x0_2', ['x0_1', 'x1_1', 'x0_2'])

x1_2 = noisy_scale_1[:, 0] + noisy_scale_1[:, 2]
if plot_scale_2:
    plot_surface(np.column_stack((features_scale_1[:, 0], features_scale_1[:, 2], x1_2)),
                 'x1_2', ['x0_1', 'x2_1', 'x1_2'])

# Non Linear dependent variables with noise (upto degree 2)
x2_2 = noisy_scale_1[:, 0]**2
if plot_scale_2:
    plt.plot(features_scale_1[:, 0], x2_2, '.')
    plt.xlabel('x0_1')
    plt.ylabel('x2_2')
    plt.show()

x3_2 = noisy_scale_1[:, 1]**2
if plot_scale_2:
    plt.plot(features_scale_1[:, 1], x3_2, '.')
    plt.xlabel('x1_1')
    plt.ylabel('x3_2')
    plt.show()

x4_2 = noisy_scale_1[:, 1]*noisy_scale_1[:, 2]
if plot_scale_2:
    plot_surface(np.column_stack((features_scale_1[:, 1], features_scale_1[:, 2], x4_2)),
                 'x4_2', ['x1_1', 'x2_1', 'x4_2'])

# Scale-2 target: noisy scale-1 target squared plus noisy x4_2 to the 4th power.
# zetas_scale_1[:, -1] is the noise column paired with the scale-1 target.
y0_2 = (y0_1 + zetas_scale_1[:, -1])**2 + (x4_2 + zetas_scale_2[:, 4])**4
if plot_scale_2:
    plot_surface(np.column_stack((y0_1, x4_2, y0_2)),
                 'y0_2', ['y0_1', 'x4_2', 'y0_2'])

dataset_scale_2 = np.column_stack((x0_2, x1_2, x2_2, x3_2, x4_2, y0_2))
print(f'dataset_scale_2 shape : {dataset_scale_2.shape}')

if save:
    np.save(f'dataset_scale_2_{num_samples}_{noise_label}.npy', dataset_scale_2)
    np.save(f'zetas_scale_2_{num_samples}_{noise_label}.npy', zetas_scale_2)

# ---------------------------------------------------------------------------
# Scale 3: descriptors derived from the noisy scale-2 descriptors 0 and 4.
# ---------------------------------------------------------------------------
noisy_x0_2 = x0_2 + zetas_scale_2[:, 0]
noisy_x4_2 = x4_2 + zetas_scale_2[:, 4]

# Linear dependent variable with noise
x0_3 = noisy_x0_2 - noisy_x4_2
if plot_scale_3:
    plot_surface(np.column_stack((x0_2, x4_2, x0_3)),
                 'x0_3', ['x0_2', 'x4_2', 'x0_3'])

# Non Linear dependent variables with noise (upto degree 2)
x1_3 = noisy_x0_2*noisy_x4_2
if plot_scale_3:
    plot_surface(np.column_stack((x0_2, x4_2, x1_3)),
                 'x1_3', ['x0_2', 'x4_2', 'x1_3'])

# Scale-3 target: noisy scale-2 target squared plus noisy x0_3 to the 4th power.
# zetas_scale_2[:, -1] is the noise column paired with the scale-2 target.
y0_3 = (y0_2 + zetas_scale_2[:, -1])**2 + (x0_3 + zetas_scale_3[:, 0])**4
if plot_scale_3:
    plot_surface(np.column_stack((y0_2, x0_3, y0_3)),
                 'y0_3', ['y0_2', 'x0_3', 'y0_3'])

dataset_scale_3 = np.column_stack((x0_3, x1_3, y0_3))
print(f'dataset_scale_3 shape : {dataset_scale_3.shape}')

if save:
    np.save(f'dataset_scale_3_{num_samples}_{noise_label}.npy', dataset_scale_3)
    np.save(f'zetas_scale_3_{num_samples}_{noise_label}.npy', zetas_scale_3)

#### Analyze feature distribution

In [None]:
# Plot the distribution of the samples: one bar chart of value counts per column.
# NOTE(review): `create_orthogonal_polynomials` and `linear_dataset` are not
# defined by any executed code earlier in this notebook (the only construction
# of `create_orthogonal_polynomials` is commented out) - define or load them
# before running this cell.
for i in range(create_orthogonal_polynomials.n_poly + 2):
    unique, counts = np.unique(linear_dataset[:, i], return_counts=True)
    plt.bar(unique, counts, width=0.05)
    # Columns 8 and 9 are excluded from per-value tick labels. The original test
    # `i != 8 or i != 9` was always true, so the exclusion never took effect.
    if i not in (8, 9):
        plt.xticks(unique, rotation=90)
    plt.xlabel('Value')
    plt.ylabel('Frequency') 
    plt.title('Distribution of Samples for Polynomial ' + str(i))
    plt.show()


#### Train Lasso

In [None]:
##########################
# Data params
val_split = 0.2
seeds = [0, 1, 2, 3, 4]
# Training params
num_epochs = 2000
# Optimization loss params
l1_coeff = 1e-3
##########################

# Per-seed results, appended in the order of `seeds`.
model_coeffs = []
model_intercepts = []
train_losses = []
val_losses = []

# The file does not depend on the seed, so it is loaded once instead of once per seed.
numpy_dataset = np.load('linear_noise_0_1_size_100000.npy')
num_train = int((1-val_split)*len(numpy_dataset))

for seed in seeds:

    np.random.seed(seed)
    random.seed(seed)

    # RANDOM USED HERE - shuffle the row indices to create the train/val split for this seed
    data_idxs = np.arange(0, len(numpy_dataset))
    np.random.shuffle(data_idxs)
    print(f'First 10 data idxs : {data_idxs[:10]}')
    train_numpy_idxs = data_idxs[:num_train]
    val_numpy_idxs = data_idxs[num_train:]
    train_numpy_dataset = numpy_dataset[train_numpy_idxs, :]
    val_numpy_dataset = numpy_dataset[val_numpy_idxs, :]

    # Standardise both splits with the TRAINING statistics, so the validation
    # data is transformed exactly like the data the model was fitted on.
    train_mean = np.mean(train_numpy_dataset, axis=0)
    train_std = np.std(train_numpy_dataset, axis=0)
    train_numpy_dataset_std = (train_numpy_dataset - train_mean)/train_std
    val_numpy_dataset_std = (val_numpy_dataset - train_mean)/train_std

    # Inputs are all but the last three columns; the regression target is the last column.
    # NOTE(review): presumably the last three columns are the targets of the three scales - confirm.
    train_inputs, train_target = train_numpy_dataset_std[:, :-3], train_numpy_dataset_std[:, -1]
    val_inputs, val_target = val_numpy_dataset_std[:, :-3], val_numpy_dataset_std[:, -1]

    # Create Lasso model
    lasso = Lasso(alpha=l1_coeff, fit_intercept=True, max_iter=num_epochs, random_state=seed)
    # Fit the model
    lasso.fit(train_inputs, train_target)
    # Train loss: mean squared error (the signed residual mean averages out to ~0
    # for a model with an intercept and says nothing about fit quality).
    train_preds = lasso.predict(train_inputs)
    train_loss = np.mean((train_target - train_preds)**2)
    train_losses.append(train_loss)
    # Get the coefficients
    model_coeffs.append(lasso.coef_)
    model_intercepts.append(lasso.intercept_)

    # Test the model on validation set data
    val_preds = lasso.predict(val_inputs)
    val_loss = np.mean((val_target - val_preds)**2)
    val_losses.append(val_loss)
    print(f"Train loss: {train_loss}, Val loss: {val_loss}")


In [None]:
# Absolute Lasso coefficients, rounded to 2 decimals: one row per seed, one column per feature.
model_wts_arr = np.abs(np.round(np.array(model_coeffs), 2))

print(model_wts_arr)

# Derive the plot dimensions from the fitted coefficients instead of hard-coding
# 12 features and 5 seeds, so the cell keeps working when either changes.
num_seeds, num_features = model_wts_arr.shape
feature_nums = np.arange(0, num_features, 1) # feature
feature_labels = [f't{k}' for k in range(num_features)]
# Highest seed index first, so rows are drawn from the largest y position to the smallest.
seed_nums = np.arange(num_seeds - 1, -1, -1) # seed
fig = plt.figure(figsize=(10, 10))
ax = fig.add_subplot(projection='3d')
colors = ['r', 'g', 'b', 'y', 'orange']
for color_idx, seed_num in enumerate(seed_nums):
    # Cycle through the palette when there are more seeds than colours.
    c = colors[color_idx % len(colors)]
    cs = [c]*len(feature_nums)
    ax.bar(feature_nums, model_wts_arr[seed_num, :], zs=seed_num, zdir='y', color=cs, alpha=0.8)
    # Add weights to the top of the bars
    for feature_num, height in zip(feature_nums, model_wts_arr[seed_num, :]):
        ax.text(feature_num, seed_num, height, f'{height:.2f}', color='black', ha='center', va='bottom')
# Tick labels do not depend on the seed, so they are set once.
ax.set_xticks(feature_nums, feature_labels)
ax.set_xlabel('feature')
ax.set_ylabel('seed index')
ax.set_zlabel('|coefficient|')
plt.show()



In [None]:
# Mean and standard deviation of the per-seed losses, train first and then validation.
for _split_name, _split_losses in (('Train', train_losses), ('Val', val_losses)):
    print(f'{_split_name} loss mean : {np.mean(_split_losses)} , std : {np.std(_split_losses)}')

#### Train XGBoost