Generating realizations of a 2d Gaussian random field $\phi(x)$, with some simple analysis run on the position space results.  Inspired by the [Cosmic Microwave Background (CMB)](https://en.wikipedia.org/wiki/Cosmic_microwave_background); see [my website](https://garrettgoon.com/gaussian-fields/) for more background information.

TODO:

- Extend to higher dimensions
- Some systematic errors seem to still exist w/ theory doing increasingly poorly and over-predicting correlations as `power` increases towards 2.  Need to find/understand, test more

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import emcee
import scipy.stats as stats
import corner
import time

import scipy.stats as stats
from scipy.optimize import minimize
from scipy.special import gamma

Create a class for generating the power spectrum.  Spectra are characterized by an amplitude and power-law exponent: $\langle \phi(k)\phi(-k)\rangle'=\frac{\texttt{amplitude}}{k^{\texttt{power}}}\equiv P(k)$, corresponding in position space to $\langle \phi(x)\phi(0)\rangle\propto \frac{\texttt{amplitude}}{x^{d-{\texttt{power}}}}\equiv G(x)$ in $d$-dimensions. 

Important parameters:

- `amplitude`: amplitude of spectrum, defined above
- `power`: power-law exponent, defined above
- `dimensions`: number of spatial dimensions (can only use `dimensions`=2, currently)
- `size_exponent`: grid size is 2**`size_exponent`
- `max_len`: maximum separation of data points collected when forming binned correlation function
- `min_len`: minimum separation of data points collected when forming binned correlation function
    
Methods: 

- `generate`: generate realization of desired power spectrum, perform simple fit as sanity check
- `spectrum_plot`: visualization of generated spectra
- `hist`: histogram of generated values (should be normally distributed; mostly a sanity check)
- `bin_pair_data`: pair data separated by less than distance `max_len` and bin to find correlation function. Produce some summary plots, perform simple linear-regression, and compare fit and theory to results.
- `bayes`: simple bayesian/MCMC analysis on `amplitude` and `power` parameters using flat priors
    


In [2]:
class CmbPy():
    """
    A class for generating 2D power specta and some analysis of the results.
    """
    def __init__(self, amplitude=1, power=1.75, size_exponent=5, dimensions=2):
        self.amplitude = amplitude
        self.power = power
        self._size = int(2 ** size_exponent)
        self.dimensions = dimensions
        
        """
        Writing P(k)=amplitude/k^{power}, then corresponding position-space correlator is
        G(x)=_x_amplitude/x^{dims-power} with _x_amplitude as below.
        """
        self._x_amplitude_numerator = self.amplitude * gamma(1 - (self.power / 2))
        self._x_amplitude_denominator = (np.pi * (2 ** self.power)) * gamma(self.power / 2)
        self._x_amplitude = self._x_amplitude_numerator / self._x_amplitude_denominator
        

    def _theory_xspace(self, params, x):
        amplitude, power = params
        """
        The theoretical position-space correlator, given the model parameters. 
        Only valid for 2d right now. Fourier transform to position space is
        unambiguous for 2 > power > .5. Used for fitting. Might be missing some
        (2 * np.pi / self._size)  factors?

        """
        numerator = amplitude * gamma(1 - (power / 2))
        denominator = (np.pi * (2 ** power)) * gamma(power / 2)
        return numerator * (x ** (power - 2)) / denominator

    
    def generate(self):
        """
        Generating the spectrum.
        """
        # Creating a grid w/ each point pulled from std normal.
        gaussian_seed = np.random.normal(
            size=[self._size for _ in range(self.dimensions)])
        gaussian_seed_fourier = np.fft.fft2(gaussian_seed)
        # Numpy's fft algorithm automatically indexes with negative values on right half
        # positive on left half, as desired.

        # Generating the fft momenta indices and their norms.
        kvector = np.fft.fftfreq(self._size) * self._size
        kgrid = np.meshgrid(kvector, kvector)
        # include 2 pi / N factors
        knorms = np.sqrt(kgrid[0] ** 2 + kgrid[1] ** 2)

        # Create the desired power spectrum with the k=0 divergence regulated to zero.
        if self.power > 0:
            knorms[0][0] = np.inf
        # 2 pi / N factors included here
        power_spectrum = self.amplitude * ((knorms * (2 * np.pi / self._size)) ** (-1 * self.power))
        power_spectrum_sqrt = np.sqrt(power_spectrum)
        # Multiply by the transformed white noise to get the realization of the spectrum.
        fourier_realization = gaussian_seed_fourier * power_spectrum_sqrt

        # Create the power spectrum.
        # https://bertvandenbroucke.netlify.app/2019/05/24/computing-a-power-spectrum-in-python/ is useful resource for this
        # We need 1/N factors to match conventions for continuous case.
        fourier_amplitudes = (np.abs(fourier_realization) ** 2) / ((self._size) ** self.dimensions)
        
        # Flatten out and bin.
        fourier_amplitudes_flat = fourier_amplitudes.flatten()
        knorms_flat = knorms.flatten()
        k_bins = np.arange(.5, self._size // 2 + 1, 1) 
        k_vals = 0.5 * (k_bins[1:] + k_bins[:-1])
        binned_means, _, _ = stats.binned_statistic(knorms_flat,
                                                    fourier_amplitudes_flat,
                                                    statistic='mean',
                                                    bins=k_bins)
        
        
        """
        In order to perform a power-law linear regression fit on the binned values,
        we need to take a logarithm which requires all binned values to be positive,
        while some are negative at large k, presumably due to low statistics.
        As a bit of a fudge, we isolate the terms in binned_means which are positive.
        """
        
        binned_means_first_neg_index = 0
        for index, num in enumerate(binned_means):
            if num < 0:
                binned_means_first_neg_index = index
                break
        if binned_means_first_neg_index:
            binned_means_positive = binned_means[:binned_means_first_neg_index] 
            k_vals_positive = k_vals[:binned_means_first_neg_index]
        else:
            binned_means_positive = binned_means
            k_vals_positive = k_vals

        # Simple linear regression. Need 2 pi / N factors here.
        spectrum_lr = stats.linregress(np.log(1 / (k_vals_positive * (2 * np.pi / self._size))),
                                       np.log(np.abs(binned_means_positive)))
        
        def spectrum_lr_fit(k):
            return np.exp(spectrum_lr.intercept) / ((k * (2 * np.pi / self._size)) ** (spectrum_lr.slope))

        # Plotting.
        fig, axs = plt.subplots(2)
        x_fit = np.linspace(k_vals_positive[0], k_vals_positive[-1], num=100)
        y_fit = spectrum_lr_fit(x_fit)
        axs[1].loglog(x_fit,
                      y_fit,
                      linestyle='-',
                      color='r',
                      label='linear-regression')
        axs[0].plot(k_vals_positive, binned_means_positive)
        axs[0].set_xlabel('$k$')
        axs[0].set_ylabel('$P(k)$')
        axs[1].loglog(k_vals_positive, binned_means_positive)
        axs[1].set_xlabel('$k$')
        axs[1].set_ylabel('$P(k)$')
        axs[1].set_title(
            f'$P^{{fit}}(k)\\approx  {np.exp(spectrum_lr.intercept):.5f}\\cdot k^{{{-1*spectrum_lr.slope:.5f}}} \\quad P^{{theory}}(k)\\approx  {self.amplitude:.5f}\\cdot k^{{{-self.power:.5f}}} $'
        )
        fig.suptitle(
            f'Actual values: (amp,power)=({self.amplitude:.5f},{self.power:.5f})')
        plt.tight_layout()
        plt.legend()
        plt.show()

        # Transform back and take the real part to get the spectrum.
        self.realization = np.real(np.fft.ifft2(fourier_realization))

        # Imaginary parts are from numerical errors; they're very small.
        im_to_re_ratio = np.imag(np.fft.ifft2(fourier_realization)) / np.real(
            np.fft.ifft2(fourier_realization))

        print(
            'Sanity check: ratio of imaginary to real components in generated data:'
        )
        print(
            f'Average ratio: {np.mean(im_to_re_ratio)} Standard dev.: {np.std(im_to_re_ratio)}'
        )

    def spectrum_plot(self):
        """
        Plotting the spectrum.
        """
        if hasattr(self, 'spectrum'):
            fig, ax = plt.subplots()
            ax.imshow(self.realization,cmap='inferno')
            fig.suptitle('Realization')
            ax.set_xlabel('x')
            ax.set_ylabel('y')
            plt.show()
        else:
            print('Run generate to create spectrum first')

    def hist(self):
        """
        Histogram of generated values.
        """

        if not hasattr(self, 'spectrum'):
            return print('Run generate to create spectrum first')

        data = self.realization.flatten()
        std = np.std(data)
        self.std = std
        
        _, ax = plt.subplots()
        ax.hist(data, bins=100, density=True)
        ax.set_ylabel('counts')
        ax.set_xlabel('$\phi$')
        ax.set_title('Distribution of generated points')
        # plot fit
        x = np.linspace(-5 * std, 5 * std, 100)
        y = stats.norm.pdf(x, 0, std)
        ax.plot(x, y, label=f'Normal(0,{std**2:.5f})')
        plt.legend()
        plt.show()

    def bin_pair_data(self, min_len=1, max_len=None):
        """
        Collect all independent pairs of data points separated by less than max_len
        and bin them by distance, rounding to nearest integer. 
        Can also specify a minimum separation using min_len, taken as 1 by default
        
        This is a bottleneck, use progress counter, timer. 
        Don't think there's any native way to to this in numpy.
        The number of independent pairs for an N x N grid scales as O(N^4)
        which makes this take forever, hence the min_len and max_len params.        
        
        Plot various aspects of data
        """
        # Dynamically choosing max_len, if not specified.
        if max_len == None:
            max_len = int(self._size / 10)

        print('Collecting pair data...')

        # Put distance and field values in two separate arrays
        distances, fields = [], []
        
        start = time.time()
        prog = 20
        for row_1 in range(self._size):
            if (row_1 + 1) % int(self._size / 5) == 0:
                print(f'{prog}% complete')
                prog += 20
            for col_1 in range(self._size):
                for row_2 in range(row_1 + int(min_len / np.sqrt(2)), min(self._size, row_1 + max_len + 1)):
                    for col_2 in range(col_1 + int(min_len / np.sqrt(2)), min(self._size, col_1 + max_len + 1)):
                        dist = np.sqrt((row_1 - row_2) ** 2 + (col_1 - col_2) ** 2)
                        distances.append(dist)
                        fields.append(self.realization[row_1][col_1] * self.realization[row_2][col_2])
        fields = np.array(fields)
        distances = np.array(distances)
        print('Data collection time:', f'{time.time()-start:.5f}', 'seconds')
        print('Independent data pairs analyzed:', f'{len(distances):.3e}')
        

        x_bins = np.arange(min_len-.5, max_len+1, 1)
        x_vals = 0.5 * (x_bins[1:] + x_bins[:-1])
        binned_corr, _, _ = stats.binned_statistic(distances,
                                                   fields,
                                                   statistic='mean',
                                                   bins=x_bins)
        binned_corr_std, _, _ = stats.binned_statistic(distances,
                                                       fields,
                                                       statistic='std',
                                                       bins=x_bins)
        binned_corr_count, _, _ = stats.binned_statistic(distances,
                                                         fields,
                                                         statistic='count',
                                                         bins=x_bins)
        
        
        
        # Histograms of counts and stds
        fig, axs = plt.subplots(2)
        axs[0].bar(x_vals, binned_corr_count)
        axs[0].set_xlabel('x')
        axs[0].set_title('Points per bin')
        axs[1].scatter(x_vals, binned_corr_std)
        axs[1].set_xlabel('x')
        axs[1].set_ylabel('$\\sigma$')
        axs[1].set_title('Std. of $\\phi(x)\\phi(0)$ values per bin')
        plt.tight_layout()
        plt.show()
        
        """
        Simple linear regression.
        
        Some of the large x data can have negative correlations again,
        presumably due to low statistics. For the log fit, 
        we need to throw out this part of the data to avoid
        imaginary numbers, as before. Obviously a bit of a fudge.
        """
        
        binned_corr_first_neg_index = 0
        for index, num in enumerate(binned_corr):
            if num < 0:
                binned_corr_first_neg_index = index
                break
        if binned_corr_first_neg_index:
            binned_corr_positive = binned_corr[:binned_corr_first_neg_index] 
            binned_corr_std_positive = binned_corr_std[:binned_corr_first_neg_index] 
            binned_corr_count_positive = binned_corr_count[:binned_corr_first_neg_index] 
            x_vals_positive = x_vals[:binned_corr_first_neg_index] 
        else:
            binned_corr_positive = binned_corr
            binned_corr_std_positive = binned_corr_std
            binned_corr_count_positive = binned_corr_count
            x_vals_positive = x_vals
        print(binned_corr_positive)

        # Simple linear regression (treating all errors as equal).
        correlator_lr = stats.linregress(np.log(1 / x_vals_positive),
                                         np.log(binned_corr_positive))
        
        self.lr_fit = {
            '_x_amplitude': np.exp(correlator_lr.intercept),
            'power': self.dimensions - correlator_lr.slope
        }

    
        def correlator_lr_fit(x):
            return np.exp(correlator_lr.intercept) / (x**(correlator_lr.slope))

        
        fig, axs = plt.subplots(2)
        axs[0].errorbar(x_vals_positive,
                        binned_corr_positive,
                        yerr=binned_corr_std_positive / np.sqrt(binned_corr_count_positive),
                        marker='.',
                        color='k')
        axs[1].errorbar(x_vals_positive,
                        binned_corr_positive,
                        yerr=binned_corr_std_positive / np.sqrt(binned_corr_count_positive),
                        marker='.',
                        color='k')
        
        # Plotting the fit.
        x_fit = np.linspace(x_vals_positive[0], x_vals_positive[-1], num=100)
        y_fit = correlator_lr_fit(x_fit)
        axs[0].plot(x_fit,
                    y_fit,
                    linestyle='-',
                    color='r',
                    label='linear-regression')
        axs[1].loglog(x_fit,
                      y_fit,
                      linestyle='-',
                      color='r',
                      label='linear-regression')
        axs[0].plot(x_vals_positive, binned_corr_positive)
        axs[0].set_xlabel('$x$')
        axs[0].set_ylabel('$\\langle \\phi(x)\\phi(0)\\rangle\equiv G(x)$')
        axs[1].loglog(x_vals_positive, binned_corr_positive)
        axs[1].set_xlabel('$x$')
        axs[1].set_ylabel('$\\langle \\phi(x)\\phi(0)\\rangle\equiv G(x)$')
        
        # Plotting the theory.
        y_theory = self._theory_xspace((self.amplitude, self.power), x_fit)
        axs[0].plot(x_fit,
                    y_theory,
                    linestyle=':',
                    color='m',
                    label='theory')
        axs[1].loglog(x_fit,
                      y_theory,
                      linestyle=':',
                      color='m',
                      label='theory')
        
        

        x_amp = self._x_amplitude_numerator / self._x_amplitude_denominator
        axs[1].set_title(
            f'$G^{{fit}}(x)\\approx  {np.exp(correlator_lr.intercept):.5f}\\cdot x^{{{-1*correlator_lr.slope:.5f}}} \\quad G^{{theory}}(x)\\approx  {x_amp:.5f}\\cdot x^{{{self.power-self.dimensions:.5f}}} $'
        )
        fig.suptitle(
            f'Actual values: (amp,power)=({self.amplitude:.5f},{self.power:.5f})'
        )
        plt.tight_layout()
        plt.legend()
        plt.show()

        # Compute summary statistics: (x,y_bar,std(y_bar))
        self.data_summary_positive = np.array([[
            x_vals_positive[i], binned_corr_positive[i],
            binned_corr_std_positive[i] / np.sqrt(binned_corr_count_positive[i])
        ] for i in range(len(x_vals_positive))])
        

    def bayes(self, steps=10**4, walkers=2**6):
        """
        Bayesian analysis for power-spectrum parameters using binned data.
        """

        if not hasattr(self, 'data_summary_positive'):
            return print('Run data_fit first')

        # Set flat priors on the amplitude and power over some range covering actual values.
        amp_max, power_max = 3 * self.amplitude, 10

        def log_prior(params):
            amplitude, power = params
            if 0 < amplitude < amp_max and 0 < power < power_max:
                return 0.0
            return -np.inf

        def log_likelihood(params, data):
            x, y, sigy = data
            return np.sum(-np.log(sigy) - .5 *
                          (self._theory_xspace(params, x) - y)**2 / sigy**2)

        # Total log-prob needed for MCMC.
        def log_posterior(params, data):
            lp = log_prior(params)
            if not np.isfinite(lp):
                return -np.inf
            return lp + log_likelihood(params, data)

        # MCMC setup.
        # Distribute initial walker positions around position space LR best fits (so as not to cheat).
        initial = np.array([self.lr_fit['_x_amplitude'], self.lr_fit['power']])
        pos = initial.T + .1 * np.concatenate(
            (np.random.uniform(-initial[0], initial[0], (walkers, 1)),
             np.random.uniform(-initial[1], initial[1], (walkers, 1))),
            axis=1)
        walkers, dim = pos.shape

        sampler = emcee.EnsembleSampler(walkers,
                                        dim,
                                        log_posterior,
                                        args=[self.data_summary_positive.T])
        sampler.run_mcmc(pos, steps, progress=True)
        samples = sampler.get_chain()

        # Plotting chain convergence
        fig, axs = plt.subplots(2)
        for i in range(walkers):
            for j in range(dim):
                axs[j].plot(samples[:, i, j])

        axs[0].set_ylabel('amp')
        axs[1].set_ylabel('power')
        fig.suptitle('Chain convergence')
        plt.show()

        # Autocorrelation analysis.
        auto_corr = sampler.get_autocorr_time()
        thin_rate = int(np.mean(np.array(auto_corr)) / 2)

        # Burn 1/4 of data, then make corner plots.
        flat_samples = sampler.get_chain(discard=int(steps / 4),
                                         thin=thin_rate,
                                         flat=True)
        amp_samples, power_samples = flat_samples[:, 0], flat_samples[:, 1]

        fig = corner.corner(flat_samples,
                            labels=['amp', 'power'],
                            truths=[self.amplitude, self.power],
                            truth_color='r')
        fig.suptitle(
            f'Actual values: (amp,power)=({self.amplitude:.5f},{self.power:.5f})'
        )
        plt.show()

Run a realization and analyze.  Choose the amplitude and power randomly.

The momentum space correlator $P(k)=\frac{\texttt{amplitude}}{k^{\texttt{power}}}$ only unambiguously defines a position-space correlator $G(x)$ for $2>{\texttt{power}}>1/2$ (other values require regularization/analytic continuation; [relevant integral can be found here](https://dlmf.nist.gov/10.22#E43)).  The precise form of is $G(x)=\frac{{\rm amplitude}\,\Gamma[1-{\rm power}/2]}{2^{\rm power}x^{\rm 2-power}\,\Gamma[{\rm power}/2]}$.

Some parameter choices which run in reasonable time: `size_exponent`=8, `max_len`=15, `steps`=5000, `walkers`=64.

In [None]:
rand_amp= np.random.uniform(0, 10,1)[0]
rand_power = np.random.uniform(.75, 1.75, 1)[0]
size_exponent = 7
# Only using a subset of the data for speed
min_len, max_len = int(.1 * 2 ** size_exponent), int(.4 * 2 ** size_exponent)
print(f'amplitude: {rand_amp:.5f}',
      f'power: {rand_power:.5f}',
      f'size: {2 ** size_exponent}',
      f'min_len: {min_len}',
      f'max_len: {max_len}',
      sep='\n')


c = CmbPy(size_exponent=size_exponent, amplitude=rand_amp, power=rand_power)
c.generate()
c.spectrum_plot()
c.hist()
c.bin_pair_data(min_len=min_len, max_len=max_len)
c.bayes(steps=10**4, walkers=64)

amplitude: 0.44866
power: 1.74001
size: 128
min_len: 12
max_len: 51


Power 1.5 did poorly
Power 