#Estimation And Approximation Problems Over Groups (Spring 2020) - Final Project
## Low-Rank Multi-Refrence Factor Analysis Results Reconstruction- Elad Eatah

<a href="https://colab.research.google.com/github/RedCrow9564/SpectralMethodsProject-RandomSVD/blob/master/Spectral_Methods_Project_Random_SVD.ipynb"><img align="left" src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open in Colab" title="Open and Execute in Google Colaboratory"></a>
[![MIT License](https://img.shields.io/apm/l/atomic-design-ui.svg?)](https://github.com/tterb/atomic-design-ui/blob/master/LICENSEs)

Complete description of this project is available in [this repo](https://github.com/RedCrow9564/EstimationOverGroups-FinalProject.git).

## Getting Started
First run the nodes under "Infrastructure" one by one.
Second, run the first two nodes under "Main Components".

### Running Unit-Tests
You can then run the nodes under "UnitTests" one by one. The results of the last node are the results of these Unit-Tests.

### Performing an experiment
This is done by running the "main" node. Its results are then saved to a local directory in the remote machine. Make sure this directory exists on the remote machine.


#Infrastructure

In [22]:
#@title Dependencies installations
!pip install cvxpy cvxopt nptyping sacred pandas
from IPython.display import clear_output
import numpy as np
import warnings
import psutil
from multiprocessing import Pool
import pandas as pd
import os
import pyximport
import cython
import cvxpy as cp
from sacred import Experiment
from time import perf_counter
import cpuinfo
import warnings
#warnings.filterwarnings("error")  # Uncomment to see warnings as errors.
%load_ext Cython

# Defining the "sacred" experiment object.
ex = Experiment(name="Initializing project", interactive=True)
clear_output()
print("Installation is done!")

Installation is done!


In [23]:
#@title Common Utils
# -*- coding: utf-8 -*-
"""
utils.py - The common utilities functions and objects
=====================================================

This module contains all frequently-used methods and objects which can be shared among the entire project.
For example, data types name used for type-hinting, a basic enum class :class:`BaseEnum`, methods for measuring
run-time of a given function.
"""
from typing import List, Dict, Callable, Union, Iterator, Tuple, Any, TypeVar
from nptyping import NDArray
import inspect

# Naming data types for type hinting.
Scalar = Union[float, complex]
Vector = NDArray[(Any,), Any]
Matrix = NDArray[(Any, Any), Any]
ThreeDMatrix = NDArray[(Any, Any, Any), Any]


class _MetaEnum(type):
    """
    A private meta-class which given any :class:`BaseEnum` object to be an iterable.
    This can be used for iterating all possible values of this enum. Should not be used explicitly.
    """
    def __iter__(self) -> Iterator:
        """
        This method gives any BaseEnum the ability of iterating over all the enum's values.

        Returns:
            An iterator for the collection of all the enum's values.

        """
        # noinspection PyUnresolvedReferences
        return self.enum_iter()

    def __contains__(self, item) -> bool:
        """
        This method give any BaseEnum the ability to test if a given item is a possible value for this enum class.

        Returns:
            A flag which indicates if 'item' is a possible value for this enum class.

        """
        # noinspection PyUnresolvedReferences
        return self.enum_contains(item)


class BaseEnum(metaclass=_MetaEnum):
    """
    A basic interface for all enum classes. Should be sub-classed in eny enum, i.e ``class ExperimentType(BaseEnum)``
    """

    @classmethod
    def enum_iter(cls) -> Iterator:
        """
        This method gives any BaseEnum the ability of iterating over all the enum's values.

        Returns:
            An iterator for the collection of all the enum's values.

        """
        return iter(cls.get_all_values())

    @classmethod
    def enum_contains(cls, item) -> bool:
        """
        This method give any BaseEnum the ability to test if a given item is a possible value for this enum class.

        Returns:
            A flag which indicates if 'item' is a possible value for this enum class.

        """
        return item in cls.get_all_values()

    @classmethod
    def get_all_values(cls) -> List:
        """
        A method which fetches all possible values of an enum. Used for iterating over an enum.

        Returns:
            A list of all possible enum's values.

        """
        all_attributes: List = inspect.getmembers(cls, lambda a: not inspect.ismethod(a))
        all_attributes = [value for name, value in all_attributes if not (name.startswith('__') or name.endswith('__'))]
        return all_attributes


def create_factory(possibilities_dict: Dict[str, Callable], are_methods: bool = False) -> Callable:
    """
    A generic method for creating factories for the entire project.

    Args:
        possibilities_dict(Dict[str, Callable]): The dictionary which maps object types (as strings!) and returns the
            relevant class constructors.
        are_methods(bool): A flag, true if the factory output are methods, rather than objects. Defaults to False

    Returns:
         The factory function for the given classes/methods mapping.

    """
    def factory_func(requested_object_type: str):  # Inner function!
        if requested_object_type not in possibilities_dict:
            raise ValueError("Object type {0} is NOT supported".format(requested_object_type))
        else:
            if are_methods:
                return possibilities_dict[requested_object_type]
            else:
                return possibilities_dict[requested_object_type]()

    return factory_func


def measure_time(method: Callable) -> Callable:
    """
    A method which receives a method and returns the same method, while including run-time measure
    output for the given method, in seconds.

    Args:
        method(Callable): A method whose run-time we are interested in measuring.

    Returns:
        A function which does exactly the same, with an additional run-time output value in seconds.

    """
    def timed(*args, **kw):
        ts = perf_counter()
        result = method(*args, **kw)
        te = perf_counter()
        duration_in_ms: float = te - ts
        if isinstance(result, tuple):
            return result + (duration_in_ms,)
        else:
            return result, duration_in_ms
    timed.__name__ = method.__name__ + " with time measure"
    return timed


def is_empty(collection: List) -> bool:
    return len(collection) == 0


class DataLog:
    """
    A class for log-management objects. See the following example for creating it: ``DataLog(["Column 1", "Column 2"])``
    """
    def __init__(self, log_fields: List):
        """
        This methods initializes an empty log.

        Args:
            log_fields(List) - A list of column names for this log.

        """
        self._data: Dict = dict()
        self._log_fields: List = log_fields
        for log_field in log_fields:
            self._data[log_field] = list()

    def append(self, data_type: str, value: Scalar) -> None:
        """
        This methods appends given data to the given column inside the log.
        Example of usage:``log.append(DataFields.DataSize, 20)``

        Args:
            data_type(LogFields): The column name in which the input data in inserted to.
            value(Scalar): The value to insert to the log.

        """
        self._data[data_type].append(value)

    def append_dict(self, data_dict: Dict) -> None:
        """
        This methods takes the data from the input dictionary and inserts it to this log.

        Args:
            data_dict(Dict): The dictionary from which new data is taken and inserted to the log.

        """
        for log_field, data_value in data_dict.items():
            self.append(log_field, data_value)

    def save_log(self, log_file_name: str, results_folder_path: str) -> None:
        """
        This method saves the log to a file, with the input name, in the input folder path.

        Args:
            log_file_name(str): The name for this log file.
            results_folder_path(str): The path in which this log will be saved.

        """
        df = pd.DataFrame(self._data, columns=self._log_fields)
        df.to_csv(os.path.join(results_folder_path, log_file_name + ".csv"), sep=",", float_format="%.2E", index=False)
        ex.info["Experiment Log"] = self._data

print("Loaded Utils Successfully!")

Loaded Utils Successfully!


In [24]:
#@title Enums
# -*- coding: utf-8 -*-
"""
enums.py - All enums section
============================

This module contains all possible enums of this project. Most of them are used by the configuration section in
:mod:`main`. An example for using enum:
::
    DistributionType.Uniform

"""



class LogFields(BaseEnum):
    """
    The enum class of fields within experiments logs. Possible values:

    * ``LogFields.DataSize``

    * ``LogFields.DataType``

    * ``LogFields.ApproximationRank``

    * ``LogFields.ObservationsNumber``

    * ``LogFields.NoisePower``

    * ``LogFields.TrialsNum``

    * ``LogFields.ShiftsDistribution``

    * ``LogFields.MeanError``
    """
    DataSize: str = "Data size"
    DataType = "Data type (complex/real)"
    ApproximationRank: str = "r"
    ObservationsNumber: str = "Observations Number"
    NoisePower: str = "Noise power"
    TrialsNum: str = "Number of Trials"
    ShiftsDistribution: str = "Shifts Distribution"
    MeanError: str = "Mean Error"


class DistributionType(BaseEnum):
    """
    The enum class of experiment types. Possible values:

    * ``DistributionType.Uniform``

    * ``DistributionType.Dirac``

    """
    Uniform: str = "Uniform Distribution"
    Dirac: str = "Dirac Delta Distribution"


class DistributionParams(BaseEnum):
    """
    The enum class of parameters for the shifts distribution. Possible values:

    * ``DistributionParams.DeltaLocations``

    """
    DeltaLocations: str = "delta_locations"


print("Loaded Enums Successfully!")


Loaded Enums Successfully!


#Main Components

In [25]:
#@title Data Generation
from numpy.linalg import norm, qr


def create_discrete_distribution(distribution_type: str, distribution_length: int, distribution_params: Dict) -> Vector:
    if distribution_type == DistributionType.Uniform:
        return np.ones(distribution_length) / distribution_length
    elif distribution_type == DistributionType.Dirac:
        delta_locations: Vector = np.array(distribution_params[DistributionParams.DeltaLocations])
        distribution: Vector = np.zeros(distribution_length, dtype=np.float64)
        distribution[delta_locations] = 1
        distribution /= len(delta_locations)
        return distribution


def generate_covariance(signal_length: int, approximation_rank: int, data_type, random_generator) -> \
        (Matrix, Matrix, Vector):
    # Picking random eigenvalues (variances) for the exact covariance matrix.
    eigenvalues: Vector = random_generator.uniform(size=approximation_rank)
    eigenvalues /= norm(eigenvalues, ord=1)
    # Creating the set of orthonormal eigenvectors
    eigenvectors: Matrix = random_generator.uniform(size=(signal_length, approximation_rank))
    if data_type in [np.complex128, np.complex64, np.complex]:
        eigenvectors = eigenvectors.astype(data_type)
        eigenvectors += 1j * random_generator.uniform(size=(signal_length, approximation_rank))
    eigenvectors = qr(eigenvectors)[0]
    covariance: Matrix = (eigenvectors * eigenvalues).dot(np.conj(eigenvectors.T))
    return covariance, eigenvectors, eigenvalues


def generate_observations(eigenvectors: Matrix, eigenvalues: Vector, approximation_rank: int, observation_num: int,
                          data_type, random_generator) -> Matrix:

    # Sampling the signal from this covariance matrix.
    standard_deviations: Matrix = np.tile(np.sqrt(eigenvalues).reshape((-1, 1)), (1, observation_num))
    observations: Matrix = random_generator.normal(scale=standard_deviations, size=(approximation_rank, observation_num))
    if data_type in [np.complex128, np.complex64, np.complex]:
        standard_deviations /= np.sqrt(2)
        observations = np.sqrt(0.5) * observations.astype(data_type)
        observations += 1j * random_generator.normal(scale=standard_deviations, size=observations.shape)
    observations = np.dot(eigenvectors, observations).T
    return observations


def generate_shifts_and_noise(observations: Matrix, shifts: List[int], noise_power: float,
                              data_type, random_generator) -> Matrix:
    for i, shift in enumerate(shifts):
        observations[i] = np.roll(observations[i], shift)
    if data_type in [np.complex128, np.complex64, np.complex]:
        half_power_std: Scalar = np.sqrt(noise_power / 2)
        observations += random_generator.normal(loc=0, scale=half_power_std, size=observations.shape)
        observations += 1j * random_generator.normal(loc=0, scale=half_power_std, size=observations.shape)
    else:
        observations += random_generator.normal(loc=0, scale=np.sqrt(noise_power), size=observations.shape)
    return observations


print("Loaded Successfully!")


Loaded Successfully!


In [26]:
#@title Power-Spectrum and Tri-Spectrum Estimation
%%cython -f
# cython: language_level=3, boundscheck=False, wraparound=False
# cython: initializedcheck=False, cdivision=True, nonecheck=False
# distutils: language = c++
"""
polyspectra_estimation.pyx - spectrum estimations algorithms module
===================================================================
This module contains methods for estimating a signal's power spectrum and tri-spectrum
from a given matrix of its observations.
"""
import numpy as np
cimport numpy as np


cdef extern from "<complex>" namespace "std" nogil:
    double complex conj(double complex z)


def estimate_power_spectrum(const complex[:, ::1] observations_fourier):
    """
    The function for estimating a signal's power-spectrum from its observations.

    Args:
        observations_fourier(Matrix): The fourier coefficients of all observations.

    Returns:
        A vector :math:`P_{y}` which estimates the power-spectrum of the original signal.
    """
    return np.mean(np.power(np.abs(observations_fourier), 2), axis=0)


def estimate_tri_spectrum_naive(const complex[:, ::1] observations_fourier):
    """
    The function for estimating a signal's tri-spectrum from its observations.

    Args:
        observations_fourier(Matrix): The fourier coefficients of all observations.

    Returns:
        A 3D array :math:`T_{y}` which estimates the tri-spectrum of the original signal.
    """
    cdef Py_ssize_t signal_length = observations_fourier.shape[1]
    cdef Py_ssize_t observations_num = observations_fourier.shape[0]
    cdef np.ndarray[np.complex128_t, ndim=3] tri_spectrum = np.empty(
        (signal_length, signal_length, signal_length), dtype=np.complex128, order='F')
    tri_spectrum_estimation_v1(observations_fourier, signal_length, observations_num, tri_spectrum)
    return tri_spectrum


cdef inline void tri_spectrum_estimation_v1(const complex[:, ::1] observations_fourier,
                                            const Py_ssize_t signal_length, const Py_ssize_t observations_num,
                                            complex[::1, :, :] tri_spectrum):
    """
    A naive calculation of the tri-spectrum as a c-level function.

    Args:
        observations_fourier(Matrix): The fourier coefficients of all observations.
        signal_length(const Py_ssize_t): The second dimension (columns) length of the coefficients matrix.
        observations_num(const Py_ssize_t): The first dimension (rows) length of the coefficients matrix.
        tri_spectrum(ThreeDMatrix): An empty 3D array  of size signal_length^3 for storing the calculation's result.
    """
    cdef const double complex[:] observation
    cdef double complex temp = 0
    cdef Py_ssize_t i, j, k, m, s

    for i in range(signal_length):
        for j in range(signal_length):
            s = <Py_ssize_t>((i - j) % signal_length)
            if j > i: # This line verifies the modulo operator behaves the same as modulo in Python
                s += signal_length
            for k in range(signal_length):
                for m in range(observations_num):
                    observation = observations_fourier[m]
                    temp += observation[i] * conj(observation[j]) * observation[k] * conj(observation[s])
                tri_spectrum[i, j, k] = temp / <double complex>observations_num
                s = (s + 1) % signal_length
                temp = 0


def estimate_tri_spectrum_v2(const complex[:, ::1] observations_fourier):
    """
    The function for estimating a signal's tri-spectrum from its observations.

    Args:
        observations_fourier(Matrix): The fourier coefficients of all observations.

    Returns:
        A 3D array :math:`T_{y}` which estimates the tri-spectrum of the original signal.
    """
    cdef Py_ssize_t signal_length = observations_fourier.shape[1]
    cdef Py_ssize_t observations_num = observations_fourier.shape[0]
    cdef np.ndarray[np.complex128_t, ndim=3] trispectrum = np.empty(
        (signal_length, signal_length, signal_length), dtype=np.complex128, order='F')
    tri_spectrum_estimation_v2(observations_fourier, signal_length, observations_num, trispectrum)
    return trispectrum


cdef inline void tri_spectrum_estimation_v2(const complex[:, ::1] observations_fourier,
                                            const Py_ssize_t signal_length, const Py_ssize_t observations_num,
                                            complex[::1, :, :] tri_spectrum):
    """
    An improved calculation of the tri-spectrum as a c-level function, which applies a type of symmetry.

    Args:
        observations_fourier(Matrix): The fourier coefficients of all observations.
        signal_length(const Py_ssize_t): The second dimension (columns) length of the coefficients matrix.
        observations_num(const Py_ssize_t): The first dimension (rows) length of the coefficients matrix.
        tri_spectrum(ThreeDMatrix): An empty 3D array  of size signal_length^3 for storing the calculation's result.
    """
    #TODO: Improve efficiency of this method by applying MORE symmetries
    cdef const double complex[:] observation
    cdef double complex temp = 0
    cdef Py_ssize_t i, j, k, m, s

    for i in range(signal_length):
        for j in range(signal_length):
            for k in range(i):  # Applying a symmetry of the tri-spectrum: T[x,y,z]=T[z,y,x]
                tri_spectrum[i, j, k] = tri_spectrum[k, j, i]

            s = <Py_ssize_t>(2 * i - j) % signal_length
            if j > 2 * i:  # This line verifies the modulo operator behaves the same as modulo in Python
                s += signal_length
            for k in range(i, signal_length):
                for m in range(observations_num):
                    observation = observations_fourier[m]
                    temp += observation[i] * conj(observation[j]) * observation[k] * conj(observation[s])
                tri_spectrum[i, j, k] = temp / <double complex>observations_num
                s = (s + 1) % signal_length
                temp = 0

print("Loaded Successfully!")


Loaded Successfully!


In [27]:
#@title Vectorized Actions
%%cython -f
# cython: language_level=3, boundscheck=False, wraparound=False
# cython: initializedcheck=False, cdivision=True, nonecheck=False
# distutils: language = c++
"""
vectorized_actions.pyx - vectorized algorithms module
======================================================
This module contains methods for are implemented in Cython for run-time improvements
"""
import numpy as np
cimport numpy as np
from numpy import kron
from scipy.linalg import eigh
from libc.math cimport sqrt

# A type which represents either a double-precision real number (64 bit - np.float64) or a double-precision
# complex number (128 bit - np.complex128).
ctypedef fused double_or_complex:
    complex
    double

def extract_diagonals(double_or_complex[::1, :, :] matrices_arr, const int signal_length):
    """
    The function calculates the eigenvector of each matrices_arr[i] which corresponds to the largest (in magnitude)
    eigenvalue. This eigenvector is scaled by the square-root of the matching eigenvalue and stored as the i-th row
    of the result matrix.

    Args:
        matrices_arr(ThreeDMatrix): The matrices list, where square matrices are stored along the first dimension of
            the array, i.e matrices_arr[i] is the i-th square matrix in the array.
        signal_length(const int): The length of each dimension of all square matrices.

    Returns:
        A square matrix such that its i-th row is the scaled leading eigenvector of matrices_arr[i].
    """
    cdef Py_ssize_t i, j
    cdef double[:] eigenvalue
    cdef double first_scaled_eigenvalue
    cdef double_or_complex[:, :] eigenvector
    cdef np.ndarray[double_or_complex, ndim=2] diagonals = np.empty((signal_length - 1, signal_length),
                                                                    dtype=matrices_arr.base.dtype, order="F")

    for i in range(signal_length - 1):
        eigenvalue, eigenvector = eigh(
            matrices_arr[i], overwrite_a = True, overwrite_b = True, check_finite = False,
            eigvals=(signal_length - 1, signal_length - 1))
        first_scaled_eigenvalue = sqrt(eigenvalue[0])
        for j in range(signal_length):
            diagonals[i, j] = first_scaled_eigenvalue * eigenvector[j, 0]

    return diagonals


def vectorized_kron(double_or_complex[::1, :, :] matrices_arr):
    """
    The function for performing Kronecker product between each two consecutive matrices in a large 3D matrix

    Args:
        matrices_arr(ThreeDMatrix): The matrices list, where matrices are stored along the first dimension of
            the array, i.e matrices_arr[i] is the i-th matrix in the array.

    Returns:
        A 3D array which stores all these Kronecker products along its first axis.
    """
    cdef Py_ssize_t rows = matrices_arr.shape[1]
    cdef Py_ssize_t cols = matrices_arr.shape[2]
    cdef Py_ssize_t rows_sqr = rows * rows
    cdef Py_ssize_t cols_sqr = cols * cols
    cdef np.ndarray[double_or_complex, ndim=3] results = np.empty((rows, rows_sqr, cols_sqr),
                                                                  dtype=matrices_arr.base.dtype, order="F")
    cdef Py_ssize_t i = 0

    if double_or_complex is complex:
        for i in range(rows - 1):
            results[i] = kron(np.conj(matrices_arr[i + 1]), matrices_arr[i])
        results[rows - 1] = kron(np.conj(matrices_arr[0]), matrices_arr[rows - 1])
    else:
        for i in range(rows - 1):
            results[i] = kron(matrices_arr[i + 1], matrices_arr[i])
        results[rows - 1] = kron(matrices_arr[0], matrices_arr[rows - 1])
    return results


def construct_estimator(double_or_complex[::1, :] diagonals, const double[::1] power_spectrum,
                        const int signal_length, const float noise_power):
    """
    The function creates a signal_length times signal_length matrix such that the i-th row of the diagonals matrix
    is the i-th diagonal of the matrix (with wrapping) for all 1<i<signal_length. The mai diagonal of this matrix
    is set to the power-spectrum estimation, minus the estimated noise power.

    Args:
        diagonals(Matrix): A 2D square matrix of size signal_length times signal_length.
        power_spectrum(const double[::1]): A vector of size signal_length which estimates of the signal's
            power-spectrum.
        signal_length(const int): The length of each dimension of all square matrices.
        noise_power(const float): An estimation for the signal's noise power :math:`\sigma^{2}`.

    Returns:
        A 3D array which stores all these Kronecker products along its first axis.
    """
    cdef np.ndarray[double_or_complex, ndim=2] estimator = np.empty((signal_length, signal_length),
                                                                    dtype=diagonals.base.dtype)
    cdef Py_ssize_t i, j, column_index

    # Constructing the main diagonal of the estimator.
    for i in range(signal_length):
        estimator[i, i] = <double_or_complex>(power_spectrum[i] - noise_power)

    # Constructing the off-diagonal terms.
    for i in range(1, signal_length):
        for j in range(signal_length):
            column_index = <Py_ssize_t>((i + j) % signal_length)
            estimator[j, column_index] = diagonals[i - 1, j]

    return estimator

print("Loaded Successfully!")


Loaded Successfully!


In [28]:
#@title Covariance Estimation
from numpy.fft import ifft, fft
from scipy.linalg import eigh, block_diag, circulant


def low_rank_multi_reference_factor_analysis(
        observations_fourier: Matrix, signal_length: int, approximation_rank: Union[int, None],
        noise_power: float, data_type, exact_covariance) -> Matrix:
    """
    The entire algorithm fot the covariance estimation. It consists of two stages, see Algorithm 1 and Algorithm 2 in
    the paper.

    Args:
        observations_fourier(Matrix): The fourier coefficients of all observations.
        signal_length(int): The length of the signal.
        approximation_rank(int): The rank of the approximated covariance matrix.
        noise_power(float): An estimator for the noise power in the sampled observations.
        data_type: Either np.float64 for real-data or np.complex128 for complex data.
        exact_covariance(Matrix): The covariance which this algorithm estimated. It is used ONLY for
            testing, debugging and verifying the technical conditions which are required for this algorithm.

    Returns:
        The estimated covariance matrix.

    """
    # TODO: Remove the exact_covariance argument for real experiments.
    estimator: Matrix = estimate_covariance_up_to_phases(observations_fourier, signal_length, noise_power, data_type,
                                                         exact_covariance)
    estimator = phase_retrieval(estimator, signal_length, approximation_rank)
    return estimator


def estimate_covariance_up_to_phases(observations_fourier: Matrix, signal_length: int, noise_power: float,
                                     data_type, exact_covariance) -> Matrix:
    """
    The first stage algorithm fot the covariance estimation, see Algorithm 1 in the paper.

    Args:
        observations_fourier(Matrix): The fourier coefficients of all observations.
        signal_length(int): The length of the signal.
        noise_power(float): An estimator for the noise power in the sampled observations.
        data_type: Either np.float64 for real-data or np.complex128 for complex data.
        exact_covariance(Matrix): The covariance which this algorithm estimated. It is used ONLY for
            testing, debugging and verifying the technical conditions which are required for this algorithm.

    Returns:
        The estimated covariance matrix, up to multiplication by a circulant matrix of complex phases.

    """
    power_spectrum: Vector = estimate_power_spectrum(observations_fourier)
    tri_spectrum: ThreeDMatrix = estimate_tri_spectrum_v2(observations_fourier)

    # TODO: Remove the exact_covariance argument
    exact_cov_fourier_basis: Matrix = np.conj(fft(exact_covariance, axis=0, norm="ortho").T)
    exact_cov_fourier_basis: Matrix = np.conj(fft(exact_cov_fourier_basis, axis=0, norm="ortho").T)
    if np.any(np.abs(exact_covariance)) < 1e-15:
        warnings.warn("The covariance matrix in Fourier basis has some 0 entries, consistency is NOT guaranteed!",
                      Warning)
    print(f'Covariance Fourier Diagonal: {np.real(np.diag(exact_cov_fourier_basis))}')
    print(f'Power spectrum: {power_spectrum}')
    # exact_tri_spectrum: ThreeDMatrix = calc_exact_tri_spectrum(exact_cov_fourier_basis, data_type)
    # print(f'Max tri-spectrum estimation error: {np.max(np.abs(exact_tri_spectrum - tri_spectrum))}')

    # Optimization step
    g = perform_optimization(tri_spectrum, power_spectrum, signal_length, data_type)
    diagonals: Matrix = extract_diagonals(g, signal_length)
    estimated_covariance: Matrix = construct_estimator(diagonals, power_spectrum, signal_length, noise_power)
    return estimated_covariance


def create_optimization_objective(tri_spectrum, power_spectrum, data_type) -> Callable:
    g_zero = np.outer(power_spectrum, power_spectrum)
    signal_length = len(power_spectrum)

    def objective(matrices_list):
        fit_score = 0.0
        for k1 in range(signal_length):
            for k2 in range(signal_length):
                other_index = (k2 - k1) % signal_length
                for m in range(signal_length):
                    k1_plus_m = (k1 + m) % signal_length
                    current_term = tri_spectrum[k1, k1_plus_m, (k2 + m) % signal_length]
                    if other_index == 0:
                        current_term -= g_zero[k1, k1_plus_m]
                    else:
                        current_term -= matrices_list[other_index - 1][k1, k1_plus_m]

                    if m == 0:
                        current_term -= g_zero[k1, k2]
                    else:
                        current_term -= matrices_list[m - 1][k1, k2]

                    if data_type in [np.float32, np.float64]:
                        next_index: int = (k1 + k2 + m) % signal_length
                        if next_index == 0:
                            current_term -= g_zero[- k2 % signal_length, (-k2 - m) % signal_length]
                        else:
                            current_term -= matrices_list[next_index - 1][
                                - k2 % signal_length, (-k2 - m) % signal_length]

                    fit_score += cp.abs(current_term) ** 2
        return fit_score
    return objective


def perform_optimization(tri_spectrum: ThreeDMatrix, power_spectrum: Matrix, signal_length: int,
                         data_type) -> ThreeDMatrix:
    optimization_objective: Callable = create_optimization_objective(tri_spectrum, power_spectrum, data_type)
    symbolic_matrices = [cp.Variable((signal_length, signal_length), PSD=True, name=f'G{i + 1}')
                         for i in range(signal_length - 1)]
    problem = cp.Problem(cp.Minimize(optimization_objective(symbolic_matrices)), [])

    min_fit_score = problem.solve(solver=cp.SCS)
    optimization_result = [mat.value for mat in symbolic_matrices]

    print(f'Min score: {min_fit_score}')
    print(f'Are matrices None? {[(mat is None) for mat in optimization_result]}')
    print(f'Are matrices Hermitian? {[np.allclose(np.conj(mat), mat) for mat in optimization_result]}')
    print(f'Are matrices PSD? {[np.all(np.linalg.eigvalsh(mat) >= 0) for mat in optimization_result]}')

    return np.array(optimization_result, order='F')


def phase_retrieval(covariance_estimator: Matrix, signal_length: int, approximation_rank: Union[int, None]) -> Matrix:
    """
    The first stage algorithm fot the covariance estimation, see Algorithm 2 in the paper. This stage solves the
    phase ambiguity (up to the inherent ambiguity of the problem).

    Args:
        covariance_estimator(Matrix): The estimator which is given as the output of the first stage of the algorithm
            (Algorithm 1 in the paper).
        signal_length(int): The length of the signal.
        approximation_rank(int): The rank of the approximated covariance matrix.

    Returns:
        The estimated covariance matrix, up to the inherent ambiguity of the problem

    """

    # Building the coefficients matrix A.
    matrix_a: Matrix = coefficient_matrix_construction(covariance_estimator, signal_length, approximation_rank)
    # Finding the singular vector which corresponds to the smallest singular-value of A.
    b = np.dot(np.conj(matrix_a).T, matrix_a)
    val, v = eigh(b, overwrite_a=False, overwrite_b=False, check_finite=False, eigvals=(0, 2))
    if abs(val[1]) < 1e-15:
        warnings.warn("The dimension of the null-space of A is larger the one, so this estimator " +
                      "is NOT guaranteed to succeed!", Warning)

    # Finding the matching angles
    angles = estimate_angles(v, signal_length)
    phases: Vector = np.exp(-1j * angles)
    phases = np.insert(phases, 0, 1)

    # Multiplying by the negative phases to find the Fourier-basis covariance
    covariance_estimator = np.multiply(covariance_estimator, circulant(phases))

    # Converting the estimator back to the standard basis
    covariance_estimator = np.conj(ifft(covariance_estimator, axis=0, norm="ortho").T)
    covariance_estimator = np.conj(ifft(covariance_estimator, axis=0, norm="ortho").T)
    return covariance_estimator


def coefficient_matrix_construction(covariance_estimator: Matrix, signal_length: int,
                                    approximation_rank: Union[int, None]) -> Matrix:
    """
    This function constructs a linear equations system, for solving the phase ambiguity
    (up to the inherent ambiguity of the problem).

    Args:
        covariance_estimator(Matrix): The estimator which is given as the output of the first stage of the algorithm
            (Algorithm 1 in the paper).
        signal_length(int): The length of the signal.
        approximation_rank(int): The rank of the approximated covariance matrix.

    Returns:
        A signal_length ** 3 X (1 + approximation_rank ** 4) matrix (if approximation_rank < sqrt(signal_length))

    """
    rank_sqr: int = approximation_rank ** 2 if approximation_rank is not None else signal_length - 1
    least_eigenvalue_index: int = max(signal_length - rank_sqr, 0)
    last_index: int = signal_length ** 2
    transition_index: int = last_index - signal_length

    hi_i: Matrix = np.power(np.abs(covariance_estimator), 2)
    rotated_mat: Matrix = covariance_estimator
    all_eigenvectors: Matrix = np.empty((signal_length, signal_length, signal_length - least_eigenvalue_index),
                                        order="F", dtype=covariance_estimator.dtype)
    all_m_mats: Matrix = np.zeros((signal_length ** 3, signal_length), dtype=covariance_estimator.dtype)
    sampled_indices: Matrix = np.empty((signal_length, signal_length), dtype=int)
    sampled_indices[0] = np.arange(0, last_index, signal_length + 1)
    for m in range(1, signal_length):
        sampled_hi_indices = np.arange(m, transition_index, signal_length + 1)
        sampled_indices[m] = np.hstack((sampled_hi_indices, np.arange(transition_index,
                                                                      last_index, signal_length + 1)))
        transition_index -= signal_length

    for i in range(signal_length):
        all_eigenvectors[i] = eigh(hi_i, overwrite_b=False, check_finite=False,
                                   eigvals=(least_eigenvalue_index, signal_length - 1))[1]
        rotated_mat = np.roll(rotated_mat, shift=1, axis=1)
        hi_i = np.multiply(covariance_estimator, np.conj(rotated_mat))
        hi_i_plus_one_flat = hi_i.flatten('F')

        for m in range(signal_length):
            all_m_mats[sampled_indices[m] + i * last_index, m] = hi_i_plus_one_flat[sampled_indices[m]]

        rotated_mat = np.roll(rotated_mat, shift=1, axis=0)
        hi_i = np.multiply(covariance_estimator, np.conj(rotated_mat))

    matrix_a = np.hstack((all_m_mats, -block_diag(*vectorized_kron(all_eigenvectors))))
    return matrix_a


def estimate_angles(eigenvector: Vector, signal_length: int) -> Vector:
    """
    This function constructs a linear equations system, for solving the phase ambiguity
    (up to the inherent ambiguity of the problem).

    Args:
        eigenvector(Vector): The eigenvector of the equations matrix which corresponds to the singular value of 0.
        signal_length(int): The length of the signal.

    Returns:
        A vector of estimated phases.

    """
    v = eigenvector[:, 0][:signal_length]
    arguments_vector = np.angle(v)
    angles = np.cumsum(arguments_vector[1:])
    angles = -angles + (angles[signal_length - 2] + arguments_vector[0]) / signal_length * \
        np.arange(1, signal_length, 1)
    return angles


def calc_exact_tri_spectrum(exact_covariance: Matrix) -> ThreeDMatrix:
    signal_length: int = exact_covariance.shape[0]
    tri_spectrum = np.empty((signal_length, signal_length, signal_length), dtype=exact_covariance.dtype)

    from itertools import product

    for i, j, k in product(range(signal_length), range(signal_length), range(signal_length)):
        fourth_index = (k - j + i ) % signal_length
        tri_spectrum[i, j, k] = exact_covariance[i, j] * np.conj(exact_covariance[fourth_index, k])
        tri_spectrum[i, j, k] += exact_covariance[i, fourth_index] * np.conj(exact_covariance[j, k])

    return tri_spectrum

print("Loaded Successfully!")


Loaded Successfully!


In [35]:
#@title Main
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
main.py - The main module of the project
========================================

This module contains the config for the experiment in the "config" function.
Running this module invokes the :func:`main` function, which then performs the experiment and saves its results
to the configured results folder. Example for running an experiment: ``python main.py``

"""
import numpy as np
from numpy import roll
from numpy.linalg import norm
from numpy.random import Generator, PCG64
from numpy.fft import fft
from itertools import product


def calc_estimation_error(exact_covariance, estimated_covariance):
    covariance_norm: Scalar = norm(exact_covariance, ord='fro') ** 2
    rotated_cov: Matrix = exact_covariance
    error: Scalar = norm(estimated_covariance - rotated_cov, ord='fro') ** 2

    for _ in range(exact_covariance.shape[0]):
        rotated_cov = roll(rotated_cov, shift=[1, 1], axis=[0, 1])
        shifted_error = norm(estimated_covariance - rotated_cov, ord='fro') ** 2
        if shifted_error < error:
            error = shifted_error

    return error / covariance_norm


@ex.config
def config():
    """ Config section

    This function contains all possible configuration for all experiments. Full details on each configuration values
    can be found in :mod:`enums.py`.
    """

    data_type = np.complex128
    signal_lengths: [int] = [10]
    observations_numbers: List[int] = [1000000]
    approximation_ranks: List[Union[int, None]] = [2]
    noise_powers: List[float] = [0.0]
    trials_num: int = 1
    first_seed: int = 200
    shifts_distribution_type = DistributionType.Uniform
    distribution_params: Dict = {
        DistributionParams.DeltaLocations: [1]
    }
    experiment_name: str = "Testing Code Infrastructure"
    results_path: str = r'Results/'


@ex.main
def main(signal_lengths: List[int], observations_numbers: List[int], approximation_ranks: List[int],
         noise_powers: List[float], shifts_distribution_type: str, trials_num: int, data_type, results_path: str,
         experiment_name: str, first_seed: int, distribution_params: Dict) -> None:
    """ The main function of this project

    This functions performs the desired experiment according to the given configuration.
    The function runs the random_svd and random_id for every combination of data_size, approximation rank and increment
    given in the config and saves all the results to a csv file in the results folder (given in the configuration).
    """
    results_log = DataLog(LogFields)  # Initializing an empty results log.

    for signal_length, noise_power, approximation_rank, observations_num in product(
            signal_lengths, noise_powers, approximation_ranks, observations_numbers):
        if approximation_rank >= np.sqrt(signal_length):
            warnings.warn(f'Approximation rank {approximation_rank} is at least the square-root of the ' +
                          f'signal length {signal_length}, consistency is NOT guaranteed!', Warning)
        shifts_distribution: Vector = create_discrete_distribution(shifts_distribution_type, signal_length,
                                                                   distribution_params)

        mean_error: float = 0
        trials_seeds: Vector = np.arange(first_seed, first_seed + trials_num).tolist()
        
        for trial_seed in trials_seeds:
            rng = Generator(PCG64(trial_seed))  # Set trial's random generator.
            exact_covariance, eigenvectors, eigenvalues = generate_covariance(
                signal_length, approximation_rank, data_type, rng)
            observations = generate_observations(eigenvectors, eigenvalues, 
                                                 approximation_rank, observations_num,
                                                 data_type, rng)
            observations_shifts: List[int] = rng.choice(signal_length, size=observations_num, p=shifts_distribution)
            observations = generate_shifts_and_noise(observations, observations_shifts, noise_power, data_type, rng)
            observations_fourier: Matrix = fft(observations, norm="ortho", axis=1)
            # TODO: Remove the exact_covariance argument for real experiments.
            estimated_covariance: Matrix = low_rank_multi_reference_factor_analysis(
                observations_fourier, signal_length, approximation_rank, noise_power, data_type, exact_covariance)
            mean_error += calc_estimation_error(exact_covariance, estimated_covariance)

        mean_error /= trials_num
        print(f'Finished experiment of signal length L={signal_length}, n={observations_num}, '
              f'r={approximation_rank}, noise={noise_power} with error {mean_error}')

        # Appending all the experiment results to the log.
        results_log.append(LogFields.DataSize, signal_length)
        results_log.append(LogFields.DataType, data_type.__name__)
        results_log.append(LogFields.ApproximationRank, approximation_rank)
        results_log.append(LogFields.NoisePower, noise_power)
        results_log.append(LogFields.ObservationsNumber, observations_num)
        results_log.append(LogFields.TrialsNum, trials_num)
        results_log.append(LogFields.ShiftsDistribution, shifts_distribution_type)
        results_log.append(LogFields.MeanError, mean_error)

    results_log.save_log(f'{experiment_name} results', results_folder_path=results_path)
    return results_log

pd.DataFrame(ex.run().result._data)


INFO - Initializing project - Running command 'main'
INFO - Initializing project - Started


Covariance Fourier Diagonal: [0.44254662 0.02560909 0.09368245 0.01462619 0.06994176 0.16883959
 0.0441985  0.09270842 0.01692912 0.03091827]
Power spectrum: [0.44281827 0.02559436 0.093596   0.01463897 0.06988274 0.16880402
 0.04415838 0.09264484 0.01692436 0.03089064]


INFO - Initializing project - Result: <__main__.DataLog object at 0x7f24bb40a7f0>
INFO - Initializing project - Completed after 0:00:51


Min score: 0.01011640166361909
Are matrices None? [False, False, False, False, False, False, False, False, False]
Are matrices Hermitian? [True, True, True, True, True, True, True, True, True]
Are matrices PSD? [False, False, False, False, False, False, False, False, False]
Finished experiment of signal length L=10, n=1000000, r=2, noise=0.0 with error 0.4439374177848892


Unnamed: 0,r,Data size,Data type (complex/real),Mean Error,Noise power,Observations Number,Shifts Distribution,Number of Trials
0,2,10,complex128,0.443937,0.0,1000000,Uniform Distribution,1


#Unit-Tests

In [30]:
#@title Test Diagonal Extraction and Construction
# -*- coding: utf-8 -*-
"""
test_diagonal_extraction_and_construction.py - tests for data creation methods
===============================================================================

This module contains the tests for the diagonals' extraction method and the estimator construction method
for the first part of the algorithm.

"""
import unittest
import numpy as np


class TestDiagonalExtractionAndConstruction(unittest.TestCase):
    """
    A class which contains tests for the diagonals extraction method, and the method for constructing
    a matrix, given these diagonals
    """
    def test_diagonals_extraction(self):
        """
        Test diagonals extraction

        This test validates the extract_diagonals take the maximal eigenvector from each matrix in its first dimension
        and scales it w.r.t square-root of the maximal eigenvalue.

        """
        mat_num: int = 4
        max_eigenvalue: float = 9.0
        sqrt_max_eigenvalue: float = np.sqrt(max_eigenvalue)
        mat: Matrix = np.asfortranarray(np.tile(np.diag([3.0, max_eigenvalue, -4.0, -2.0]), (mat_num - 1, 1, 1)))
        diagonals: Matrix = extract_diagonals(mat, mat_num)
        expected_output: Matrix = np.tile(np.array([0, sqrt_max_eigenvalue, 0, 0]), (3, 1))
        self.assertTrue(np.allclose(diagonals, expected_output))

    def test_estimator_construction(self):
        """
        Test methods equivalence.

        This test validates that both the naive method and the improved method return identical
        output (up to numerical inaccuracies) for some random signal.

        """
        mat_num: int = 4
        max_eigenvalue: float = 9.0
        sqrt_max_eigenvalue: float = np.sqrt(max_eigenvalue)
        mat: Matrix = np.asfortranarray(np.tile(np.diag([3.0, max_eigenvalue, -4.0, -2.0]), (mat_num - 1, 1, 1)))
        diagonals: Matrix = extract_diagonals(mat, mat_num)
        power_spectrum: Vector = np.array(4 * [1.0])
        estimator: Matrix = construct_estimator(diagonals, power_spectrum, 4, 0)
        expected_output: Matrix = np.eye(4)
        expected_output[1] = np.array([sqrt_max_eigenvalue, 1, sqrt_max_eigenvalue, sqrt_max_eigenvalue])
        self.assertTrue(np.allclose(estimator, expected_output))

print("Loaded Unit-Tests for diagonal extraction and construction successfully!")


Loaded Unit-Tests for diagonal extraction and construction successfully!


In [31]:
#@title Test Phase-Retrieval Actions
# -*- coding: utf-8 -*-
"""
test_phase_retrieval_actions.py - tests for the stages in the phase retrieval algorithm
=======================================================================================

This module contains the tests for the different stages of the phase-retrieval algorithm:
constructing the coefficients matrix and the Fourier basis transition of the fixed covariance estimator.

"""
import unittest
import numpy as np
from numpy.random import Generator, PCG64
from numpy.fft import ifft, fft
from scipy.linalg import dft, eigh, block_diag, circulant


class TestPhaseRetrievalActions(unittest.TestCase):
    """
    A class which tests components of the phase-retrieval algorithm
    """
    def test_fourier_matrices_product(self):
        """
        Test Fourier basis transition

        This test validates that the transition of a matrix from Fourier basis to the standard basis is equivalent to
        multiplying my the DFT matrix from right and its inverse from the left.

        """
        rng = Generator(PCG64(1995))
        n: int = rng.integers(low=2, high=100)
        mat: Matrix = rng.standard_normal((n, n))
        tested_output = np.conj(ifft(mat, axis=0, norm="ortho").T)
        tested_output = np.conj(ifft(tested_output, axis=0, norm="ortho").T)
        dft_mat: Matrix = dft(n, scale="sqrtn")
        expected_output: Matrix = np.conj(dft_mat.T).dot(mat).dot(dft_mat)
        self.assertTrue(np.allclose(tested_output, expected_output))

        mat: Matrix = rng.standard_normal((n, n))
        mat_fourier: Matrix = np.conj(fft(mat, axis=0, norm="ortho").T)
        mat_fourier: Matrix = np.conj(fft(mat_fourier, axis=0, norm="ortho").T)
        expected_output: Matrix = dft_mat.dot(mat).dot(np.conj(dft_mat.T))
        self.assertTrue(np.allclose(mat_fourier, expected_output))

    def test_coefficient_matrix_construction(self):
        """
        Test coefficient matrix properties

        This test validates that the coefficients matrix in the phase retrieval algorithm follows the
        theoretical properties.

        """
        rng = Generator(PCG64(1995))
        n: int = rng.integers(low=9, high=20)
        r: int = rng.integers(low=2, high=np.floor(np.sqrt(n)))
        mat: Matrix = rng.standard_normal((n, n))
        mat = np.dot(mat, mat.T)
        coeff_mat: Matrix = coefficient_matrix_construction(mat, signal_length=n, approximation_rank=r)
        self.assertEqual(coeff_mat.shape[0], n ** 3)
        self.assertEqual(coeff_mat.shape[1], (1 + r ** 4) * n)

        other_mat: Matrix = matrix_construction_alternative(mat, signal_length=n, approximation_rank=r)
        self.assertTrue(np.allclose(coeff_mat, other_mat), msg=f'{np.max(np.abs(other_mat - coeff_mat))}')


def matrix_construction_alternative(covariance_estimator: Matrix, signal_length: int,
                                    approximation_rank: Union[int, None]) -> Matrix:
    hi_i: Matrix = np.power(np.abs(covariance_estimator), 2)
    rotated_mat: Matrix = covariance_estimator.copy()
    rank_sqr: int = approximation_rank ** 2 if approximation_rank is not None else signal_length - 1
    least_eigenvalue_index: int = signal_length - rank_sqr
    last_index: int = signal_length ** 2
    all_eigenvectors: Matrix = np.empty((signal_length, signal_length, rank_sqr), order="F",
                                        dtype=covariance_estimator.dtype)
    all_m_mats: Matrix = np.zeros((signal_length ** 3, signal_length), dtype=covariance_estimator.dtype)

    for i in range(signal_length):
        all_eigenvectors[i] = eigh(hi_i, overwrite_b=False, check_finite=False,
                                   eigvals=(least_eigenvalue_index, signal_length - 1))[1]
        rotated_mat = np.roll(rotated_mat, shift=1, axis=1)
        hi_i = np.multiply(covariance_estimator, np.conj(rotated_mat))
        hi_i_plus_one_flat = hi_i.flatten('F')

        for m in range(signal_length):
            em = np.zeros(signal_length)
            em[m] = 1
            circ = circulant(em).flatten('F')
            all_m_mats[i * last_index : (i + 1) * last_index, m] = np.multiply(hi_i_plus_one_flat, circ)
        rotated_mat = np.roll(rotated_mat, shift=1, axis=0)
        hi_i = np.multiply(covariance_estimator, np.conj(rotated_mat))

    matrix_a = np.hstack((all_m_mats, -block_diag(*vectorized_kron(all_eigenvectors))))
    return matrix_a


print("Loaded Unit-Tests for Phase-Retrieval actions successfully!")


Loaded Unit-Tests for Phase-Retrieval actions successfully!


In [32]:
#@title Test Power-Spectrum and Tri-Spectrum Estimation Methods 
# -*- coding: utf-8 -*-
"""
test_tri_spectrum_estimation.py - tests for spectra estimation methods
======================================================================

This module contains the tests for the power-spectrum and tri-spectrum estimation methods.
"""
import unittest
import numpy as np
from numpy.random import Generator, PCG64

def calc_exact_tri_spectrum(exact_covariance: Matrix, data_type) -> ThreeDMatrix:
    signal_length: int = exact_covariance.shape[0]
    tri_spectrum = np.empty((signal_length, signal_length, signal_length), dtype=exact_covariance.dtype)

    for i, j, k in product(range(signal_length), range(signal_length), range(signal_length)):
        fourth_index = (k - j + i) % signal_length
        tri_spectrum[i, j, k] = exact_covariance[i, j] * np.conj(exact_covariance[fourth_index, k])
        tri_spectrum[i, j, k] += exact_covariance[i, fourth_index] * np.conj(exact_covariance[j, k])

        if data_type in [np.float32, np.float64]:
            tri_spectrum[i, j, k] += exact_covariance[i, (-k % signal_length)] * np.conj(
                exact_covariance[j, (-fourth_index % signal_length)])

    return tri_spectrum


class TestTriSpectrumAlgorithms(unittest.TestCase):
    """
    A class which contains tests for the tri-spectrum estimation algorithms.
    """
    def test_equivalence_to_naive_method(self):
        """
        Test methods equivalence.

        This test validates that both the naive method and the improved method return identical
        output (up to numerical inaccuracies) for some random signal.

        """
        rng = Generator(PCG64(596))
        signal_length: int = rng.integers(low=10, high=40)
        observations_num: int = rng.integers(low=5, high=20)
        approximation_rank: int = rng.integers(low=5, high=20)
        data_type = np.complex128
        covariance, eigenvectors, eigenvalues = generate_covariance(signal_length, approximation_rank, data_type, rng)
        observations = generate_observations(eigenvectors, eigenvalues, approximation_rank, observations_num,
                                             data_type, rng)
        observations_fourier: Matrix = np.fft.fft(observations, axis=1, norm="ortho")
        tri_spectrum_naive: ThreeDMatrix = estimate_tri_spectrum_naive(observations_fourier)
        tri_spectrum_improved: ThreeDMatrix = estimate_tri_spectrum_v2(observations_fourier)
        # Validate both tri-spectra are equal
        self.assertTrue(np.allclose(np.abs(tri_spectrum_naive), np.abs(tri_spectrum_improved)),
                        msg=f'{np.max(np.abs(tri_spectrum_improved - tri_spectrum_naive))}')
        self.assertTrue(np.allclose(np.angle(tri_spectrum_naive), np.angle(tri_spectrum_improved)),
                        msg=f'{np.max(np.angle(tri_spectrum_improved) - np.angle(tri_spectrum_naive))}')

    def test_power_spectrum_and_tri_spectrum_consistency(self):
        """
        Test the consistency of the power-spectrum estimation.

        This test validates that the estimation over a large number of observations (without noise)
        is "very close" to the exact power-spectrum

        """
        rng = Generator(PCG64(596))
        signal_length: int = rng.integers(low=10, high=40)
        observations_num: int = rng.integers(low=10000, high=50000)
        approximation_rank: int = rng.integers(low=2, high=signal_length)
        data_type = np.complex128
        tol = 1e-3
        exact_covariance, eigenvectors, eigenvalues = generate_covariance(signal_length, approximation_rank, data_type,
                                                                          rng)
        observations = generate_observations(eigenvectors, eigenvalues, approximation_rank, observations_num,
                                             data_type, rng)
        observations_fourier: Matrix = np.fft.fft(observations, norm="ortho")
        exact_cov_fourier_basis: Matrix = np.conj(np.fft.fft(exact_covariance, axis=0, norm="ortho").T)
        exact_cov_fourier_basis: Matrix = np.conj(np.fft.fft(exact_cov_fourier_basis, axis=0, norm="ortho").T)
        exact_diagonal: Vector = np.real(np.diag(exact_cov_fourier_basis))
        power_spectrum_estimation: Vector = estimate_power_spectrum(observations_fourier)
        exact_tri_spectrum: ThreeDMatrix = calc_exact_tri_spectrum(exact_cov_fourier_basis, data_type)
        estimate_tri_spectrum: ThreeDMatrix = estimate_tri_spectrum_v2(observations_fourier)

        # Validate both tri-spectrum and power-spectrum estimations are consistent.
        self.assertTrue(np.allclose(exact_diagonal, power_spectrum_estimation, atol=tol, rtol=0),
                        msg=f'Power-spectrum estimation is inconsistent!')
        self.assertTrue(np.allclose(estimate_tri_spectrum, exact_tri_spectrum, atol=tol, rtol=0),
                        msg=f'Tri-spectrum estimation is inconsistent!, error=' +
                            f'{np.max(np.abs(estimate_tri_spectrum - exact_tri_spectrum))}')

    def test_real_data_tri_spectrum_consistency(self):
        """
        Test the consistency of the tri-spectrum estimation in the case of REAL data (since the exact
        tri-spectrum becomes more complicated in this case).

        This test validates that the estimation over a large number of observations (without noise)
        is "very close" to the exact tri-spectrum

        """
        rng = Generator(PCG64(596))
        signal_length: int = rng.integers(low=10, high=40)
        observations_num: int = rng.integers(low=10000, high=50000)
        approximation_rank: int = rng.integers(low=2, high=signal_length)
        data_type = np.float64
        tol = 1e-3
        exact_covariance, eigenvectors, eigenvalues = generate_covariance(signal_length, approximation_rank, data_type,
                                                                          rng)
        observations = generate_observations(eigenvectors, eigenvalues, approximation_rank, observations_num,
                                             data_type, rng)
        observations_fourier: Matrix = np.fft.fft(observations, norm="ortho")
        exact_cov_fourier_basis: Matrix = np.conj(np.fft.fft(exact_covariance, axis=0, norm="ortho").T)
        exact_cov_fourier_basis: Matrix = np.conj(np.fft.fft(exact_cov_fourier_basis, axis=0, norm="ortho").T)
        exact_tri_spectrum: ThreeDMatrix = calc_exact_tri_spectrum(exact_cov_fourier_basis, data_type)
        estimate_tri_spectrum: ThreeDMatrix = estimate_tri_spectrum_v2(observations_fourier)

        # Validate the tri-spectrum estimation in the real case is consistent.
        self.assertTrue(np.allclose(estimate_tri_spectrum, exact_tri_spectrum, atol=tol, rtol=0),
                        msg=f'Tri-spectrum estimation is inconsistent!, error=' +
                            f'{np.max(np.abs(estimate_tri_spectrum - exact_tri_spectrum))}')

print("Loaded Unit-Tests for Power-Spectrum and Tri-Spectrum Estimation successfully!")


Loaded Unit-Tests for Power-Spectrum and Tri-Spectrum Estimation successfully!


In [33]:
#@title Test Optimization Step Components 
# -*- coding: utf-8 -*-
"""
test_optimization_step.py - tests for the optimization step components
======================================================================

This module contains the tests for the optimization step in Algorithm 1
in the paper.
"""
import unittest
import numpy as np
from numpy.random import Generator, PCG64


def _create_optimization_objective(tri_spectrum, power_spectrum, data_type) -> Callable:
    g_zero = np.outer(power_spectrum, power_spectrum)
    signal_length = len(power_spectrum)

    def objective(matrices_list):
        fit_score = 0.0
        current_matrices_list = np.concatenate(([g_zero], matrices_list), axis=0)
        for k1 in range(signal_length):
            for k2 in range(signal_length):
                other_index = (k2 - k1) % signal_length
                k1_plus_m = k1
                k2_plus_m = k2
                for m in range(signal_length):
                    current_term = tri_spectrum[k1, k1_plus_m, k2_plus_m]
                    current_term -= current_matrices_list[other_index][k1, k1_plus_m]
                    current_term -= current_matrices_list[m][k1, k2]

                    if data_type in [np.float32, np.float64]:
                        next_index: int = (k1 + k2 + m) % signal_length
                        current_term -= current_matrices_list[next_index][(-k2) % signal_length,
                                                                          (-k2 - m) % signal_length]

                    k1_plus_m = (k1_plus_m + 1) % signal_length
                    k2_plus_m = (k2_plus_m + 1) % signal_length
                    fit_score += abs(current_term) ** 2
        return fit_score
    return objective


def _find_diagonals(exact_covariance: Matrix) -> Matrix:
    """
    The function extracts the diagonals of the exact covariance matrix (in Fourier basis).
    Args:
        exact_covariance(Matrix): The exact covariance (square) matrix in Fourier basis.

    Returns:
        A square matrix such that its i-th row is the i-th diagonal of the input matrix.
    """
    signal_length: int = exact_covariance.shape[0]
    diags: Matrix = np.empty((signal_length - 1, signal_length), dtype=exact_covariance.dtype)

    for i in range(1, signal_length):
        diags[i - 1] = np.hstack((np.diagonal(exact_covariance, offset=i),
                                  np.diagonal(exact_covariance, offset=-signal_length + i)))
    return diags


def _test_optimization_template(data_type, signal_length, approximation_rank, seed):
    rng = Generator(PCG64(seed))
    exact_covariance, eigenvectors, eigenvalues = generate_covariance(signal_length, approximation_rank, data_type, rng)
    exact_cov_fourier_basis: Matrix = np.conj(np.fft.fft(exact_covariance, axis=0, norm="ortho").T)
    exact_cov_fourier_basis: Matrix = np.conj(np.fft.fft(exact_cov_fourier_basis, axis=0, norm="ortho").T)
    exact_power_spectrum: Vector = np.real(np.diag(exact_cov_fourier_basis))
    exact_tri_spectrum: ThreeDMatrix = calc_exact_tri_spectrum(exact_cov_fourier_basis, data_type)
    diagonals = _find_diagonals(exact_cov_fourier_basis)
    g_mats = np.array([diagonal.reshape(-1, 1).dot(np.conj(diagonal.reshape(-1, 1).T)) for diagonal in diagonals])
    optimization_object: Callable = _create_optimization_objective(exact_tri_spectrum, exact_power_spectrum, data_type)
    return optimization_object(g_mats), exact_tri_spectrum, exact_power_spectrum


class TestOptimization(unittest.TestCase):
    """
    A class which contains tests for the optimization step of Algorithm 1.
    """
    def test_optimization_complex_case(self):
        """
        Test the optimization objective function for the complex case.

        This test creates the optimal input matrices for the objective function
        and then verifies the value of the objective function, given these matrices as input,
        is very close to zero. In addition, this function verifies the theoretical equation between
        the main "diagonal" of the tri-spectrum and two times the square of the power-spectrum squared.
        """
        seed: int = 1995
        data_type = np.complex128
        signal_length: int = 3
        approximation_rank: int = signal_length
        min_value, exact_tri_spectrum, exact_power_spectrum = _test_optimization_template(
            data_type, signal_length, approximation_rank, seed)
        tri_diagonal: Vector = np.array([np.real(exact_tri_spectrum[i, i, i]) for i in range(signal_length)],
                                        dtype=np.float64)
        min_expected_value = np.abs(tri_diagonal - 2 * np.power(exact_power_spectrum, 2))
        min_expected_value = np.sum(min_expected_value)
        print(f'Minimal objective function value in the COMPLEX case: {min_value}')
        self.assertTrue(np.allclose(min_value, 0, atol=1e-20, rtol=0), msg=f'Minimal value is NOT optimal={min_value}')
        self.assertEqual(min_expected_value, 0,
                         msg=f'Tri-spectrum and power-spectrum estimation error={min_expected_value}')

    def test_optimization_real_case(self):
        """
        Test the optimization objective function for the real case.

        This test creates the optimal input matrices for the objective function
        and then verifies the value of the objective function, given these matrices as input,
        is very close to zero. In addition, this function verifies the theoretical equation between
        the first term of the tri-spectrum and three times the square of the first entry of the power-spectrum squared.
        """
        seed: int = 1995
        data_type = np.float64
        signal_length: int = 3
        approximation_rank: int = signal_length
        min_value, exact_tri_spectrum, exact_power_spectrum = _test_optimization_template(data_type, signal_length,
                                                                                          approximation_rank, seed)
        tri_diagonal: Vector = np.array([np.real(exact_tri_spectrum[i, i, i]) for i in range(signal_length)],
                                        dtype=np.float64)
        min_expected_value = np.abs(tri_diagonal[0] - 3 * np.power(exact_power_spectrum[0], 2))
        print(f'Minimal objective function value in the REAL case: {min_value}')
        self.assertTrue(np.allclose(min_value, 0, atol=1e-20, rtol=0), msg=f'Minimal value is NOT optimal={min_value}')
        self.assertEqual(min_expected_value, 0, msg=f'Tri-spectrum and power-spectrum estimation error={min_value}')

print("Loaded Unit-Tests for Optimization step successfully!")


Loaded Unit-Tests for Optimization step successfully!


In [34]:
#@title Running all these Unit-Tests
unittest.main(argv=[''], verbosity=2, exit=False)


test_diagonals_extraction (__main__.TestDiagonalExtractionAndConstruction) ... ok
test_estimator_construction (__main__.TestDiagonalExtractionAndConstruction) ... ok
test_optimization_complex_case (__main__.TestOptimization) ... ok
test_optimization_real_case (__main__.TestOptimization) ... ok
test_coefficient_matrix_construction (__main__.TestPhaseRetrievalActions) ... 

Minimal objective function value in the COMPLEX case: 3.746239375889483e-34
Minimal objective function value in the REAL case: 5.900805778551215e-33


ok
test_fourier_matrices_product (__main__.TestPhaseRetrievalActions) ... ok
test_equivalence_to_naive_method (__main__.TestTriSpectrumAlgorithms) ... ok
test_power_spectrum_and_tri_spectrum_consistency (__main__.TestTriSpectrumAlgorithms) ... ok
test_real_data_tri_spectrum_consistency (__main__.TestTriSpectrumAlgorithms) ... ok

----------------------------------------------------------------------
Ran 9 tests in 78.788s

OK


<unittest.main.TestProgram at 0x7f24bba0fdd8>