# Dataloader

In [1]:
from pathlib import Path
import numpy as np
from numpy import random
import torch
from scipy.sparse import diags
from scipy.sparse.linalg import spsolve
import matplotlib.pyplot as plt


import logging
logger = logging.getLogger(__name__)

In [2]:
def _start_logging():
    logging.basicConfig(filename='tests.log', filemode='w', level=logging.INFO, force=True,
                        format='%(asctime)s %(levelname)s %(module)s.%(funcName)s - %(message)s')
    logger.info('Started logging')

    # logger.debug('This is a debug message')
    # logger.info('This is an info message')
    # logger.warning('This is a warning message')
    # logger.error('This is an error message')
    # logger.critical('This is a critical message')
_start_logging()

# Plots

In [3]:
def plot_histograms(arrays, bins=30, figsize=(6, 1), max_height=20, title=None):
    """
    Plot histograms for a list of numpy arrays in a single column.

    Parameters:
    arrays (list of np.array): List of numpy arrays to plot.
    bins (int): Number of bins for the histograms.
    figsize (tuple): Size of the figure. The height is multiplied
        by the number of arrays to plot.
    max_height (int): Maximum height of the figure in inches.
    """
    num_arrays = len(arrays)
    figsize = (figsize[0], min(figsize[1]*(1+num_arrays), max_height))
    _, axes = plt.subplots(num_arrays, 1, figsize=figsize, sharex=True)

    if num_arrays == 1:
        axes = [axes]

    for i, data in enumerate(arrays):
        axes[i].hist(data, bins=bins, edgecolor='black')
        axes[i].set_ylabel('Freq.')

    if title is not None:
        axes[0].set_title(title)

    axes[-1].set_xlabel('Value')
    plt.tight_layout()
    plt.show()

# Sample Generation

In [4]:
def _get_random_scale(uniform_range=(1.,1.)):
    rng = np.random.default_rng()
    low = uniform_range[0]
    high = uniform_range[1]
    return rng.uniform(low=low, high=high, size=1)

def number_generator(size=1, uniform_range=(1.,1.), log_=False):
    """
    Generates random numbers with a probabilistic range.

    The mean of the random numbers is drawn from a uniform distribution
    between -1 and 1. The range of the random numbers is drawn from a
    uniform distribution. The range of the range is given by the
    `uniform_range` parameter.

    Parameters:
    size (int): Number of random numbers to generate.
    uniform_range (tuple): Limits of the range for the random numbers.
        Draws from a uniform distribution.
    log_ (bool): If True, writes results to log file.
    """
    rng = np.random.default_rng()
    range_ = _get_random_scale(uniform_range)
    mean = rng.uniform(low=-1, high=1, size=1)
    if log_:
        logger.info(f'mean: {mean[0]:.4g}')
        logger.info(f'range: {range_[0]:.4g}')
    return rng.uniform(low=mean-0.5*range_, high=mean+0.5*range_, size=size)

# data_ = []
# for _ in range(10):
#     data_.append(number_generator(1000, uniform_range=(0.1,10.), log_=True))
# plot_histograms(data_, bins=30)

In [5]:
def _get_diagonals(number_generator,
                  rank=5,
                  off_diagonal_abs_mean=0.5,
                  symmetric=False,
                  uniform_range=(0.1,10.),
                  log_=False):

    if rank < 3:
        raise ValueError("Rank must be at least 3")

    n_diag_1 = rank-1
    n_diag_2 = rank-2

    n_ = 1 if symmetric else 2
    data = number_generator(n_*(n_diag_1 + n_diag_2), uniform_range=uniform_range, log_=log_)

    mean_abs = np.abs(data).mean()
    alpha = off_diagonal_abs_mean / mean_abs
    data = data * alpha

    if log_:
        logger.info(f'Abs mean: {mean_abs:.4g}')
        logger.info(f'Alpha: {alpha:.4g}')
        logger.info(f'New abs mean: {np.abs(data).mean():.4g}')

    diag_1_u = data[:n_diag_1]
    diag_2_u = data[n_diag_1: n_diag_1+n_diag_2]

    diag_1_l = data[(n_-1)*(n_diag_1+n_diag_2): n_*n_diag_1+n_diag_2]
    diag_2_l = data[(n_-1)*(n_diag_1+n_diag_2) + n_diag_1: n_*(n_diag_1+n_diag_2)]

    return [diag_2_u, diag_1_u, diag_1_l, diag_2_l]

# data = []
# for _ in range(10):
#     diag_ = _get_diagonals(number_generator, rank=50, off_diagonal_abs_mean=5.0, symmetric=False, uniform_range=(0.1,10.), log_=True)
#     data.append(np.concat(diag_))
# plot_histograms(data, bins=30)

In [6]:
def _matrix_from_diags(data):
    diag_2_u, diag_1_u, diag_1_l, diag_2_l = data
    n_diag = diag_1_u.shape[0] + 1
    main_diag = np.ones(n_diag)
    data_ = [diag_2_l, diag_1_l, main_diag, diag_1_u, diag_2_u]
    offsets = [-2, -1, 0, 1, 2]
    return diags(data_, offsets, format='csr')

diag_ = _get_diagonals(number_generator, rank=5, off_diagonal_abs_mean=5.0, symmetric=False, uniform_range=(0.1,10.), log_=False)
data_ = _matrix_from_diags(diag_)
print('Matrix')
print(data_.toarray())

print('Original diagonals')
for s,d in zip(['diag_2_u', 'diag_1_u', 'diag_1_l', 'diag_2_l'], diag_):
    print('  ', s)
    print('    ', d)

Matrix
[[ 1.         -6.53188583 -0.14358835  0.          0.        ]
 [ 2.18779298  1.          2.28053099 -3.03022066  0.        ]
 [ 3.31797208  3.69730078  1.          8.43201239 10.20771814]
 [ 0.          5.10410751 -2.930794    1.         -5.37440119]
 [ 0.          0.         11.70286736 -5.05880774  1.        ]]
Original diagonals
   diag_2_u
     [-0.14358835 -3.03022066 10.20771814]
   diag_1_u
     [-6.53188583  2.28053099  8.43201239 -5.37440119]
   diag_1_l
     [ 2.18779298  3.69730078 -2.930794   -5.05880774]
   diag_2_l
     [ 3.31797208  5.10410751 11.70286736]


In [None]:
def get_sample(number_generator,
               rank=5,
               off_diagonal_abs_mean=0.5,
               symmetric=False,
               uniform_range=(0.1,10.),
               log_=False):
    """Get a linear system sample."""

    diag_ = _get_diagonals(
        number_generator,
        rank=rank,
        off_diagonal_abs_mean=off_diagonal_abs_mean,
        symmetric=symmetric,
        uniform_range=uniform_range,
        log_=log_)
    matrix_a = _matrix_from_diags(diag_)
    n = matrix_a.shape[0]

    x_true = number_generator(size=n, uniform_range=(1.,1.))
    b = matrix_a.dot(x_true)

    return (matrix_a, x_true, b)

def _get_residual(matrix, x_true, b):
    x_solve = spsolve(matrix, b)
    residual = x_solve - x_true
    return residual

data_ = get_sample(number_generator, rank=10000000, off_diagonal_abs_mean=0.5, symmetric=False, uniform_range=(0.1,10.), log_=True)
print('Matrix')
try:
    print(data_[0].toarray())
except:
    print(f'Too big to print a {data_[0].shape} matrix.')
print('x_true')
print(data_[1])
print('b')
print(data_[2])

residual = _get_residual(data_[0], data_[1], data_[2])
print(f'Residual: {residual.mean():.4g}')
print(residual)
print(f'  Abs mean: {np.abs(residual).mean():.4g}')

Matrix
Too big to print a (10000000, 10000000) matrix.
x_true
[-1.14954861 -1.18926548 -1.22668983 ... -0.63844766 -0.34031333
 -0.37058542]
b
[-1.00798184 -2.7902782  -3.03387602 ... -0.52886687 -0.22879127
 -0.72503931]
Residual: 6.718e-17
[-2.22044605e-15 -1.33226763e-15  1.04360964e-14 ... -1.11022302e-16
  0.00000000e+00  5.55111512e-17]
  Abs mean: 9.906e-15
