diff --git a/docs/changes/780.feature.rst b/docs/changes/780.feature.rst new file mode 100644 index 000000000..29058279f --- /dev/null +++ b/docs/changes/780.feature.rst @@ -0,0 +1 @@ +Introduce power colors à la Heil et al. 2015 diff --git a/docs/index.rst b/docs/index.rst index 1e93d4298..a58f9c2df 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -24,15 +24,16 @@ Current Capabilities 1. Data handling and simulation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -* loading event lists from fits files of a few missions (RXTE/PCA, NuSTAR/FPM, XMM-Newton/EPIC, NICER/XTI) -* constructing light curves from event data, various operations on light curves (e.g. addition, subtraction, joining, and truncation) +* loading event lists from fits files (and generally good handling of OGIP-compliant missions, like RXTE/PCA, NuSTAR/FPM, XMM-Newton/EPIC, NICER/XTI) +* constructing light curves and time series from event data +* various operations on time series (e.g. addition, subtraction, joining, and truncation) * simulating a light curve with a given power spectrum * simulating a light curve from another light curve and a 1-d (time) or 2-d (time-energy) impulse response * simulating an event list from a given light curve _and_ with a given energy spectrum * Good Time Interval operations * Filling gaps in light curves with statistically sound fake data -2. Fourier methods +1. Fourier methods ~~~~~~~~~~~~~~~~~~ * power spectra and cross spectra in Leahy, rms normalization, absolute rms and no normalization * averaged power spectra and cross spectra @@ -45,6 +46,7 @@ Current Capabilities * bispectra; *needs testing* * (Bayesian) quasi-periodic oscillation searches * Lomb-Scargle periodograms and cross spectra +* Power Colors 3. Other time series methods ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -62,7 +64,6 @@ Other future additions we are currently implementing are: * bicoherence * phase-resolved spectroscopy of quasi-periodic oscillations * Fourier-frequency-resolved spectroscopy -* power colours * full HEASARC-compatible mission support * pulsar searches with :math:`H`-test * binary pulsar searches diff --git a/stingray/crossspectrum.py b/stingray/crossspectrum.py index d00cf83e6..e3e124a48 100644 --- a/stingray/crossspectrum.py +++ b/stingray/crossspectrum.py @@ -21,7 +21,7 @@ from .fourier import avg_cs_from_iterables, error_on_averaged_cross_spectrum from .fourier import avg_cs_from_events, poisson_level from .fourier import fftfreq, fft, normalize_periodograms, raw_coherence -from .fourier import get_flux_iterable_from_segments +from .fourier import get_flux_iterable_from_segments, power_color from scipy.special import factorial @@ -2028,6 +2028,9 @@ class DynamicalCrossspectrum(AveragedCrossspectrum): The time resolution of the dynamical spectrum. It is **not** the time resolution of the input light curve. It is the integration time of each line of the dynamical power spectrum (typically, an integer multiple of ``segment_size``). + + m: int + The number of averaged powers in each spectral bin (initially 1, it changes after rebinning). """ def __init__(self, data1, data2, segment_size, norm="frac", gti=None, sample_time=None): @@ -2075,16 +2078,21 @@ def _make_matrix(self, data1, data2): save_all=True, ) self.dyn_ps = np.array(avg.cs_all).T + conv = avg.cs_all / avg.unnorm_cs_all + self.unnorm_conversion = np.nanmean(conv) self.freq = avg.freq current_gti = avg.gti + self.nphots1 = avg.nphots1 + self.nphots2 = avg.nphots2 tstart, _ = time_intervals_from_gtis(current_gti, self.segment_size) self.time = tstart + 0.5 * self.segment_size self.df = avg.df self.dt = self.segment_size + self.m = 1 - def rebin_frequency(self, df_new, method="sum"): + def rebin_frequency(self, df_new, method="average"): """ Rebin the Dynamic Power Spectrum to a new frequency resolution. Rebinning is an in-place operation, i.e. will replace the existing @@ -2101,14 +2109,14 @@ def rebin_frequency(self, df_new, method="sum"): Must be larger than the frequency resolution of the old dynamical power spectrum! - method: {"sum" | "mean" | "average"}, optional, default "sum" - This keyword argument sets whether the counts in the new bins + method: {"sum" | "mean" | "average"}, optional, default "average" + This keyword argument sets whether the powers in the new bins should be summed or averaged. """ new_dynspec_object = copy.deepcopy(self) dynspec_new = [] for data in self.dyn_ps.T: - freq_new, bin_counts, bin_err, _ = rebin_data( + freq_new, bin_counts, bin_err, step = rebin_data( self.freq, data, dx_new=df_new, method=method ) dynspec_new.append(bin_counts) @@ -2116,9 +2124,11 @@ def rebin_frequency(self, df_new, method="sum"): new_dynspec_object.freq = freq_new new_dynspec_object.dyn_ps = np.array(dynspec_new).T new_dynspec_object.df = df_new + new_dynspec_object.m = int(step) * self.m + return new_dynspec_object - def rebin_time(self, dt_new, method="sum"): + def rebin_time(self, dt_new, method="average"): """ Rebin the Dynamic Power Spectrum to a new time resolution. @@ -2137,8 +2147,8 @@ def rebin_time(self, dt_new, method="sum"): Must be larger than the time resolution of the old dynamical power spectrum! - method: {"sum" | "mean" | "average"}, optional, default "sum" - This keyword argument sets whether the counts in the new bins + method: {"sum" | "mean" | "average"}, optional, default "average" + This keyword argument sets whether the powers in the new bins should be summed or averaged. Returns @@ -2156,7 +2166,7 @@ def rebin_time(self, dt_new, method="sum"): dynspec_new = [] for data in self.dyn_ps: - time_new, bin_counts, _, _ = rebin_data( + time_new, bin_counts, _, step = rebin_data( self.time, data, dt_new, method=method, dx=self.dt ) dynspec_new.append(bin_counts) @@ -2164,6 +2174,69 @@ def rebin_time(self, dt_new, method="sum"): new_dynspec_object.time = time_new new_dynspec_object.dyn_ps = np.array(dynspec_new) new_dynspec_object.dt = dt_new + new_dynspec_object.m = int(step) * self.m + return new_dynspec_object + + def rebin_by_n_intervals(self, n, method="average"): + """ + Rebin the Dynamic Power Spectrum to a new time resolution, by summing contiguous intervals. + + This is different from meth:`DynamicalPowerspectrum.rebin_time` in that it averages ``n`` + consecutive intervals regardless of their distance in time. ``rebin_time`` will instead + average intervals that are separated at most by a time ``dt_new``. + + Parameters + ---------- + n: int + The number of intervals to be combined into one. + + method: {"sum" | "mean" | "average"}, optional, default "average" + This keyword argument sets whether the powers in the new bins + should be summed or averaged. + + Returns + ------- + time_new: numpy.ndarray + Time axis with new rebinned time resolution. + + dynspec_new: numpy.ndarray + New rebinned Dynamical Cross Spectrum. + """ + if not np.issubdtype(type(n), np.integer): + warnings.warn("n must be an integer. Casting to int") + + n = int(n) + + if n == 1: + return copy.deepcopy(self) + if n < 1: + raise ValueError("n must be >= 1") + + new_dynspec_object = copy.deepcopy(self) + + dynspec_new = [] + time_new = [] + for i, data in enumerate(self.dyn_ps.T): + if i % n == 0: + count = 1 + bin_counts = data + bin_times = self.time[i] + else: + count += 1 + bin_counts += data + bin_times += self.time[i] + if count == n: + if method in ["mean", "average"]: + bin_counts /= n + dynspec_new.append(bin_counts) + bin_times /= n + time_new.append(bin_times) + + new_dynspec_object.time = time_new + new_dynspec_object.dyn_ps = np.array(dynspec_new).T + new_dynspec_object.dt *= n + new_dynspec_object.m = n * self.m + return new_dynspec_object def trace_maximum(self, min_freq=None, max_freq=None): @@ -2198,6 +2271,125 @@ def trace_maximum(self, min_freq=None, max_freq=None): return np.array(max_positions) + def power_colors( + self, + freq_edges=[1 / 256, 1 / 32, 0.25, 2.0, 16.0], + freqs_to_exclude=None, + poisson_power=0, + ): + """ + Compute the power colors of the dynamical power spectrum. + + See Heil et al. 2015, MNRAS, 448, 3348 + + Parameters + ---------- + freq_edges: iterable + The edges of the frequency bins to be used for the power colors. + + freqs_to_exclude : 1-d or 2-d iterable, optional, default None + The ranges of frequencies to exclude from the calculation of the power color. + For example, the frequencies containing strong QPOs. + A 1-d iterable should contain two values for the edges of a single range. (E.g. + ``[0.1, 0.2]``). ``[[0.1, 0.2], [3, 4]]`` will exclude the ranges 0.1-0.2 Hz and + 3-4 Hz. + + poisson_power : float or iterable, optional, default 0 + The Poisson noise level of the power spectrum. If iterable, it should have the same + length as ``freq``. (This might apply to the case of a power spectrum with a + strong dead time distortion) + + Returns + ------- + pc0: np.ndarray + pc0_err: np.ndarray + pc1: np.ndarray + pc1_err: np.ndarray + The power colors for each spectrum and their respective errors + """ + power_colors = [] + if np.iscomplexobj(self.dyn_ps): + warnings.warn("When using power_colors, complex powers will be cast to real.") + + for ps in self.dyn_ps.T: + power_colors.append( + power_color( + self.freq, + ps.real, + freq_edges=freq_edges, + freqs_to_exclude=freqs_to_exclude, + df=self.df, + poisson_power=poisson_power, + m=self.m, + ) + ) + + pc0, pc0_err, pc1, pc1_err = np.array(power_colors).T + return pc0, pc0_err, pc1, pc1_err + + def compute_rms(self, min_freq, max_freq, poisson_noise_level=0): + """ + Compute the fractional rms amplitude in the power spectrum + between two frequencies. + + Parameters + ---------- + min_freq: float + The lower frequency bound for the calculation. + + max_freq: float + The upper frequency bound for the calculation. + + Other parameters + ---------------- + poisson_noise_level : float, default is None + This is the Poisson noise level of the PDS with same + normalization as the PDS. + + Returns + ------- + rms: float + The fractional rms amplitude contained between ``min_freq`` and + ``max_freq``. + + rms_err: float + The error on the fractional rms amplitude. + + """ + from .fourier import rms_calculation + + dyn_ps_unnorm = self.dyn_ps / self.unnorm_conversion + poisson_noise_unnrom = poisson_noise_level / self.unnorm_conversion + if not hasattr(self, "nphots"): + nphots = (self.nphots1 * self.nphots2) ** 0.5 + else: + nphots = self.nphots + T = self.dt * self.m + M_freqs = self.m + K_freqs = 1 + minind = self.freq.searchsorted(min_freq) + maxind = self.freq.searchsorted(max_freq) + min_freq = self.freq[minind] + freq_bins = maxind - minind + rmss = [] + rms_errs = [] + for unnorm_powers in dyn_ps_unnorm.T: + r, re = rms_calculation( + unnorm_powers[minind:maxind], + min_freq, + max_freq, + nphots, + T, + M_freqs, + K_freqs, + freq_bins, + poisson_noise_unnrom, + deadtime=0.0, + ) + rmss.append(r) + rms_errs.append(re) + return rmss, rms_errs + def crossspectrum_from_time_array( times1, diff --git a/stingray/fourier.py b/stingray/fourier.py index 608fb9cb9..a39d1da5e 100644 --- a/stingray/fourier.py +++ b/stingray/fourier.py @@ -15,6 +15,309 @@ from .utils import fft, fftfreq, histogram, show_progress, sum_if_not_none_or_initialize +def integrate_power_in_frequency_range( + frequency, + power, + frequency_range, + power_err=None, + df=None, + m=1, + poisson_power=0, +): + """ + Integrate the power in a given frequency range. + + Parameters + ---------- + frequency : iterable + The frequencies of the power spectrum + power : iterable + The power at each frequency + frequency_range : iterable of length 2 + The frequency range to integrate + power_err : iterable, optional, default None + The power error bar at each frequency + df : float or float iterable, optional, default None + The frequency resolution of the input data. If None, it is calculated + from the median difference of input frequencies. + m : int, optional, default 1 + The number of segments and/or contiguous frequency bins averaged to obtain power. + Only needed if ``power_err`` is None + poisson_power : float, optional, default 0 + The Poisson noise level of the power spectrum. + + Returns + ------- + power_integrated : float + The integrated power + power_integrated_err : float + The error on the integrated power + + """ + frequency = np.array(frequency, dtype=float) + power = np.array(power) + if not np.iscomplexobj(power): + power = power.astype(float) + if not isinstance(poisson_power, Iterable): + poisson_power = np.ones_like(frequency) * poisson_power + if df is None: + df = np.median(np.diff(frequency)) + if not isinstance(df, Iterable): + df = np.ones_like(frequency) * df + + # The frequency_range is in the middle or at the edge of each bin. When only a part of the + # bin is included, we need to add the fraction of the bin that is included. + frequency_mask = (frequency + df / 2 > frequency_range[0]) & ( + frequency - df / 2 < frequency_range[1] + ) + + freqs_to_integrate = frequency[frequency_mask] + poisson_power = poisson_power[frequency_mask] + correction_ratios = np.ones_like(freqs_to_integrate) + dfs_to_integrate = df[frequency_mask] + + # The first and last bins are only partially included. We need to correct for the + # fraction of the bin actually included. + correction_ratios[0] = ( + freqs_to_integrate[0] + 0.5 * dfs_to_integrate[0] - frequency_range[0] + ) / dfs_to_integrate[0] + correction_ratios[-1] = ( + frequency_range[-1] - freqs_to_integrate[-1] + 0.5 * dfs_to_integrate[-1] + ) / dfs_to_integrate[-1] + dfs_to_integrate = dfs_to_integrate * correction_ratios + + powers_to_integrate = power[frequency_mask] + + if power_err is None: + power_err_to_integrate = powers_to_integrate / np.sqrt(m) + else: + power_err_to_integrate = np.asarray(power_err)[frequency_mask] + + power_integrated = np.sum((powers_to_integrate - poisson_power) * dfs_to_integrate) + power_err_integrated = np.sqrt(np.sum((power_err_to_integrate * dfs_to_integrate) ** 2)) + return power_integrated, power_err_integrated + + +def power_color( + frequency, + power, + power_err=None, + freq_edges=[1 / 256, 1 / 32, 0.25, 2.0, 16.0], + df=None, + m=1, + freqs_to_exclude=None, + poisson_power=0, + return_log=False, +): + """ + Calculate two power colors from a power spectrum. + + Power colors are an alternative to spectral colors to understand the spectral state of an + accreting source. They are defined as the ratio of the power in two frequency ranges, + analogously to the colors calculated from electromagnetic spectra. + + This function calculates two power colors, using the four frequency ranges contained + between the five frequency edges in ``freq_edges``. Given [f0, f1, f2, f3, f4], the + two power colors are calculated as the following ratios of the integrated power + (which are variances): + + + PC0 = Var([f0, f1]) / Var([f2, f3]) + + + PC1 = Var([f1, f2]) / Var([f3, f4]) + + Errors are calculated using simple error propagation from the integrated power errors. + + See Heil et al. 2015, MNRAS, 448, 3348 + + Parameters + ---------- + frequency : iterable + The frequencies of the power spectrum + power : iterable + The power at each frequency + + Other Parameters + ---------------- + power_err : iterable + The power error bar at each frequency + freq_edges : iterable, optional, default ``[0.0039, 0.031, 0.25, 2.0, 16.0]`` + The five edges defining the four frequency intervals to use to calculate the power color. + If empty, the power color is calculated using the frequencies from Heil et al. 2015. + df : float or float iterable, optional, default None + The frequency resolution of the input data. If None, it is calculated + from the median difference of input frequencies. + m : int, optional, default 1 + The number of segments and/or contiguous frequency bins averaged to obtain power + freqs_to_exclude : 1-d or 2-d iterable, optional, default None + The ranges of frequencies to exclude from the calculation of the power color. + For example, the frequencies containing strong QPOs. + A 1-d iterable should contain two values for the edges of a single range. (E.g. + ``[0.1, 0.2]``). ``[[0.1, 0.2], [3, 4]]`` will exclude the ranges 0.1-0.2 Hz and 3-4 Hz. + poisson_power : float or iterable, optional, default 0 + The Poisson noise level of the power spectrum. If iterable, it should have the same + length as ``frequency``. (This might apply to the case of a power spectrum with a + strong dead time distortion + return_log : bool, optional, default False + Return the base-10 logarithm of the power color and the errors + + Returns + ------- + PC0 : float + The first power color + PC0_err : float + The error on the first power color + PC1 : float + The second power color + PC1_err : float + The error on the second power color + """ + freq_edges = np.asarray(freq_edges) + if len(freq_edges) != 5: + raise ValueError("freq_edges must have 5 elements") + + frequency = np.asarray(frequency) + power = np.asarray(power) + + if df is None: + df = np.median(np.diff(frequency)) + input_frequency_low_edges = frequency - df / 2 + input_frequency_high_edges = frequency + df / 2 + + if freq_edges.min() < input_frequency_low_edges[0]: + raise ValueError("The minimum frequency is larger than the first frequency edge") + if freq_edges.max() > input_frequency_high_edges[-1]: + raise ValueError("The maximum frequency is lower than the last frequency edge") + + if power_err is None: + power_err = power / np.sqrt(m) + else: + power_err = np.asarray(power_err) + + if freqs_to_exclude is not None: + if len(np.shape(freqs_to_exclude)) == 1: + freqs_to_exclude = [freqs_to_exclude] + + if ( + not isinstance(freqs_to_exclude, Iterable) + or len(np.shape(freqs_to_exclude)) != 2 + or np.shape(freqs_to_exclude)[1] != 2 + ): + raise ValueError("freqs_to_exclude must be of format [[f0, f1], [f2, f3], ...]") + for f0, f1 in freqs_to_exclude: + frequency_mask = (input_frequency_low_edges > f0) & (input_frequency_high_edges < f1) + idx0, idx1 = np.searchsorted(frequency, [f0, f1]) + power[frequency_mask] = np.mean([power[idx0], power[idx1]]) + + var00, var00_err = integrate_power_in_frequency_range( + frequency, + power, + freq_edges[:2], + power_err=power_err, + df=df, + m=m, + poisson_power=poisson_power, + ) + var01, var01_err = integrate_power_in_frequency_range( + frequency, + power, + freq_edges[2:4], + power_err=power_err, + df=df, + m=m, + poisson_power=poisson_power, + ) + var10, var10_err = integrate_power_in_frequency_range( + frequency, + power, + freq_edges[1:3], + power_err=power_err, + df=df, + m=m, + poisson_power=poisson_power, + ) + var11, var11_err = integrate_power_in_frequency_range( + frequency, + power, + freq_edges[3:5], + power_err=power_err, + df=df, + m=m, + poisson_power=poisson_power, + ) + pc0 = var00 / var01 + pc1 = var10 / var11 + pc0_err = pc0 * (var00_err / var00 + var01_err / var01) + pc1_err = pc1 * (var10_err / var10 + var11_err / var11) + if return_log: + pc0_err = 1 / pc0 * pc0_err + pc1_err = 1 / pc1 * pc1_err + pc0 = np.log10(pc0) + pc1 = np.log10(pc1) + return pc0, pc0_err, pc1, pc1_err + + +def hue_from_power_color(pc0, pc1, center=[4.51920, 0.453724]): + """Measure the angle of a point in the log-power color diagram wrt the center. + + Angles are measured in radians, **in the clockwise direction**, with respect to a line oriented + at -45 degrees wrt the horizontal axis. + + See Heil et al. 2015, MNRAS, 448, 3348 + + Parameters + ---------- + pc0 : float + The (linear, not log!) power color in the first frequency range + pc1 : float + The (linear, not log!) power color in the second frequency range + + Other Parameters + ---------------- + center : iterable, optional, default [4.51920, 0.453724] + The coordinates of the center of the power color diagram + + Returns + ------- + hue : float + The angle of the point wrt the center, in radians + """ + pc0 = np.log10(pc0) + pc1 = np.log10(pc1) + + center = np.log10(np.asarray(center)) + + return hue_from_logpower_color(pc0, pc1, center=center) + + +def hue_from_logpower_color(log10pc0, log10pc1, center=(np.log10(4.51920), np.log10(0.453724))): + """Measure the angle of a point in the log-power color diagram wrt the center. + + Angles are measured in radians, **in the clockwise direction**, with respect to a line oriented + at -45 degrees wrt the horizontal axis. + + See Heil et al. 2015, MNRAS, 448, 3348 + + Parameters + ---------- + log10pc0 : float + The log10 power color in the first frequency range + log10pc1 : float + The log10 power color in the second frequency range + + Other Parameters + ---------------- + center : iterable, optional, default ``log10([4.51920, 0.453724])`` + The coordinates of the center of the power color diagram + + Returns + ------- + hue : float + The angle of the point wrt the center, in radians + """ + hue = 3 / 4 * np.pi - np.arctan2(log10pc1 - center[1], log10pc0 - center[0]) + return hue + + def positive_fft_bins(n_bin, include_zero=False): """ Give the range of positive frequencies of a complex FFT. @@ -1207,7 +1510,7 @@ def local_show_progress(a): # If the user wants to normalize using the mean of the total # lightcurve, normalize it here - cs_seg = unnorm_power + cs_seg = copy.deepcopy(unnorm_power) if not use_common_mean: mean = n_ph / n_bin @@ -1593,7 +1896,7 @@ def local_show_progress(a): unnorm_pd1 = unnorm_pd1[fgt0] unnorm_pd2 = unnorm_pd2[fgt0] - cs_seg = unnorm_power + cs_seg = copy.deepcopy(unnorm_power) p1_seg = unnorm_pd1 p2_seg = unnorm_pd2 diff --git a/stingray/powerspectrum.py b/stingray/powerspectrum.py index 82b92faa7..53495b239 100755 --- a/stingray/powerspectrum.py +++ b/stingray/powerspectrum.py @@ -210,7 +210,8 @@ def compute_rms( M_freq = self.m K_freq = self.k freq_bins = maxind - minind - T = self.dt * self.n + + T = self.dt * self.n * 2 if white_noise_offset is not None: powers = self.power[minind:maxind] @@ -237,6 +238,7 @@ def compute_rms( poisson_noise_unnorm = unnormalize_periodograms( poisson_noise_level, self.dt, self.n, self.nphots, norm=self.norm ) + return rms_calculation( self.unnorm_power[minind:maxind], min_freq, @@ -1013,6 +1015,9 @@ class DynamicalPowerspectrum(DynamicalCrossspectrum): The time resolution of the dynamical spectrum. It is **not** the time resolution of the input light curve. It is the integration time of each line of the dynamical power spectrum (typically, an integer multiple of ``segment_size``). + + m: int + The number of averaged cross spectra. """ def __init__(self, lc, segment_size, norm="frac", gti=None, sample_time=None): @@ -1055,6 +1060,8 @@ def _make_matrix(self, lc): gti=self.gti, save_all=True, ) + conv = avg.cs_all / avg.unnorm_cs_all + self.unnorm_conversion = np.nanmean(conv) self.dyn_ps = np.array(avg.cs_all).T self.freq = avg.freq current_gti = avg.gti @@ -1064,6 +1071,57 @@ def _make_matrix(self, lc): self.time = tstart + 0.5 * self.segment_size self.df = avg.df self.dt = self.segment_size + self.meanrate = avg.nphots / avg.n / avg.dt + self.nphots = avg.nphots + self.m = 1 + + def power_colors( + self, + freq_edges=[1 / 256, 1 / 32, 0.25, 2.0, 16.0], + freqs_to_exclude=None, + poisson_power=None, + ): + """ + Return the power colors of the dynamical power spectrum. + + Parameters + ---------- + freq_edges: iterable + The edges of the frequency bins to be used for the power colors. + + freqs_to_exclude : 1-d or 2-d iterable, optional, default None + The ranges of frequencies to exclude from the calculation of the power color. + For example, the frequencies containing strong QPOs. + A 1-d iterable should contain two values for the edges of a single range. (E.g. + ``[0.1, 0.2]``). ``[[0.1, 0.2], [3, 4]]`` will exclude the ranges 0.1-0.2 Hz and + 3-4 Hz. + + poisson_level : float or iterable, optional + Defaults to the theoretical Poisson noise level (e.g. 2 for Leahy normalization). + The Poisson noise level of the power spectrum. If iterable, it should have the same + length as ``frequency``. (This might apply to the case of a power spectrum with a + strong dead time distortion) + + Returns + ------- + pc0: np.ndarray + pc0_err: np.ndarray + pc1: np.ndarray + pc1_err: np.ndarray + The power colors for each spectrum and their respective errors + """ + if poisson_power is None: + poisson_power = poisson_level( + norm=self.norm, + meanrate=self.meanrate, + n_ph=self.nphots, + ) + + return super().power_colors( + freq_edges=freq_edges, + freqs_to_exclude=freqs_to_exclude, + poisson_power=poisson_power, + ) def powerspectrum_from_time_array( diff --git a/stingray/tests/test_crossspectrum.py b/stingray/tests/test_crossspectrum.py index 08ec13b86..5647a3d97 100644 --- a/stingray/tests/test_crossspectrum.py +++ b/stingray/tests/test_crossspectrum.py @@ -1408,6 +1408,30 @@ def test_works_with_events(self): dps_ev = DynamicalCrossspectrum(ev, ev, segment_size=10, sample_time=self.lc.dt) assert np.allclose(dps.dyn_ps, dps_ev.dyn_ps) + with pytest.warns(UserWarning, match="When using power_colors, complex "): + dps_ev.power_colors(freq_edges=[1 / 5, 1 / 2, 1, 2.0, 16.0]) + + def test_rms_is_correct(self): + lc = copy.deepcopy(self.lc) + lc.counts = np.random.poisson(lc.counts) + dps = DynamicalCrossspectrum(lc, lc, segment_size=10, norm="leahy") + rms, rmse = dps.compute_rms(1 / 5, 16.0, poisson_noise_level=2) + from stingray.powerspectrum import AveragedPowerspectrum + + ps = AveragedPowerspectrum() + ps.freq = dps.freq + ps.power = dps.dyn_ps.T[0] + ps.unnorm_power = ps.power / dps.unnorm_conversion + ps.df = dps.df + ps.m = dps.m + ps.n = dps.freq.size + ps.dt = lc.dt + ps.norm = dps.norm + ps.k = 1 + ps.nphots = (dps.nphots1 * dps.nphots2) ** 0.5 + rms2, rmse2 = ps.compute_rms(1 / 5, 16.0, poisson_noise_level=2) + assert np.isclose(rms[0], rms2) + assert np.isclose(rmse[0], rmse2, rtol=0.01) def test_works_with_events_and_its_complex(self): lc = copy.deepcopy(self.lc) @@ -1473,18 +1497,68 @@ def test_rebin_small_df(self): with pytest.raises(ValueError): dps.rebin_frequency(df_new=dps.df / 2.0) - def test_rebin_time_default_method(self): + def test_rebin_time_sum_method(self): + segment_size = 3 + dt_new = 6.0 + rebin_time = np.array([2.5, 8.5]) + rebin_dps = np.array([[1.73611111, 0.81018519]]) + dps = DynamicalCrossspectrum(self.lc_test, self.lc_test, segment_size=segment_size) + new_dps = dps.rebin_time(dt_new=dt_new, method="sum") + assert np.allclose(new_dps.time, rebin_time) + assert np.allclose(new_dps.dyn_ps, rebin_dps) + assert np.isclose(new_dps.dt, dt_new) + + def test_rebin_n(self): + segment_size = 3 + dt_new = 6.0 + rebin_time = np.array([2.5, 8.5]) + rebin_dps = np.array([[1.73611111, 0.81018519]]) + dps = DynamicalCrossspectrum(self.lc_test, self.lc_test, segment_size=segment_size) + new_dps = dps.rebin_by_n_intervals(n=2, method="sum") + assert np.allclose(new_dps.time, rebin_time) + assert np.allclose(new_dps.dyn_ps, rebin_dps) + assert np.isclose(new_dps.dt, dt_new) + + def test_rebin_n_average(self): + segment_size = 3 + dt_new = 6.0 + rebin_time = np.array([2.5, 8.5]) + rebin_dps = np.array([[1.73611111, 0.81018519]]) + dps = DynamicalCrossspectrum(self.lc_test, self.lc_test, segment_size=segment_size) + new_dps = dps.rebin_by_n_intervals(n=2, method="average") + assert np.allclose(new_dps.time, rebin_time) + assert np.allclose(new_dps.dyn_ps, rebin_dps / 2) + assert np.isclose(new_dps.dt, dt_new) + + def test_rebin_n_warns_for_non_integer(self): segment_size = 3 dt_new = 6.0 rebin_time = np.array([2.5, 8.5]) rebin_dps = np.array([[1.73611111, 0.81018519]]) dps = DynamicalCrossspectrum(self.lc_test, self.lc_test, segment_size=segment_size) - new_dps = dps.rebin_time(dt_new=dt_new) + with pytest.warns(UserWarning, match="n must be an integer. Casting to int"): + new_dps = dps.rebin_by_n_intervals(n=2.1, method="sum") assert np.allclose(new_dps.time, rebin_time) assert np.allclose(new_dps.dyn_ps, rebin_dps) assert np.isclose(new_dps.dt, dt_new) - def test_rebin_frequency_default_method(self): + def test_rebin_n_fails_for_n_lt_1(self): + segment_size = 3 + + dps = DynamicalCrossspectrum(self.lc_test, self.lc_test, segment_size=segment_size) + with pytest.raises(ValueError, match="n must be >= 1"): + _ = dps.rebin_by_n_intervals(n=0) + dps2 = dps.rebin_by_n_intervals(n=1) + assert np.allclose(dps2.dyn_ps, dps.dyn_ps) + + def test_rebin_n_copies_for_n_1(self): + segment_size = 3 + + dps = DynamicalCrossspectrum(self.lc_test, self.lc_test, segment_size=segment_size) + dps2 = dps.rebin_by_n_intervals(n=1) + assert np.allclose(dps2.dyn_ps, dps.dyn_ps) + + def test_rebin_frequency_sum_method(self): segment_size = 50 df_new = 10.0 rebin_freq = np.array([5.01, 15.01, 25.01, 35.01]) @@ -1501,7 +1575,7 @@ def test_rebin_frequency_default_method(self): with warnings.catch_warnings(): warnings.simplefilter("ignore", category=UserWarning) dps = DynamicalCrossspectrum(self.lc, self.lc, segment_size=segment_size) - new_dps = dps.rebin_frequency(df_new=df_new) + new_dps = dps.rebin_frequency(df_new=df_new, method="sum") assert np.allclose(new_dps.freq, rebin_freq) assert np.allclose(new_dps.dyn_ps, rebin_dps, atol=0.01) assert np.isclose(new_dps.df, df_new) diff --git a/stingray/tests/test_fourier.py b/stingray/tests/test_fourier.py index 216ee49b0..91b8df6dc 100644 --- a/stingray/tests/test_fourier.py +++ b/stingray/tests/test_fourier.py @@ -293,7 +293,19 @@ def test_avg_cs_use_common_mean_similar_stats(self, norm): silent=True, return_subcs=True, ) + assert np.isclose(out_comm["power"].std(), out["power"].std(), rtol=0.1) + assert np.isclose(out_comm["unnorm_power"].std(), out["unnorm_power"].std(), rtol=0.1) + # Run the same check on the subcs + assert np.isclose(out_comm.meta["subcs"].std(), out.meta["subcs"].std(), rtol=0.1) + assert np.isclose( + out_comm.meta["unnorm_subcs"].std(), out.meta["unnorm_subcs"].std(), rtol=0.1 + ) + # Now verify that the normalizations are consistent between single power and subcs + assert np.isclose( + out_comm["unnorm_power"].std() / out_comm["power"].std(), + out_comm.meta["unnorm_subcs"].std() / out_comm.meta["subcs"].std(), + ) @pytest.mark.parametrize("use_common_mean", [True, False]) @pytest.mark.parametrize("norm", ["frac", "abs", "none", "leahy"]) @@ -663,3 +675,123 @@ def test_impose_symmetry_lsft(): assert np.all((imp_sym_fast.real) == np.flip(imp_sym_fast.real)) assert np.all((imp_sym_fast.imag) == (-np.flip(imp_sym_fast.imag))) assert np.all(freqs_new_slow == freqs_new_fast) + + +class TestIntegration(object): + @classmethod + def setup_class(cls): + cls.freq = [0, 1, 2, 3] + cls.power = [2, 2, 2, 2] + cls.power_err = [1, 1, 1, 1] + + def test_power_integration_middle_bin(self): + freq_range = [1, 2] + pow, powe = integrate_power_in_frequency_range(self.freq, self.power, freq_range) + assert np.isclose(pow, 2) + assert np.isclose(powe, np.sqrt(2)) + + def test_power_integration_precise(self): + freq_range = [0.5, 2.5] + df = 1 + pow, powe = integrate_power_in_frequency_range(self.freq, self.power, freq_range, df=df) + assert np.allclose(pow, 4) + assert np.allclose(powe, 2 * np.sqrt(2)) + + def test_power_integration_poisson(self): + freq_range = [0.5, 2.5] + for poisson_power in (1, np.ones_like(self.power)): + pow, powe = integrate_power_in_frequency_range( + self.freq, self.power, freq_range, poisson_power=poisson_power + ) + assert np.allclose(pow, 2) + assert np.allclose(powe, 2 * np.sqrt(2)) + + def test_power_integration_err(self): + freq_range = [0.5, 2.5] + pow, powe = integrate_power_in_frequency_range( + self.freq, self.power, freq_range, power_err=self.power_err + ) + assert np.allclose(pow, 4) + assert np.allclose(powe, np.sqrt(2)) + + def test_power_integration_m(self): + freq_range = [0.5, 2.5] + pow, powe = integrate_power_in_frequency_range(self.freq, self.power, freq_range, m=4) + assert np.allclose(pow, 4) + assert np.allclose(powe, np.sqrt(2)) + + +class TestPowerColor(object): + @classmethod + def setup_class(cls): + cls.freq = np.arange(0.0001, 17, 0.00001) + cls.power = 1 / cls.freq + + def test_power_color(self): + pc0, _, pc1, _ = power_color(self.freq, self.power) + # The colors calculated with these frequency edges on a 1/f spectrum should be 1 + assert np.isclose(pc0, 1) + assert np.isclose(pc1, 1) + + def test_return_log(self): + pc0, _, pc1, _ = power_color(self.freq, self.power, return_log=True) + # The colors calculated with these frequency edges on a 1/f spectrum should be 1 + assert np.isclose(pc0, 0, atol=0.001) + assert np.isclose(pc1, 0, atol=0.001) + + def test_bad_edges(self): + good = self.freq > 1 / 255 # the smallest frequency is 1/256 + with pytest.raises(ValueError, match="The minimum frequency is larger "): + power_color(self.freq[good], self.power[good]) + + good = self.freq < 15 # the smallest frequency is 1/256 + with pytest.raises(ValueError, match="The maximum frequency is lower "): + power_color(self.freq[good], self.power[good]) + + with pytest.raises(ValueError, match="freq_edges must have 5 elements"): + power_color(self.freq, self.power, freq_edges=[1]) + with pytest.raises(ValueError, match="freq_edges must have 5 elements"): + power_color(self.freq, self.power, freq_edges=[1, 2, 3, 4, 5, 6]) + + def test_bad_excluded_interval(self): + for fte in ([1, 1.1, 3.0], [4], [[1, 1.1, 3.0]], 0, [[[1, 3]]]): + with pytest.raises(ValueError, match="freqs_to_exclude must be of "): + power_color(self.freq, self.power, freqs_to_exclude=fte) + + def test_excluded_frequencies(self): + pc0, _, pc1, _ = power_color(self.freq, self.power, freqs_to_exclude=[1, 1.1]) + # The colors calculated with these frequency edges on a 1/f spectrum should be 1 + # The excluded frequency interval is small enough that the approximation should work + assert np.isclose(pc0, 1, atol=0.001) + assert np.isclose(pc1, 1, atol=0.001) + + def test_with_power_err(self): + pc0, pc0_err, pc1, pc1_err = power_color( + self.freq, + self.power, + power_err=self.power / 2, + ) + pc0e, pc0e_err, pc1e, pc1e_err = power_color( + self.freq, + self.power, + power_err=self.power, + ) + assert np.isclose(pc0, 1, atol=0.001) + assert np.isclose(pc1, 1, atol=0.001) + assert np.isclose(pc0e, 1, atol=0.001) + assert np.isclose(pc1e, 1, atol=0.001) + assert np.isclose(pc0e_err / pc0_err, 2, atol=0.001) + assert np.isclose(pc1e_err / pc1_err, 2, atol=0.001) + + def test_hue(self): + center = (4.51920, 0.453724) + log_center = np.log10(np.asarray(center)) + for angle in np.radians(np.arange(0, 380, 20)): + factor = np.random.uniform(0.1, 10) + x = factor * np.cos(3 / 4 * np.pi - angle) + log_center[0] + y = factor * np.sin(3 / 4 * np.pi - angle) + log_center[1] + hue = hue_from_power_color(10**x, 10**y, center) + # Compare the angles in a safe way + c2 = (np.sin(hue) - np.sin(angle)) ** 2 + (np.cos(hue) - np.cos(angle)) ** 2 + angle_diff = np.arccos((2.0 - c2) / 2.0) + assert np.isclose(angle_diff, 0, atol=0.001) diff --git a/stingray/tests/test_powerspectrum.py b/stingray/tests/test_powerspectrum.py index 45561fa3f..a43d4c20d 100644 --- a/stingray/tests/test_powerspectrum.py +++ b/stingray/tests/test_powerspectrum.py @@ -1034,6 +1034,29 @@ def test_works_with_events(self): dps_ev = DynamicalPowerspectrum(ev, segment_size=10, sample_time=self.lc.dt) assert np.allclose(dps.dyn_ps, dps_ev.dyn_ps) + dps_ev.power_colors(freq_edges=[1 / 5, 1 / 2, 1, 2.0, 16.0]) + + def test_rms_is_correct(self): + lc = copy.deepcopy(self.lc) + lc.counts = np.random.poisson(lc.counts) + dps = DynamicalPowerspectrum(lc, segment_size=10, norm="leahy") + rms, rmse = dps.compute_rms(1 / 5, 16.0, poisson_noise_level=2) + from stingray.powerspectrum import AveragedPowerspectrum + + ps = AveragedPowerspectrum() + ps.freq = dps.freq + ps.power = dps.dyn_ps.T[0] + ps.unnorm_power = ps.power / dps.unnorm_conversion + ps.df = dps.df + ps.m = dps.m + ps.n = dps.freq.size + ps.dt = lc.dt + ps.norm = dps.norm + ps.k = 1 + ps.nphots = dps.nphots + rms2, rmse2 = ps.compute_rms(1 / 5, 16.0, poisson_noise_level=2) + assert np.isclose(rms[0], rms2) + assert np.isclose(rmse[0], rmse2, rtol=0.01) def test_with_long_seg_size(self): with pytest.raises(ValueError): @@ -1087,13 +1110,13 @@ def test_rebin_small_df(self): with pytest.raises(ValueError): dps.rebin_frequency(df_new=dps.df / 2.0) - def test_rebin_time_default_method(self): + def test_rebin_time_sum_method(self): segment_size = 3 dt_new = 6.0 rebin_time = np.array([2.5, 8.5]) rebin_dps = np.array([[1.73611111, 0.81018519]]) dps = DynamicalPowerspectrum(self.lc_test, segment_size=segment_size) - new_dps = dps.rebin_time(dt_new=dt_new) + new_dps = dps.rebin_time(dt_new=dt_new, method="sum") assert np.allclose(new_dps.time, rebin_time) assert np.allclose(new_dps.dyn_ps, rebin_dps) assert np.isclose(new_dps.dt, dt_new) @@ -1115,7 +1138,7 @@ def test_rebin_frequency_default_method(self): with warnings.catch_warnings(): warnings.simplefilter("ignore", category=UserWarning) dps = DynamicalPowerspectrum(self.lc, segment_size=segment_size) - new_dps = dps.rebin_frequency(df_new=df_new) + new_dps = dps.rebin_frequency(df_new=df_new, method="sum") assert np.allclose(new_dps.freq, rebin_freq) assert np.allclose(new_dps.dyn_ps, rebin_dps, atol=0.01) assert np.isclose(new_dps.df, df_new)