# Setup

## Import necessary packages

In [None]:
!pip install h5py
!pip install astropy
!pip install scipy
!pip install pycwt
!pip install pandas

In [1]:
import h5py
import pycwt as wavelet
import numpy as np
import scipy
import matplotlib.pyplot as plt
import astropy.units as unit
import astropy.constants as cons
from astropy.time import Time 
from datetime import datetime, timedelta

import matplotlib.animation as animation
from scipy.stats import pearsonr
import pandas as pd
from pathlib import Path

filePath = "../data/PlanetaryEphemerides.h5"

solar_mass = 1.989e30 # kg

__ANGLE_LIMIT_LOWER__ = 86
__ANGLE_LIMIT_HIGHER__ = 94

import warnings
warnings.filterwarnings('ignore', category=UserWarning, module='erfa')

class FancyColors:
    TEST = "\033[1;4;93;41m"
    RESET = "\033[0m"
    IMPORTANT = "\033[1;4;96;41m"

    RED = "\033[31m"
    GREEN = "\033[32m"
    YELLOW = "\033[33m"
    BLUE = "\033[34m"
    MAGENTA = "\033[35m"
    CYAN = "\033[36m"


## Define statistics

In [2]:
def circ_lin_r(angle, y):
    angle = np.asarray(angle)
    y = np.asarray(y)
    x = (y - y.mean()) / y.std()
    c = np.cos(angle)
    s = np.sin(angle)
    r_xc = pearsonr(x, c)[0]
    r_xs = pearsonr(x, s)[0]
    r_cs = pearsonr(c, s)[0]
    return np.sqrt((r_xc**2 + r_xs**2 - 2*r_xc*r_xs*r_cs) / (1 - r_cs**2 + 1e-15)), x, c, s, r_cs

def sweep_lag_circ_lin(
    t_angle, angle, t_y, y, 
    lags,                      # iterable of lags in same units as t_* (e.g., days)
    L=30,                      # block length for perms (≈ autocorr time)
    n_perm=2000
):
    # Make DataFrames to inner-join on time after lagging
    da = pd.DataFrame({"t": t_angle, "angle": angle})
    dy0 = pd.DataFrame({"t": t_y, "y": y}).sort_values("t").reset_index(drop=True)

    results = []
    for lag in lags:
        # shift y in time by 'lag': y(t+lag)
        dy = dy0.copy()
        dy["t"] = dy["t"] + lag
        merged = pd.merge(da, dy, on="t", how="inner")
        if len(merged) < 10:
            results.append({"lag": lag, "r": np.nan})
            continue
        r,_,_,_,_ = circ_lin_r(merged["angle"].values, merged["y"].values)
        results.append({"lag": lag, "r": r})
    res = pd.DataFrame(results)
    if res["r"].notna().sum() == 0:
        return {"table": res, "best_lag": np.nan, "best_r": np.nan, "p_fwe": np.nan}

    # Observed max statistic across lags
    best_idx = res["r"].idxmax()
    obs_max_r = res.loc[best_idx, "r"]

    # --- max-T block-permutation (family-wise error across lags) ---
    # Build block indices on the original (unshifted) y to preserve autocorr
    W = len(dy0)
    idx = np.arange(W)
    blocks = [idx[i:i+L] for i in range(0, W, L)]

    max_r_perm = []
    y_arr = dy0["y"].values
    for _ in range(n_perm):
        order = np.random.permutation(len(blocks))
        pidx = np.concatenate([blocks[i] for i in order])
        dy_p = dy0.copy()
        dy_p["y"] = y_arr[pidx]

        max_r_this_perm = -np.inf
        for lag in lags:
            dyp = dy_p.copy()
            dyp["t"] = dyp["t"] + lag
            merged = pd.merge(da, dyp, on="t", how="inner")
            if len(merged) < 10:
                continue
            r_p,_,_,_,_ = circ_lin_r(merged["angle"].values, merged["y"].values)
            if r_p > max_r_this_perm:
                max_r_this_perm = r_p
        if np.isfinite(max_r_this_perm):
            max_r_perm.append(max_r_this_perm)

    # Family-wise p-value: fraction of permuted max-r >= observed max-r
    if len(max_r_perm) == 0:
        p_fwe = np.nan
    else:
        max_r_perm = np.asarray(max_r_perm)
        p_fwe = (np.sum(max_r_perm >= obs_max_r) + 1) / (len(max_r_perm) + 1)

    return {
        "table": res.sort_values("lag").reset_index(drop=True),
        "best_lag": res.loc[best_idx, "lag"],
        "best_r": obs_max_r,
        "p_fwe": p_fwe,  # corrected for multiple lags
    }

def circ_lin_corr(angle, y):

    r, x, c, s, r_cs = circ_lin_r(angle, y)

    # --- Block permutation p-value (choose block length L ~ dominant autocorr time) ---
    def block_perm_corr(x, c, s, r_cs, L=30, n_perm=5000):
        W = len(x)
        idx = np.arange(W)
        blocks = [idx[i:i+L] for i in range(0, W, L)]
        obs = r
        cnt = 0
        for _ in range(n_perm):
            # shuffle blocks
            order = np.random.permutation(len(blocks))
            pidx = np.concatenate([blocks[i] for i in order])
            xp = x[pidx]
            r_xc_p = pearsonr(xp, c)[0]
            r_xs_p = pearsonr(xp, s)[0]
            r_p = np.sqrt((r_xc_p**2 + r_xs_p**2 - 2*r_xc_p*r_xs_p*r_cs) / (1 - r_cs**2))
            cnt += (r_p >= obs)
        return (cnt+1)/(n_perm+1)

    p_vals = []
    for L in [10,20,30,40,60,80,120]:
        p_vals.append(block_perm_corr(x, c, s, r_cs, L=30, n_perm=2000))
    return {"pearsonr":r, "pscore":np.mean(p_vals)}

def wavelet_cross_analysis(x, y, dt, mother=wavelet.Morlet(6), dj=1/12, s0=None, J=None):
    """
    Compute individual CWTs, cross-wavelet transform (XWT),
    coherence (WCT), phase, and time delay using native pycwt.
    """
    x = np.asarray(x)
    y = np.asarray(y)

    # --- Define scales ---
    if s0 is None:
        s0 = 2 * dt
    if J is None:
        J = int(np.log2(len(x) * dt / s0) / dj)

    # --- Individual continuous wavelet transforms ---
    Wx, scales, freqs, coi_x, _, _ = wavelet.cwt(x, dt, dj, s0, J, mother)
    Wy, _, _, coi_y, _, _ = wavelet.cwt(y, dt, dj, s0, J, mother)
    period = 1 / freqs
    coi = np.minimum(coi_x, coi_y)  # conservative COI

    # --- Cross-wavelet transform (XWT) ---
    Wxy = Wx * np.conj(Wy)
    xwt_power = np.abs(Wxy)

    # --- Wavelet coherence (WCT) ---
    WCT, aWCT, coi_wct, freq_wct, sig = wavelet.wct(x, y, dt, dj=dj, s0=s0, J=J, mother=mother, sig = False) # Disable monte-carlo simulation with sig=False

    # --- Phase and delay ---
    phase = np.angle(aWCT)
    delay = (phase / (2 * np.pi)) * (1 / freq_wct)[:, None]

    return {
        "period": period,
        "freq": freqs,
        "Wx": Wx,
        "Wy": Wy,
        "Wxy": Wxy,
        "xwt": xwt_power,
        "wtc": WCT,
        "aWCT": aWCT,
        "phase": phase,
        "delay": delay,
        "sig": sig,
        "coi": coi,
        "coi_wct": coi_wct,
    }

def variance_ratio_scan(array, interval1_size: int = 10, interval2_size: int = 5, inverse: bool = False):
    """
    Computes running variance ratios across an array with circular wrapping.
    
    Parameters:
    -----------
    array : numpy array
        The input data array
    interval_size : int
        The size of each interval for variance computation
    
    Returns:
    --------
    ratios : numpy array
        Array of variance ratios (variance of second interval / variance of first interval)
    """
    n = len(array)
    ratios = np.zeros(n)

    for i in range(n):
        # Define first interval starting at index i
        interval1_indices = np.arange(i, i + interval1_size) % n
        interval1 = array[interval1_indices]

        # Define second interval starting at index i + interval1_size
        interval2_indices = np.arange(i + interval1_size, i + interval1_size + interval2_size) % n
        interval2 = array[interval2_indices]

        # Compute variances
        var1 = np.var(interval1)
        var2 = np.var(interval2)

        # Compute ratio (handle division by zero)
        if not inverse:
            if var1 != 0:
                ratios[i] = var2 / var1
            else:
                ratios[i] = np.nan  # or np.inf, depending on your preference
        else:
            if var2 != 0 :
                ratios[i] = var1 / var2
            else:
                ratios[i] = np.nan

    return ratios

def angle_window_variance_ratio(time_array, angle_array, data_array, 
                                  angle_min, angle_max, pre_steps, inverse: bool = False):
    """
    Computes variance ratios for data during angle windows vs pre-window periods.
    """
    # Find indices where angle is within the specified range
    in_window = (angle_array >= angle_min) & (angle_array <= angle_max)

    # Find contiguous windows
    windows = []
    start_idx = None

    for i in range(len(angle_array)):
        if in_window[i] and start_idx is None:
            # Start of a new window
            start_idx = i
        elif not in_window[i] and start_idx is not None:
            # End of a window
            windows.append((start_idx, i - 1))
            start_idx = None

    # Handle case where window extends to end of array
    if start_idx is not None:
        windows.append((start_idx, len(angle_array) - 1))

    # Compute variance ratios for each window
    window_info = []
    pre_data = None

    for start, end in windows:
        # Variance during the window
        window_data = data_array[start:end+1]
        var_during = np.var(window_data)

        # Variance before the window (pre_steps timesteps before start)
        pre_start = start - pre_steps
        pre_end = start - 1

        if pre_start >= 0:
            pre_data = data_array[pre_start:pre_end+1]
            var_before = np.var(pre_data)

            # Compute ratio
            if not inverse:
                if var_before != 0:
                    ratio = var_during / var_before
                elif (var_during == 0) & (var_before == 0):
                    ratio = 0
                else:
                    ratio = np.nan
            else:
                if var_during != 0:
                    ratio = var_before / var_during
                elif (var_during == 0 ) & (var_before == 0):
                    ratio = 0
                else:
                    ratio = np.nan
        else:
            # Not enough data before window
            ratio = np.nan

        window_info.append({
            'start_idx': start,
            'end_idx': end,
            'ratio': ratio,
            'start_time': time_array[start],
            'end_time': time_array[end],
            'var_during': var_during,
            'var_before': var_before if pre_start >= 0 else np.nan,
            'window_data': window_data,
            'pre_data': pre_data if len(windows)>0 else None
        })

    return window_info

def angle_window_variance_ratio_post(time_array, angle_array, data_array, 
                                    angle_min, angle_max, post_steps, inverse: bool = False):
    """
    Computes variance ratios for data during angle windows vs post-window periods.
    """
    # Find indices where angle is within the specified range
    in_window = (angle_array >= angle_min) & (angle_array <= angle_max)

    # Find contiguous windows
    windows = []
    start_idx = None

    for i in range(len(angle_array)):
        if in_window[i] and start_idx is None:
            # Start of a new window
            start_idx = i
        elif not in_window[i] and start_idx is not None:
            # End of a window
            windows.append((start_idx, i - 1))
            start_idx = None

    # Handle case where window extends to end of array
    if start_idx is not None:
        windows.append((start_idx, len(angle_array) - 1))

    # Compute variance ratios for each window
    window_info = []
    post_data = None

    for start, end in windows:
        # Variance during the window
        window_data = data_array[start:end+1]
        var_during = np.var(window_data)

        # Variance after the window (post_steps timesteps after end)
        post_start = end + 1
        post_end = end + post_steps

        if post_end < len(data_array):
            post_data = data_array[post_start:post_end+1]
            var_after = np.var(post_data)

            # Compute ratio
            if not inverse:
                if var_after != 0:
                    ratio = var_during / var_after
                elif (var_during == 0) & (var_after == 0):
                    ratio = 0
                else:
                    ratio = np.nan
            else:
                if var_during != 0:
                    ratio = var_after / var_during
                elif (var_during == 0) & (var_after == 0):
                    ratio = 0
                else:
                    ratio = np.nan
        else:
            # Not enough data after window
            ratio = np.nan
            post_data = np.array([])
            var_after = np.nan

        window_info.append({
            'start_idx': start,
            'end_idx': end,
            'ratio': ratio,
            'start_time': time_array[start],
            'end_time': time_array[end],
            'var_during': var_during,
            'var_after': var_after,
            'window_data': window_data,
            'post_data': post_data
        })

    return window_info


def angle_window_post_event_difference(time_array, angle_array, data_array,
                                       angle_min, angle_max, 
                                       weeks_after, 
                                       n_points_at_end=1,
                                       n_points_after=1):
    """
    Computes the difference between data mean at event end vs X weeks after event end.
    
    Parameters:
    -----------
    time_array : numpy array
        Time series in Julian dates
    angle_array : numpy array
        Angle values corresponding to time_array
    data_array : numpy array
        Data values corresponding to time_array
    angle_min : float
        Minimum angle defining the event window (default 86 degrees)
    angle_max : float
        Maximum angle defining the event window (default 94 degrees)
    weeks_after : int
        Number of weeks after event end to compare
    n_points_at_end : int
        Number of points to average around event end (1 = single point)
    n_points_after : int
        Number of points to average around X-weeks-after point
        
    Returns:
    --------
    event_info : list of dicts
        Each dict contains:
        - end_idx: Index where event ends
        - end_time: Time when event ends
        - mean_at_end: Mean of data at event end
        - mean_after: Mean of data X weeks after
        - difference: mean_after - mean_at_end
        - std_at_end: Standard deviation at event end
        - std_after: Standard deviation X weeks after
        - error: Propagated error of the difference
        - data_at_end: Data points used for mean at end
        - data_after: Data points used for mean after
    """
    # Find indices where angle is within the specified range
    in_window = (angle_array >= angle_min) & (angle_array <= angle_max)
    
    # Find contiguous windows
    windows = []
    start_idx = None
    
    for i in range(len(angle_array)):
        if in_window[i] and start_idx is None:
            start_idx = i
        elif not in_window[i] and start_idx is not None:
            windows.append((start_idx, i - 1))
            start_idx = None
    
    if start_idx is not None:
        windows.append((start_idx, len(angle_array) - 1))
    
    event_info = []
    
    for start, end in windows:
        end_idx = end
        
        # Compute indices for averaging at event end
        if n_points_at_end == 1:
            idx_at_end = [end_idx]
        else:
            half_window = n_points_at_end // 2
            idx_at_end = list(range(end_idx - half_window, end_idx + half_window + 1))
            idx_at_end = [i for i in idx_at_end if 0 <= i < len(data_array)]
        
        # Compute mean and std at event end
        data_at_end = data_array[idx_at_end]
        mean_at_end = np.nanmean(data_at_end)
        std_at_end = np.nanstd(data_at_end)
        
        # Find index X weeks after event end
        # Assuming daily data: weeks_after * 7 days
        days_after = weeks_after * 7
        after_idx = end_idx + days_after
        
        if after_idx >= len(data_array):
            # Not enough data after event
            event_info.append({
                'end_idx': end_idx,
                'after_idx': after_idx,
                'end_time': time_array[end_idx],
                'mean_at_end': mean_at_end,
                'mean_after': np.nan,
                'difference': np.nan,
                'std_at_end': std_at_end,
                'std_after': np.nan,
                'error': np.nan,
                'data_at_end': data_at_end,
                'data_after': np.array([])
            })
            continue
        
        # Compute indices for averaging X weeks after
        if n_points_after == 1:
            idx_after = [after_idx]
        else:
            half_window = n_points_after // 2
            idx_after = list(range(after_idx - half_window, after_idx + half_window + 1))
            idx_after = [i for i in idx_after if 0 <= i < len(data_array)]
        
        # Compute mean and std X weeks after
        data_after = data_array[idx_after]
        mean_after = np.nanmean(data_after)
        std_after = np.nanstd(data_after)
        
        # Compute difference
        difference = mean_after - mean_at_end
        
        # Propagate error (assuming independent measurements)
        n_at_end = len(data_at_end)
        n_after = len(data_after)
        error_at_end = std_at_end / np.sqrt(n_at_end) if n_at_end > 0 else np.nan
        error_after = std_after / np.sqrt(n_after) if n_after > 0 else np.nan
        error = np.sqrt(error_at_end**2 + error_after**2)
        
        event_info.append({
            'end_idx': end_idx,
            'after_idx': after_idx,
            'end_time': time_array[end_idx],
            'mean_at_end': mean_at_end,
            'mean_after': mean_after,
            'difference': difference,
            'std_at_end': std_at_end,
            'std_after': std_after,
            'error': error,
            'data_at_end': data_at_end,
            'data_after': data_after
        })
    
    return event_info


def angle_window_post_event_gradient(time_array, angle_array, data_array,
                                     angle_min, angle_max,
                                     weeks_after,
                                     error_window=5):
    """
    Computes the average gradient from event end to X weeks after event end.

    Parameters:
    -----------
    time_array : numpy array
        Time series in Julian dates
    angle_array : numpy array
        Angle values corresponding to time_array
    data_array : numpy array
        Data values corresponding to time_array
    angle_min : float
        Minimum angle defining the event window
    angle_max : float
        Maximum angle defining the event window
    weeks_after : int
        Number of weeks after event end to compute gradient
    error_window : int, optional
        Number of points around each endpoint to use for error estimation (default=5)
        Set to 0 to disable error computation

    Returns:
    --------
    event_info : list of dicts
        Each dict contains:
        - end_idx: Index where event ends
        - end_time: Time when event ends
        - after_idx: Index X weeks after
        - after_time: Time X weeks after
        - gradient: Average gradient (change in data per day)
        - gradient_per_week: Gradient in units per week
        - gradient_error: Error in gradient (per day)
        - gradient_error_per_week: Error in gradient (per week)
        - data_at_end: Data value at event end
        - data_after: Data value X weeks after
        - std_at_end: Standard deviation at event end
        - std_after: Standard deviation X weeks after
        - delta_data: Change in data
        - delta_time: Time interval in days
    """
    # Find indices where angle is within the specified range
    in_window = (angle_array >= angle_min) & (angle_array <= angle_max)

    # Find contiguous windows
    windows = []
    start_idx = None

    for i in range(len(angle_array)):
        if in_window[i] and start_idx is None:
            start_idx = i
        elif not in_window[i] and start_idx is not None:
            windows.append((start_idx, i - 1))
            start_idx = None

    if start_idx is not None:
        windows.append((start_idx, len(angle_array) - 1))

    event_info = []

    for start, end in windows:
        end_idx = end

        # Find index X weeks after event end
        days_after = weeks_after * 7
        after_idx = end_idx + days_after

        if after_idx >= len(data_array):
            # Not enough data after event
            event_info.append({
                'end_idx': end_idx,
                'end_time': time_array[end_idx],
                'after_idx': np.nan,
                'after_time': np.nan,
                'gradient': np.nan,
                'gradient_per_week': np.nan,
                'gradient_error': np.nan,
                'gradient_error_per_week': np.nan,
                'data_at_end': data_array[end_idx],
                'data_after': np.nan,
                'std_at_end': np.nan,
                'std_after': np.nan,
                'delta_data': np.nan,
                'delta_time': np.nan
            })
            continue

        # Get data and time values
        data_at_end = data_array[end_idx]
        data_after = data_array[after_idx]
        time_at_end = time_array[end_idx]
        time_after = time_array[after_idx]

        # Compute local standard deviations for error estimation
        if error_window > 0:
            # Get window around end point
            half_window = error_window // 2
            idx_at_end = list(range(end_idx - half_window, end_idx + half_window + 1))
            idx_at_end = [i for i in idx_at_end if 0 <= i < len(data_array)]

            # Get window around after point
            idx_after = list(range(after_idx - half_window, after_idx + half_window + 1))
            idx_after = [i for i in idx_after if 0 <= i < len(data_array)]

            # Compute standard deviations
            data_window_end = data_array[idx_at_end]
            data_window_after = data_array[idx_after]

            std_at_end = np.nanstd(data_window_end)
            std_after = np.nanstd(data_window_after)

            # Standard errors
            n_at_end = len(data_window_end)
            n_after = len(data_window_after)
            se_at_end = std_at_end / np.sqrt(n_at_end) if n_at_end > 0 else np.nan
            se_after = std_after / np.sqrt(n_after) if n_after > 0 else np.nan
        else:
            std_at_end = np.nan
            std_after = np.nan
            se_at_end = np.nan
            se_after = np.nan

        # Compute gradient
        delta_data = data_after - data_at_end
        delta_time = time_after - time_at_end  # in days (Julian dates)

        gradient = delta_data / delta_time if delta_time != 0 else np.nan
        gradient_per_week = gradient * 7  # Convert to per week

        # Compute gradient error: σ_g = sqrt(σ_end^2 + σ_after^2) / Δt
        if error_window > 0 and delta_time != 0:
            gradient_error = np.sqrt(se_at_end**2 + se_after**2) / delta_time
            gradient_error_per_week = gradient_error * 7
        else:
            gradient_error = np.nan
            gradient_error_per_week = np.nan

        event_info.append({
            'end_idx': end_idx,
            'end_time': time_at_end,
            'after_idx': after_idx,
            'after_time': time_after,
            'gradient': gradient,
            'gradient_per_week': gradient_per_week,
            'gradient_error': gradient_error,
            'gradient_error_per_week': gradient_error_per_week,
            'data_at_end': data_at_end,
            'data_after': data_after,
            'std_at_end': std_at_end,
            'std_after': std_after,
            'delta_data': delta_data,
            'delta_time': delta_time
        })

    return event_info



def random_span_difference_null_distribution(data_array, 
                                                    span_weeks=4, 
                                                    n_local_points=1, 
                                                    n_trials=1000,
                                                    random_seed=None):
    """
    Generates a null distribution by computing differences over random time spans.
    
    This function randomly selects time periods of a specified length and computes
    the difference between locally-averaged values at the start and end of each span.
    This creates a null distribution for comparison with event-based differences.
    
    Parameters:
    -----------
    data_array : numpy array
        The data time series
    span_weeks : int
        Length of each random span in weeks (default=4)
    n_local_points : int
        Number of points to average at start and end of span (default=1)
        If 1, uses single point; if >1, uses centered window
    n_trials : int
        Number of random trials to perform (default=1000)
    random_seed : int, optional
        Seed for random number generator for reproducibility
        
    Returns:
    --------
    result : dict
        Dictionary containing:
        - differences: array of all computed differences
        - percent_positive: percentage of differences > 0
        - percent_negative: percentage of differences < 0
        - percent_zero: percentage of differences == 0
        - mean_difference: mean of all differences
        - std_difference: standard deviation of differences
        - median_difference: median of differences
    """
    if random_seed is not None:
        np.random.seed(random_seed)

    data_array = np.asarray(data_array)
    n_data = len(data_array)
    span_days = span_weeks * 7

    # Need enough data for span plus local averaging windows
    half_window = n_local_points // 2
    min_required = span_days + 2 * half_window

    if n_data < min_required:
        raise ValueError(f"Data array too short. Need at least {min_required} points.")

    differences = []

    for _ in range(n_trials):
        # Randomly select start of span
        # Ensure we have room for the span and local averaging windows
        max_start = n_data - span_days - half_window
        if max_start < half_window:
            raise ValueError("Not enough data for the specified span and averaging window.")

        start_idx = np.random.randint(half_window, max_start)
        end_idx = start_idx + span_days

        # Compute local average at start
        if n_local_points == 1:
            idx_at_start = [start_idx]
        else:
            idx_at_start = list(range(start_idx - half_window, start_idx + half_window + 1))
            idx_at_start = [i for i in idx_at_start if 0 <= i < n_data]

        data_at_start = data_array[idx_at_start]
        mean_at_start = np.nanmean(data_at_start)

        # Compute local average at end
        if n_local_points == 1:
            idx_at_end = [end_idx]
        else:
            idx_at_end = list(range(end_idx - half_window, end_idx + half_window + 1))
            idx_at_end = [i for i in idx_at_end if 0 <= i < n_data]

        data_at_end = data_array[idx_at_end]
        mean_at_end = np.nanmean(data_at_end)

        # Compute difference (end - start)
        difference = mean_at_end - mean_at_start
        differences.append(difference)

    differences = np.array(differences)

    # Compute statistics
    n_positive = np.sum(differences > 0)
    n_negative = np.sum(differences < 0)
    n_zero = np.sum(differences == 0)

    percent_positive = 100 * n_positive / n_trials
    percent_negative = 100 * n_negative / n_trials
    percent_zero = 100 * n_zero / n_trials

    return {
        'differences': differences,
        'percent_positive': percent_positive,
        'percent_negative': percent_negative,
        'percent_zero': percent_zero,
        'mean_difference': np.mean(differences),
        'std_difference': np.std(differences),
        'median_difference': np.median(differences),
        'n_trials': n_trials
    }



poisson_p = lambda exp, obs: 2 * min(poisson.cdf(exp, obs), poisson.sf(exp-1, obs))
binom_p = lambda exp, obs: scipy.stats.binomtest(exp,obs, p=0.5).pvalue

## Test Statistics

In [None]:
# Test angle_window_variance_ratio
print("Testing angle_window_variance_ratio function:")
print("=" * 60)

# Create synthetic test data
n_points = 100
test_time = np.arange(n_points)
test_angles = np.linspace(0, 180, n_points)  # angles from 0 to 180

# Create data with known variance pattern:
# Low variance (0-1) before index 40
# High variance (random) between indices 40-60 (the "window")
# Low variance (0-1) after index 60
np.random.seed(42)
test_data = np.ones(n_points) * 0.5  # baseline
test_data[:40] += np.random.normal(0, 0.1, 40)  # low variance pre-window
test_data[40:60] += np.random.normal(0, 2.0, 20)  # high variance during window
test_data[60:] += np.random.normal(0, 0.1, 40)  # low variance post-window

# Test the function: look for angles between 70-110 degrees
# This should capture roughly the high-variance region
result = angle_window_variance_ratio(
    test_time, 
    test_angles, 
    test_data,
    angle_min=70,
    angle_max=110,
    pre_steps=10,
    inverse=False
)

print(f"\nNumber of windows found: {len(result)}")
for i, window in enumerate(result):
    print(f"\nWindow {i+1}:")
    print(f"  Start index: {window['start_idx']}, End index: {window['end_idx']}")
    print(f"  Variance before: {window['var_before']:.4f}")
    print(f"  Variance during: {window['var_during']:.4f}")
    print(f"  Ratio (during/before): {window['ratio']:.4f}")
    if window['ratio'] > 1:
        print(f"  ✓ During-window variance is HIGHER (as expected for our test data)")
    else:
        print(f"  ✗ During-window variance is LOWER")

print("\n" + "=" * 60)
print("Test complete!")

In [None]:
# Test angle_window_variance_ratio_post
print("Testing angle_window_variance_ratio_post function:")
print("=" * 60)

# Create synthetic test data
n_points = 100
test_time = np.arange(n_points)
test_angles = np.linspace(0, 180, n_points)  # angles from 0 to 180

# Create data with known variance pattern:
# Low variance before index 40
# High variance between indices 40-60 (the "window")
# Low variance after index 60
np.random.seed(42)
test_data = np.ones(n_points) * 0.5  # baseline
test_data[:40] += np.random.normal(0, 0.1, 40)  # low variance pre-window
test_data[40:60] += np.random.normal(0, 2.0, 20)  # high variance during window
test_data[60:] += np.random.normal(0, 0.1, 40)  # low variance post-window

# Test the function: look for angles between 70-110 degrees
# This should capture roughly the high-variance region
result_post = angle_window_variance_ratio_post(
    test_time, 
    test_angles, 
    test_data,
    angle_min=70,
    angle_max=110,
    post_steps=10,
    inverse=False
)

print(f"\nNumber of windows found: {len(result_post)}")
for i, window in enumerate(result_post):
    print(f"\nWindow {i+1}:")
    print(f"  Start index: {window['start_idx']}, End index: {window['end_idx']}")
    print(f"  Variance during: {window['var_during']:.4f}")
    print(f"  Variance after: {window['var_after']:.4f}")
    print(f"  Ratio (during/after): {window['ratio']:.4f}")
    if window['ratio'] > 1:
        print(f"  ✓ During-window variance is HIGHER (as expected for our test data)")
    else:
        print(f"  ✗ During-window variance is LOWER")

print("\n" + "=" * 60)
print("Test complete!")

In [None]:
# Edge case tests for both functions
print("Testing edge cases and special scenarios:")
print("=" * 60)

# Test 1: Multiple angle windows
print("\n1. Testing with multiple 90-degree angle windows:")
n_points = 200
test_time = np.arange(n_points)
# Create angles that cross 90 degrees multiple times
test_angles = 90 + 10 * np.sin(2 * np.pi * test_time / 50)
test_data = np.random.normal(1.0, 0.5, n_points)

result_multi = angle_window_variance_ratio(
    test_time, test_angles, test_data,
    angle_min=88, angle_max=92, pre_steps=5, inverse=False
)
print(f"   Found {len(result_multi)} windows")

# Test 2: Zero variance case
print("\n2. Testing with constant (zero variance) data:")
test_data_const = np.ones(100)
result_zero = angle_window_variance_ratio(
    np.arange(100), np.linspace(0, 180, 100), test_data_const,
    angle_min=70, angle_max=110, pre_steps=5, inverse=False
)
if len(result_zero) > 0:
    print(f"   Ratio with zero variance: {result_zero[0]['ratio']}")
    print(f"   (Should be 0 when both variances are zero)")

# Test 3: Test inverse parameter
print("\n3. Testing inverse=True parameter:")
np.random.seed(123)
test_data_inv = np.random.normal(0, 1, 100)
result_normal = angle_window_variance_ratio(
    np.arange(100), np.linspace(0, 180, 100), test_data_inv,
    angle_min=70, angle_max=110, pre_steps=10, inverse=False
)
result_inverse = angle_window_variance_ratio(
    np.arange(100), np.linspace(0, 180, 100), test_data_inv,
    angle_min=70, angle_max=110, pre_steps=10, inverse=True
)
if len(result_normal) > 0 and len(result_inverse) > 0:
    ratio_normal = result_normal[0]['ratio']
    ratio_inverse = result_inverse[0]['ratio']
    print(f"   Normal ratio: {ratio_normal:.4f}")
    print(f"   Inverse ratio: {ratio_inverse:.4f}")
    print(f"   Product (should be ~1): {ratio_normal * ratio_inverse:.4f}")

# Test 4: Insufficient data before window
print("\n4. Testing edge case: window too close to start (insufficient pre-data):")
result_edge = angle_window_variance_ratio(
    np.arange(50), np.linspace(0, 180, 50), np.random.normal(0, 1, 50),
    angle_min=5, angle_max=15, pre_steps=20, inverse=False  # Need 20 points before, but window starts early
)
if len(result_edge) > 0 and np.isnan(result_edge[0]['ratio']):
    print(f"   ✓ Correctly returns NaN when insufficient pre-data")
else:
    print(f"   Result: {result_edge}")

# Test 5: Insufficient data after window (for post function)
print("\n5. Testing edge case: window too close to end (insufficient post-data):")
result_edge_post = angle_window_variance_ratio_post(
    np.arange(50), np.linspace(0, 180, 50), np.random.normal(0, 1, 50),
    angle_min=160, angle_max=175, post_steps=20, inverse=False  # Need 20 points after
)
if len(result_edge_post) > 0 and np.isnan(result_edge_post[0]['ratio']):
    print(f"   ✓ Correctly returns NaN when insufficient post-data")
else:
    print(f"   Result: {result_edge_post}")

print("\n" + "=" * 60)
print("Edge case tests complete!")

In [None]:
# Test angle_window_post_event_difference
print("Testing angle_window_post_event_difference function:")
print("=" * 70)

# Create synthetic test data
n_points = 200
test_time = np.arange(n_points)  # Simulating daily data (Julian dates)
test_angles = np.linspace(0, 180, n_points)

# Create data with a known pattern:
# - Baseline value around 0.5
# - At event end (around index 60), value = 0.3
# - 4 weeks later (28 days), value increases to 0.8
np.random.seed(42)
test_data = np.ones(n_points) * 0.5 + np.random.normal(0, 0.05, n_points)

# Create a step increase 4 weeks after event end
event_end_idx = 60
weeks_after = 4
days_after = weeks_after * 7  # 28 days

# Set values at event end to be low
test_data[event_end_idx-2:event_end_idx+2] = 0.3 + np.random.normal(0, 0.02, 4)

# Set values 4 weeks after to be high
after_idx = event_end_idx + days_after
test_data[after_idx-2:after_idx+2] = 0.8 + np.random.normal(0, 0.02, 4)

# Test with single point measurements
print("\n1. Testing with single point (n_points_at_end=1, n_points_after=1):")
result_single = angle_window_post_event_difference(
    test_time, test_angles, test_data,
    angle_min=60, angle_max=70,  # Event should end around index 60
    weeks_after=4,
    n_points_at_end=1,
    n_points_after=1
)

if len(result_single) > 0:
    event = result_single[0]
    print(f"   Event end index: {event['end_idx']}")
    print(f"   Data at end: {event['mean_at_end']:.4f}")
    print(f"   Data 4 weeks after: {event['mean_after']:.4f}")
    print(f"   Difference (after - at_end): {event['difference']:.4f}")
    print(f"   Expected: ~0.5 (since 0.8 - 0.3 = 0.5)")
    print(f"   Error: {event['error']:.4f}")

# Test with averaging over multiple points
print("\n2. Testing with averaging (n_points_at_end=5, n_points_after=5):")
result_avg = angle_window_post_event_difference(
    test_time, test_angles, test_data,
    angle_min=60, angle_max=70,
    weeks_after=4,
    n_points_at_end=5,
    n_points_after=5
)

if len(result_avg) > 0:
    event = result_avg[0]
    print(f"   Event end index: {event['end_idx']}")
    print(f"   Mean at end (5 points): {event['mean_at_end']:.4f}")
    print(f"   Std at end: {event['std_at_end']:.4f}")
    print(f"   Mean 4 weeks after (5 points): {event['mean_after']:.4f}")
    print(f"   Std after: {event['std_after']:.4f}")
    print(f"   Difference: {event['difference']:.4f}")
    print(f"   Error: {event['error']:.4f}")
    print(f"   Note: Averaging reduces noise and provides error estimate")

print("\n" + "=" * 70)
print("Test complete!")

In [None]:
# Test angle_window_post_event_gradient with error computation
print("Testing angle_window_post_event_gradient function with error:")
print("=" * 70)

# Create synthetic test data with a known linear trend after event
n_points = 200
test_time = np.arange(n_points)  # Simulating daily data
test_angles = np.linspace(0, 180, n_points)

# Create data with linear increase after event end + some noise
np.random.seed(123)
test_data = np.ones(n_points) * 0.5

# Event ends around index 60
event_end_idx = 60
weeks_after = 4
days_after = weeks_after * 7

# Create a linear trend: at end = 0.3, after 4 weeks = 0.8
# Gradient = (0.8 - 0.3) / 28 days = 0.0179 per day
test_data[event_end_idx] = 0.3
after_idx = event_end_idx + days_after
test_data[after_idx] = 0.8

# Fill in linear interpolation between these points with some noise
for i in range(event_end_idx + 1, after_idx):
    fraction = (i - event_end_idx) / days_after
    test_data[i] = 0.3 + fraction * (0.8 - 0.3) + np.random.normal(0, 0.02)

# Add noise around the endpoints
test_data[event_end_idx-2:event_end_idx+3] += np.random.normal(0, 0.03, 5)
test_data[after_idx-2:after_idx+3] += np.random.normal(0, 0.03, 5)

print("\n1. Testing gradient with error calculation (error_window=5):")
result_grad = angle_window_post_event_gradient(
    test_time, test_angles, test_data,
    angle_min=60, angle_max=70,
    weeks_after=4,
    error_window=5
)

if len(result_grad) > 0:
    event = result_grad[0]
    print(f"   Event end index: {event['end_idx']}")
    print(f"   Data at end: {event['data_at_end']:.4f} ± {event['std_at_end']:.4f}")
    print(f"   Data 4 weeks after: {event['data_after']:.4f} ± {event['std_after']:.4f}")
    print(f"   Delta data: {event['delta_data']:.4f}")
    print(f"   Delta time (days): {event['delta_time']:.1f}")
    print(f"   Gradient (per day): {event['gradient']:.6f} ± {event['gradient_error']:.6f}")
    print(f"   Gradient (per week): {event['gradient_per_week']:.4f} ± {event['gradient_error_per_week']:.4f}")
    print(f"   Expected gradient: ~0.017857 per day")

# Test without error computation
print("\n2. Testing gradient without error (error_window=0):")
result_no_error = angle_window_post_event_gradient(
    test_time, test_angles, test_data,
    angle_min=60, angle_max=70,
    weeks_after=4,
    error_window=0
)

if len(result_no_error) > 0:
    event = result_no_error[0]
    print(f"   Gradient (per day): {event['gradient']:.6f}")
    print(f"   Gradient error: {event['gradient_error']} (disabled)")

# Test with multiple events
print("\n3. Testing with multiple angle events:")
test_angles_multi = 90 + 10 * np.sin(2 * np.pi * test_time / 50)
test_data_multi = np.linspace(0, 1, n_points)  # Steady increase

result_multi = angle_window_post_event_gradient(
    test_time, test_angles_multi, test_data_multi,
    angle_min=88, angle_max=92,
    weeks_after=2,
    error_window=5
)

print(f"   Found {len(result_multi)} events")
if len(result_multi) > 0:
    for i, event in enumerate(result_multi[:3]):  # Show first 3 events
        if not np.isnan(event['gradient']):
            print(f"   Event {i+1}: gradient = {event['gradient']:.6f} ± {event['gradient_error']:.6f} per day")

print("\n" + "=" * 70)
print("Test complete!")

In [None]:
# Visual test: Plot to show what the functions are measuring
print("Visual demonstration of post-event statistics:")
print("=" * 70)

# Create synthetic data
n_points = 150
start = 2426342.5
test_time = start + np.arange(n_points)
test_angles = np.linspace(0, 180, n_points)

# Create data with a clear pattern after event end
np.random.seed(42)
test_data = 0.5 + np.random.normal(0, 0.05, n_points)

# Event ends around index 50
event_end = 50
# 4 weeks after = index 78 (28 days later)
weeks = 4
after_idx = event_end + weeks * 7

# Create lower values at event end
test_data[event_end-3:event_end+3] = 0.25 + np.random.normal(0, 0.02, 6)

# Create higher values 4 weeks after
test_data[after_idx-3:after_idx+3] = 0.75 + np.random.normal(0, 0.02, 6)

# Run the functions
diff_result = angle_window_post_event_difference(
    test_time, test_angles, test_data,
    angle_min=60, angle_max=70,
    weeks_after=4,
    n_points_at_end=10,
    n_points_after=10
)

grad_result = angle_window_post_event_gradient(
    test_time, test_angles, test_data,
    angle_min=60, angle_max=70,
    weeks_after=4
)

# Plot
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

# Top panel: Show the angle and event window
ax1.plot(test_time-start, test_angles, 'b-', linewidth=1.5, label='Angle')
ax1.axhline(y=60, color='r', linestyle='--', alpha=0.5, label='Event boundaries')
ax1.axhline(y=70, color='r', linestyle='--', alpha=0.5)
if len(diff_result) > 0:
    event_end_idx = diff_result[0]['end_idx']
    ax1.axvline(x=event_end_idx, color='g', linestyle='--', linewidth=2, label='Event end')
ax1.set_ylabel('Angle (degrees)')
ax1.set_title('Angle Time Series (Events are when angle is between 60-70 degrees)')
ax1.legend()
ax1.grid(True, alpha=0.3)

# Bottom panel: Show the data and measurements
ax2.plot(test_time-start, test_data, 'k-', linewidth=1, alpha=0.6, label='Data')
if len(diff_result) > 0:
    event = diff_result[0]
    end_idx = event['end_idx']
    
    # Mark event end
    ax2.axvline(x=end_idx, color='g', linestyle='--', linewidth=2, label='Event end')
    
    # Mark 4 weeks after
    ax2.axvline(x=end_idx + 28, color='purple', linestyle='--', linewidth=2, 
                label=f'{weeks} weeks after')
    
    # Show the mean values
    ax2.plot(end_idx, event['mean_at_end'], 'go', markersize=12, 
            label=f"Mean at end = {event['mean_at_end']:.3f}")
    ax2.plot(end_idx + 28, event['mean_after'], 'mo', markersize=12,
            label=f"Mean after = {event['mean_after']:.3f}")
    
    # Draw arrow showing difference
    ax2.annotate('', xy=(end_idx + 35, event['mean_after']), 
                xytext=(end_idx + 35, event['mean_at_end']),
                arrowprops=dict(arrowstyle='<->', color='red', lw=2))
    ax2.text(end_idx + 38, (event['mean_at_end'] + event['mean_after'])/2,
            f"Δ = {event['difference']:.3f}\n± {event['error']:.3f}",
            fontsize=10, color='red')

ax2.set_xlabel('Time (days)')
ax2.set_ylabel('Data value')
ax2.set_title('Data Time Series (showing post-event difference measurement)')
ax2.legend(loc='upper left')
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

if len(diff_result) > 0 and len(grad_result) > 0:
    print(f"\nResults summary:")
    print(f"  Difference method: Δ = {diff_result[0]['difference']:.4f} ± {diff_result[0]['error']:.4f}")
    print(f"  Gradient method: {grad_result[0]['gradient']:.6f} per day")
    print(f"                   {grad_result[0]['gradient_per_week']:.4f} per week")

print("\n" + "=" * 70)

In [None]:
# Plot post-event gradients with error bars
# Assuming you have stored the gradient result in a variable (e.g., 'grad_result')
# grad_result = angle_window_post_event_gradient(...)

# Extract data from the result
gradients = [event['gradient_per_week'] for event in grad_result]
gradient_errors = [event['gradient_error_per_week'] for event in grad_result]
event_times_jd = [start+event['end_time'] for event in grad_result]

# Convert Julian dates to datetime
event_dates = Time(event_times_jd, format='jd').to_datetime()

# Create the plot
fig, ax = plt.subplots(figsize=(14, 6))

# Plot with error bars
ax.errorbar(event_dates, gradients, yerr=gradient_errors, 
            fmt='s', markersize=8, capsize=5, capthick=2,
            color='darkgreen', ecolor='lightgreen', elinewidth=2,
            label='Gradient ± Error')

# Add horizontal line at zero for reference
ax.axhline(y=0, color='red', linestyle='--', linewidth=1.5, alpha=0.7, label='Zero gradient')

# Formatting
ax.set_xlabel('Event End Date', fontsize=12)
ax.set_ylabel('Gradient (per week)', fontsize=12)
ax.set_title('Post-Event Gradient with Error Bars', fontsize=14)
ax.grid(True, alpha=0.3)
ax.legend(fontsize=10)

# Format x-axis to show dates nicely
import matplotlib.dates as mdates
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
ax.xaxis.set_major_locator(mdates.AutoDateLocator())
plt.xticks(rotation=45, ha='right')

plt.tight_layout()
plt.show()

# Print summary statistics
print(f"Summary of {len(gradients)} events:")
print(f"  Mean gradient: {np.nanmean(gradients):.6f} ± {np.nanstd(gradients):.6f} per week")
print(f"  Median gradient: {np.nanmedian(gradients):.6f} per week")
print(f"  Mean error: {np.nanmean(gradient_errors):.6f} per week")
print(f"  Number of positive gradients: {sum(1 for g in gradients if g > 0)}")
print(f"  Number of negative gradients: {sum(1 for g in gradients if g < 0)}")

In [None]:
# Plot post-event differences with error bars
# Assuming you have stored the result in a variable (e.g., 'diff_result')
# diff_result = angle_window_post_event_difference(...)

# Extract data from the result
differences = [event['difference'] for event in diff_result]
errors = [event['error'] for event in diff_result]
event_times_jd = [event['end_time'] for event in diff_result]

# Convert Julian dates to datetime
event_dates = Time(event_times_jd, format='jd').to_datetime()

# Create the plot
fig, ax = plt.subplots(figsize=(14, 6))

# Plot with error bars
ax.errorbar(event_dates, differences, yerr=errors, 
            fmt='o', markersize=8, capsize=5, capthick=2,
            color='steelblue', ecolor='lightsteelblue', elinewidth=2,
            label='Difference ± Error')

# Add horizontal line at zero for reference
ax.axhline(y=0, color='red', linestyle='--', linewidth=1.5, alpha=0.7, label='Zero line')

# Formatting
ax.set_xlabel('Event End Date', fontsize=12)
ax.set_ylabel('Difference (After - At End)', fontsize=12)
ax.set_title('Post-Event Difference in Data with Error Bars', fontsize=14)
ax.grid(True, alpha=0.3)
ax.legend(fontsize=10)

# Format x-axis to show dates nicely
import matplotlib.dates as mdates
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
ax.xaxis.set_major_locator(mdates.AutoDateLocator())
plt.xticks(rotation=45, ha='right')

plt.tight_layout()
plt.show()

# Print summary statistics
print(f"Summary of {len(differences)} events:")
print(f"  Mean difference: {np.nanmean(differences):.4f} ± {np.nanstd(differences):.4f}")
print(f"  Median difference: {np.nanmedian(differences):.4f}")
print(f"  Mean error: {np.nanmean(errors):.4f}")
print(f"  Number of positive differences: {sum(1 for d in differences if d > 0)}")
print(f"  Number of negative differences: {sum(1 for d in differences if d < 0)}")

## Load HDF5 file

In [3]:
file = h5py.File(filePath)


## View relevant keys

In [4]:
planets = list(file.keys())
print(planets)
print(file['Earth'].keys())
print(file['Earth'].attrs.keys())


['Earth', 'Jupiter', 'Mercury', 'Neptune', 'Saturn', 'Sun', 'Uranus', 'Venus']
<KeysViewHDF5 ['jd', 'vx', 'vy', 'vz', 'x', 'y', 'z']>
<KeysViewHDF5 ['n_points', 'naif_id', 'units_position', 'units_velocity']>


## Ensure date ranges of planetary ephemerides match

In [5]:
dataConsistent = True
timearray = None
for planet in planets:
    if np.all(file[planet]['jd'][:] != file['Earth']['jd'][:]):
        print(f"Error as {planet}")
        dataConsistent = False
if dataConsistent:
    timearray = file['Earth']['jd'][:]


## Construct dictionary of 3D position values

In [6]:
ephemerides = {k:0 for k in planets}
for planet in planets:
    px = file[planet]['x'][:]*unit.AU.to(unit.m) # Meters
    py = file[planet]['y'][:]*unit.AU.to(unit.m) # Meters
    pz = file[planet]['z'][:]*unit.AU.to(unit.m) # Meters

    coords = np.stack([px,py,pz], axis=1)

    ephemerides[planet] = coords


## Construct mass array corresponding to Planets list

In [7]:
planet_masses = {
    'Mercury': 3.3011e23, # kg
    'Venus':4.8675e24, # kg
    'Earth': 5.9724e24,# kg
    #'Mars': 6.4171e23,# kg
    'Jupiter':1.898e27,# kg
    'Saturn':5.685e26,# kg
    'Uranus':8.682e25,# kg
    'Neptune':1.024e26# kg
    }


## Plot representative of the data

In [None]:
max_index = 400

x,y = zip(*ephemerides["Earth"][:max_index,0:2])

fig, ax = plt.subplots(figsize=(6, 4))
ax.set_xlim(min(x) - .2e11, max(x) + .2e11)
ax.set_ylim(min(y) - .2e11, max(y) + .2e11)
ax.set_xlabel("x")
ax.set_ylabel("y")
ax.set_title("Earth Trajectory")
ax.grid(True)
plt.plot(x,y, 'go', markersize=2)


# Barycenter Analysis

## Compute Barycenter for each planet group

In [8]:
group1 = ['Mercury', 'Venus', 'Earth', 'Jupiter', 'Saturn']
#group1 = ['Mercury', 'Venus', 'Earth']

group2 = ['Jupiter', 'Saturn', 'Uranus', 'Neptune']


# Group1 total mass
g1_masses = np.asarray([planet_masses[p] for p in group1])

# Group2 total mass
g2_masses = np.asarray([planet_masses[p] for p in group2])

# Compute weights for position vectors
#g1_weights = g1_masses / (solar_mass + np.sum(g1_masses))
#g2_weights = g2_masses / (solar_mass + np.sum(g2_masses))
g1_weights = np.array([.14,.34,.15,.35,.02])
g2_weights = np.array([.35,.40,.125,.125])

print(f" Group1 total mass: {np.sum(g1_masses)}   Group2 total mass: {np.sum(g2_masses)}")

# Compute total number of time series points
n_points = len(timearray)

# Compute the group1 barycenter
g1_ephemerides = np.asarray([ephemerides[p] for p in group1])
g1_barycenter = np.sum(g1_weights[:,None,None] * g1_ephemerides, axis=0)

# Compute the group2 barycenter
g2_ephemerides = np.asarray([ephemerides[p] for p in group2])
g2_barycenter = np.sum(g2_weights[:,None,None] * g2_ephemerides, axis=0)


 Group1 total mass: 2.47767001e+27   Group2 total mass: 2.65572e+27


## Compute angle between barycenters over the time series

In [9]:
# Inner products
dot_prods = np.einsum('ij,ij->i', g1_barycenter, g2_barycenter)
norms = np.linalg.norm(g1_barycenter, axis=1) * np.linalg.norm(g2_barycenter, axis=1)

# Compute the cosine of angles
cos_angles = dot_prods / norms
angles = np.arccos(cos_angles) * 180 / np.pi


## Compute plot of angles

In [None]:
import matplotlib.dates as mdates

tarr = Time(timearray, format='jd').to_datetime()

plt.figure(figsize=(10, 5))
plt.plot(tarr, angles, lw=1.5)
plt.axhline(y=90, color='r', linestyle='--', linewidth=1.5, label='90 Degrees')
plt.xlabel("Date")
plt.ylabel("Angle (degrees)")
plt.title("New Weighted Angles Over Time")
plt.grid(True, alpha=0.3)
plt.legend()

# Format date axis nicely
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
plt.gca().xaxis.set_major_locator(plt.MaxNLocator(10))

plt.tight_layout()
#plt.savefig("../figures/barycenter_angles.png")
#plt.savefig('../literature/our_work/barycenter_angles.png')
plt.savefig("../figures/weighted_angles.png")
plt.savefig('../literature/our_work/weighted_angles.png')


# Tidal Analysis

## Tidal field study

### Group 1

In [None]:
# Compute the tidal tensor for just Group 1 planetary bodies

r_norm = np.linalg.norm(g1_ephemerides, axis=-1, keepdims=True)
r_hat = g1_ephemerides / r_norm

# outer product
rhat_outer = r_hat[...,:,None] * r_hat[...,None,:] # ( n_planets, n_time_points, 3, 3)

# Identity
I = np.eye(3)[None, None, :,:]

# per-body tidal tensor:
inv_r3 = (1.0 / r_norm**3)[...,None]
m_b = g1_masses[:,None,  None,None]

T_k = cons.G.value * m_b * (3.0 * rhat_outer - I) * inv_r3

g1_T = T_k.sum(axis=0)
g1_T_mag = np.sqrt((g1_T**2).sum(axis=(-2,-1)))


### Group 2

In [None]:
# Compute the tidal tensor for just Group 2 planetary bodies

r_norm = np.linalg.norm(g2_ephemerides, axis=-1, keepdims=True)
r_hat = g2_ephemerides / r_norm

# outer product
rhat_outer = r_hat[...,:,None] * r_hat[...,None,:] # ( n_planets, n_time_points, 3, 3)

# Identity
I = np.eye(3)[None, None, :,:]

# per-body tidal tensor:
inv_r3 = (1.0 / r_norm**3)[...,None]
m_b = g2_masses[:,None,  None,None]

T_k = cons.G.value * m_b * (3.0 * rhat_outer - I) * inv_r3

g2_T = T_k.sum(axis=0)
g2_T_mag = np.sqrt((g2_T**2).sum(axis=(-2,-1)))


### Total solar system

In [None]:
total_eph = np.asarray([ephemerides[p] for p in planet_masses.keys()])
planet_mass = np.asarray([planet_masses[p] for p in planet_masses.keys()])
r_norm = np.linalg.norm(total_eph, axis=-1, keepdims=True)
r_hat = total_eph / r_norm

# outer product
rhat_outer = r_hat[...,:,None] * r_hat[...,None,:] # ( n_planets, n_time_points, 3, 3)

# Identity
I = np.eye(3)[None, None, :,:]

# per-body tidal tensor:
inv_r3 = (1.0 / r_norm**3)[...,None]
m_b = planet_mass[:,None,  None,None]

T_k = cons.G.value * m_b * (3.0 * rhat_outer - I) * inv_r3

ss_T = T_k.sum(axis=0)
ss_T_mag = np.sqrt((ss_T**2).sum(axis=(-2,-1)))


In [None]:
fig, ax = plt.subplots(figsize=(10,5))
tarr = Time(timearray, format='jd').to_datetime()

ax.plot(tarr, ss_T_mag, color='dodgerblue', linewidth=2)
ax.set_xlabel("Date", fontsize=12)
ax.set_ylabel(r"Tidal tensor magnitude  $|T|$  (s$^{-2}$)", fontsize=12)
ax.set_title("Solar-System Tidal Tensor Magnitude at the Sun’s Center", fontsize=13)

# Optional: format the time axis nicely
ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y"))
plt.gca().xaxis.set_major_locator(plt.MaxNLocator(10))
fig.autofmt_xdate()

ax.grid(True, which='both', alpha=0.3)
plt.tight_layout()
plt.savefig("../figures/full_tidal_tensor_mag.png")
plt.savefig('../literature/our_work/full_tidal_tensor_mag.png')


## Compare angles to tidal tensor magnitude at sun location

In [None]:
circ_lin_corr(angles, ss_T_mag)


In [None]:
import scipy.signal as signal
import matplotlib.pyplot as plt

x = ss_T_mag
y = np.cos(angles)

# Remove mean / linear trend to avoid huge low-freq leakage
x = signal.detrend(x - np.mean(x))
y = signal.detrend(y - np.mean(y))

fs = 1.0 / np.median(np.diff(timearray).astype('timedelta64[D]').astype(float))  # samples/day
fs = fs * 365.25
print(f"Sampling rate ≈ {fs:.3f} samples/yr")

f, Cxy = signal.coherence(x, y, fs=fs,
                          nperseg=len(x)//8,
                          noverlap=len(x)//16,
                          window='hann')

plt.figure(figsize=(10,5))
plt.semilogx(f, Cxy, color='darkorange', lw=2)
plt.xlabel("Frequency (1/yr)")
plt.ylabel("Coherence  $C_{xy}(f)$")
plt.title("Magnitude - Angle Coherence Spectrum")
plt.grid(True, which='both', alpha=0.3)

# Annotate known orbital periods
for P,label in [(11.86,"Jupiter"), (19.86,"J-S synodic")]:
    plt.axvline(1/P, color='gray', ls='--')
    plt.text(1/P, 0.46, label, rotation=90, va='center', ha='right')
plt.show()


In [None]:

dt = np.median(np.diff(timearray).astype('timedelta64[D]').astype(float)) / 365.25  # years/sample
WCT, aWCT, coi, freq, sig = wavelet.wct(x, y, dt, dj=0.125, s0=2*dt, J=int(9/0.125), sig=None)
plt.figure(figsize=(10,5))
T, F = np.meshgrid(timearray, 1/freq)
plt.contourf(T, F, WCT, levels=np.linspace(0,1,50), cmap='viridis')
plt.yscale('log')
plt.ylabel('Period (years)')
plt.xlabel('Date')
plt.title('Wavelet Coherence |T| vs. cos(θ)')
plt.colorbar(label='Coherence')
plt.show()


In [None]:
WCT, aWCT, coi, freq, sig = wavelet.wct(
    x, y,
    dt,
    dj=0.125,
    s0=2*dt,
    J=int(9/0.125),
    significance_level=0.95,   # ask pycwt to compute significance
    wavelet='morlet',
    normalize=True
)


In [None]:
plt.figure(figsize=(10, 5))
T, F = np.meshgrid(timearray, 1/freq)

# main coherence field
plt.contourf(T, F, WCT, levels=np.linspace(0, 1, 50), cmap='viridis')
plt.yscale('log')
plt.ylabel('Period (years)')
plt.xlabel('Date')
plt.title('Wavelet Coherence |T| vs. cos(θ)')
plt.colorbar(label='Coherence')

sig2d = np.tile(sig[:, None], (1, WCT.shape[1]))
plt.contour(T, F, sig2d, levels=[0.95], colors='white', linewidths=1.2)

print(f" Any significant peridoicities? {np.any(sig2d>0.95)}")


# Sunspot Analysis

## Load Data

In [10]:

# Path to sunspots data file
__SUNSPOT_FILE__ = Path("../data/SN_d_tot_V2.0.csv")

# Load the csv file into memory
sp_file =pd.read_csv(__SUNSPOT_FILE__, sep = None, na_values=['-1', 'NaN'], engine='python')

# Compute the normalized number of sunspots, ranging from 0 to 1
minv = sp_file['N_Sunspots'].min()
maxv = sp_file['N_Sunspots'].max()
mean = sp_file['N_Sunspots'].mean()
std = sp_file['N_Sunspots'].std()
sp_file['N_Sunspots_Norm'] = (sp_file['N_Sunspots'] - minv) / (maxv - minv)

# Compute the solar tranquility, defined by 1 - N_Sunspots_Norm
sp_file["Tranquility"] = (1 - sp_file["N_Sunspots_Norm"])

# Create new column converting Year-Month-Day data into julian date
sp_file['jd'] = Time(
    [f"{y}-{m}-{d}" for y, m, d in zip(sp_file['Year'], sp_file['Month'], sp_file['Day'])],
    format='iso'
).jd

# Print summary of datatable
sp_file


Unnamed: 0,Year,Month,Day,Decimal_Year,N_Sunspots,Std,N_Observations,Def/Prov,N_Sunspots_Norm,Tranquility,jd
0,1818,1,1,1818.001,,,0,1,,,2385070.5
1,1818,1,2,1818.004,,,0,1,,,2385071.5
2,1818,1,3,1818.007,,,0,1,,,2385072.5
3,1818,1,4,1818.010,,,0,1,,,2385073.5
4,1818,1,5,1818.012,,,0,1,,,2385074.5
...,...,...,...,...,...,...,...,...,...,...,...
75905,2025,10,27,2025.821,98.0,17.2,31,0,0.185606,0.814394,2460975.5
75906,2025,10,28,2025.823,114.0,13.6,30,0,0.215909,0.784091,2460976.5
75907,2025,10,29,2025.826,100.0,13.3,23,0,0.189394,0.810606,2460977.5
75908,2025,10,30,2025.829,70.0,15.7,25,0,0.132576,0.867424,2460978.5


In [11]:
# Grab only the dates that correspond to our selected epochs

# Generate mask for each set of dates
timearray_mask = np.isin(timearray, sp_file['jd'].values)
sp_mask = np.isin(sp_file['jd'].values, timearray)

# Slice the dataframe according to the timearray dates
sliced_df = sp_file[sp_file['jd'].isin(timearray)]
sliced_df

print(sliced_df.shape[0], timearray.shape)


34638 (34698,)


In [12]:
## Slice the angles according to the date range of the sunspots
sliced_angles = angles[timearray_mask]

## Extract sunspots for from sliced dateframe
sunspots = sliced_df['N_Sunspots_Norm'].values

## Ensure dates are aligned
print(f"Dates are aligned: {np.all(sliced_df['jd'] == timearray[timearray_mask])}")

## Ensure data lists are identical lengths
print(f"Data is consistent lengths: {len(sliced_angles) == len(sunspots)}")


Dates are aligned: True
Data is consistent lengths: True


## Compute Statistics

In [None]:
## Compute correlation and p-score
circ_lin_corr(sliced_angles, sunspots)


In [None]:
lags = np.arange(-120,120,10)
out = sweep_lag_circ_lin(timearray[timearray_mask], sliced_angles, sliced_df['jd'], sunspots, lags, L=30, n_perm=2000)
out


In [None]:
res = wavelet_cross_analysis(sliced_angles, sunspots, timearray[1] - timearray[0])
print(f"Mean coherence: {np.nanmean(res['wtc']):.3f}")
print(f"Mean delay: {np.nanmean(res['delay']):.2f} time units")
res.keys()


In [None]:
import numpy as np
import matplotlib.pyplot as plt

# --- align array lengths ---
time = np.asarray(timearray[timearray_mask], float)
period = np.asarray(res["period"], float)
wtc = np.asarray(res["wtc"])
angles_deg = sliced_angles

n = min(len(time), wtc.shape[1], len(angles_deg))
time, wtc, angles_deg = time[:n], wtc[:, :n], angles_deg[:n]

# --- build figure with tight, touching layout ---
fig = plt.figure(figsize=(10, 7))
gs = fig.add_gridspec(2, 1, height_ratios=[3, 1], hspace=0.05)
ax_top = fig.add_subplot(gs[0])
ax_bottom = fig.add_subplot(gs[1], sharex=ax_top)

# --- top: wavelet coherence ---
T, P = np.meshgrid(time, period)
pcm = ax_top.pcolormesh(T, P, wtc, shading="auto", cmap="turbo")
ax_top.set_yscale("log")
ax_top.set_ylabel("Period")
ax_top.set_title("Wavelet Coherence")
fig.colorbar(pcm, ax=ax_top, orientation="vertical", label="Coherence")

# hide top plot’s x ticks and labels
ax_top.tick_params(labelbottom=False)
ax_top.set_xlabel("")  # no label on top plot

# --- bottom: driver angle vs. time ---
ax_bottom.plot(time, angles_deg, lw=0.8, color="tab:blue")
ax_bottom.set_ylabel("Angle (°)")
ax_bottom.set_xlabel("Time (Julian Date)")
ax_bottom.set_xlim(time[0], time[-1])
#ax_bottom.set_title("Driver Angle vs. Time")

# --- ensure exact horizontal alignment ---
pos0 = ax_top.get_position()
pos1 = ax_bottom.get_position()
ax_bottom.set_position([pos0.x0, pos1.y0, pos0.width, pos1.height])


plt.savefig("../figures/wavelet_coh.png")
plt.savefig('../literature/our_work/wavelet_coh.png')


In [None]:
phase_raw = np.angle(res['Wxy'])
import numpy as np
import matplotlib.pyplot as plt

# --- align array lengths ---
time = np.asarray(timearray[timearray_mask], float)
period = np.asarray(res["period"], float)
wtc = phase_raw
angles_deg = sliced_angles

n = min(len(time), wtc.shape[1], len(angles_deg))
time, wtc, angles_deg = time[:n], wtc[:, :n], angles_deg[:n]

# --- build figure with tight, touching layout ---
fig = plt.figure(figsize=(10, 7))
gs = fig.add_gridspec(2, 1, height_ratios=[3, 1], hspace=0.05)
ax_top = fig.add_subplot(gs[0])
ax_bottom = fig.add_subplot(gs[1], sharex=ax_top)

# --- top: wavelet coherence ---
T, P = np.meshgrid(time, period)
pcm = ax_top.pcolormesh(T, P, wtc, shading="auto", cmap="turbo")
ax_top.set_yscale("log")
ax_top.set_ylabel("Period")
ax_top.set_title("Relative Phase")
fig.colorbar(pcm, ax=ax_top, orientation="vertical", label="Coherence")

# hide top plot’s x ticks and labels
ax_top.tick_params(labelbottom=False)
ax_top.set_xlabel("")  # no label on top plot

# --- bottom: driver angle vs. time ---
ax_bottom.plot(time, angles_deg, lw=0.8, color="tab:blue")
ax_bottom.set_ylabel("Angle (°)")
ax_bottom.set_xlabel("Time (Julian Date)")
ax_bottom.set_xlim(time[0], time[-1])
#ax_bottom.set_title("Driver Angle vs. Time")

# --- ensure exact horizontal alignment ---
pos0 = ax_top.get_position()
pos1 = ax_bottom.get_position()
ax_bottom.set_position([pos0.x0, pos1.y0, pos0.width, pos1.height])

plt.savefig("../figures/wavelet_coh_phase.png")
plt.savefig('../literature/our_work/wavelet_coh_phase.png')


In [None]:
tarr = Time(timearray[timearray_mask], format='jd').to_datetime()

plt.figure(figsize=(10, 5))
plt.plot(tarr, sunspots, lw=1.5)
plt.xlabel("Date")
plt.ylabel("Tranquility")
plt.title("Temporal Evolution of Solar Tranquility")
plt.grid(True, alpha=0.3)
plt.legend()

# Format date axis nicely
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
plt.gca().xaxis.set_major_locator(plt.MaxNLocator(10))

plt.tight_layout()
plt.savefig("../figures/soltranq.png")
plt.savefig('../literature/our_work/soltranq.png')


# Comparison of Coordinate Data

In [None]:
merctime =file["Mercury"]['jd'][:]
mercx = file['Mercury']['x'][:]
mercz = file['Mercury']['z'][:]
tarr = Time(merctime, format='jd').to_datetime()

# The below values agree with the Horizons website API query
print(tarr[1464], mercx[1464])
print(tarr[1464], mercz[1464])
print(tarr[1465], mercx[1465])
print(tarr[1465], mercz[1465])


# 90 Degree Events

## Study A

### Figure 1 Recreation

In [None]:
# First, recreate Fig. 1

tranquility = sp_file['Tranquility']

time_lower = Time('2015-06-05', format='iso').jd
time_higher = Time('2019-12-05', format='iso').jd

# Generate the time mask
MASK = ( timearray >= time_lower ) & ( timearray <= time_higher)

# Slice the time array
timearray = file['Earth']['jd'][:]
timearray_sliced = timearray[ MASK ]

# Slice the angle array
angles_sliced = angles[ MASK ]

# Create a boolean array to show where 90 degree events occur
events_bool = ( angles_sliced >= __ANGLE_LIMIT_LOWER__ ) & ( angles_sliced <= __ANGLE_LIMIT_HIGHER__)

# SLice the tranquility array to the times as well
sp_mask = ( sp_file['jd'] >= time_lower ) & ( sp_file['jd'] <= time_higher )
tranq_masked = tranquility[sp_mask]

In [None]:
times = Time(timearray_sliced, format='jd')
iso_dates = times.iso

# Create the plot
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(timearray_sliced, 0.8 * events_bool, 'b-', linewidth=1.5, label='90 degree events') # Rescaled for nice figure
ax.plot(timearray_sliced, tranq_masked, 'r-', linewidth=1.5, label='solar tranquility')

# Set only 10 x-ticks with ISO format labels
tick_indices = np.linspace(0, len(timearray_sliced)-1, 10, dtype=int)
ax.set_xticks(timearray_sliced[tick_indices])
ax.set_xticklabels([iso_dates[i][:10] for i in tick_indices], rotation=45, ha='right')

ax.set_xlabel('Date (ISO)')
ax.get_yaxis().set_visible(False)  # Turn off y-axis
ax.set_title('Solar Tranquility versus 90 degree events')
ax.legend()
ax.grid(True, alpha=0.3)

plt.savefig("../figures/sol_tranq_and_90_degree_select.png")
plt.savefig('../literature/our_work/sol_tranq_and_90_degree_select.png')


plt.tight_layout()
plt.show()

### Figure 2 Recreation

In [None]:

tranquility = sp_file['Tranquility']

time_lower = Time('1935-01-11', format='iso').jd
time_higher = Time('2022-01-11', format='iso').jd

# Generate the time mask
MASK = ( timearray >= time_lower ) & ( timearray <= time_higher)

# Slice the time array
timearray = file['Earth']['jd'][:]
timearray_sliced = timearray[ MASK ]

# Slice the angle array
angles_sliced = angles[ MASK ]

# Create a boolean array to show where 90 degree events occur
events_bool = ( angles_sliced >= __ANGLE_LIMIT_LOWER__ ) & ( angles_sliced <= __ANGLE_LIMIT_HIGHER__)

# SLice the tranquility array to the times as well
sp_mask = ( sp_file['jd'] >= time_lower ) & ( sp_file['jd'] <= time_higher )
tranq_masked = tranquility[sp_mask]

In [None]:
times = Time(timearray_sliced, format='jd')
iso_dates = times.iso

# Create the plot
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(timearray_sliced, 2 * events_bool, 'b-', linewidth=1.5, label='90 degree events') # Rescaled for nice figure
ax.plot(timearray_sliced, tranq_masked, 'r-', linewidth=1.5, label='solar tranquility', alpha=0.8)

# Set only 10 x-ticks with ISO format labels
tick_indices = np.linspace(0, len(timearray_sliced)-1, 10, dtype=int)
ax.set_xticks(timearray_sliced[tick_indices])
ax.set_xticklabels([iso_dates[i][:10] for i in tick_indices], rotation=45, ha='right')

ax.set_xlabel('Date (ISO)')
ax.get_yaxis().set_visible(False)  # Turn off y-axis
ax.set_title('Solar Tranquility versus 90 degree events')
ax.legend()
ax.grid(True, alpha=0.3)
plt.savefig("../figures/sol_tranq_and_90_degree.png")
plt.savefig('../literature/our_work/sol_tranq_and_90_degree.png')

plt.tight_layout()
plt.show()


### Stability of Solar Tranquility Figures of Merit

In [15]:
tranquility = sp_file['Tranquility']

print(f"Using 7-day rolling average of Tranquility")
tranquility = tranquility.rolling(window=7).mean()


sp_file_time = sp_file['jd'].values
ps_file_time = file['Earth']['jd'][:]

# Use entire aligned date range
time_lower = np.max([sp_file_time[0], ps_file_time[0]]) #Time('2015-06-05', format='iso').jd
time_higher = np.min([sp_file_time[-1], ps_file_time[-1]]) # Time('2019-12-05', format='iso').jd

# Generate the time mask
MASK = ( timearray >= time_lower ) & ( timearray <= time_higher)
SP_MASK = ( sp_file['jd'] >= time_lower ) & ( sp_file['jd'] <= time_higher )

# Slice the time array

timearray_sliced = timearray[ MASK ]
sp_time_sliced = sp_file_time[ SP_MASK ]

# Ensure time arrays are aligned
print(f"Time arrays are aligned: {np.all(timearray_sliced == sp_time_sliced)}")

# Slice the angle array
angles_sliced = angles[ MASK ]

# Create a boolean array to show where 90 degree events occur
events_bool = ( angles_sliced >= __ANGLE_LIMIT_LOWER__ ) & ( angles_sliced <= __ANGLE_LIMIT_HIGHER__)

# SLice the tranquility array to the times as well
tranq_masked = tranquility[SP_MASK]

Using 7-day rolling average of Tranquility
Time arrays are aligned: True


In [16]:
# Compute complete running variance ratio of solar tranquility

tranq_var_rat_pre = angle_window_variance_ratio(
    timearray_sliced,
    angles_sliced,
    tranq_masked,
    86,
    94,
    12*7,
    inverse = False
)

# Drop NaN values (occurs usually when there is no sunspot data for that date)
tranq_var_rat_pre_filt = [win for win in tranq_var_rat_pre if not np.isnan(win['ratio']) ]
rats_pre = [win['ratio'] for win in tranq_var_rat_pre]
print(f"Number of events: {len(tranq_var_rat_pre_filt)}")
print(f"Ratios: {rats_pre}")
print(f"Mean: {np.nanmean(rats_pre)}")

Number of events: 26
Ratios: [np.float64(0.8254108436214738), np.float64(2.0745294113695785), np.float64(0.8156184951322891), np.float64(0.2325840194880084), np.float64(0.13921575702875308), np.float64(0.10735958008315599), np.float64(0.2839853202648998), np.float64(0.6678476402844529), np.float64(1.0246720616179017), np.float64(0.7233846977208683), np.float64(3.9742857206368365), np.float64(0.5177228679056024), np.float64(0.8335416826869785), np.float64(0.03658280366665759), np.float64(0.371707108282157), np.float64(0.22645249253470257), np.float64(0.9288803623406717), np.float64(0.7221644497270191), np.float64(0.4182991565650158), np.float64(1.754362369560569), np.float64(0.3340432263776896), np.float64(0.6122539603848873), np.float64(0.6024761565610376), np.float64(0.5980541738910763), np.float64(0.058560533187764156), np.float64(0.6568202475867952)]
Mean: 0.751569813019494


In [31]:
starts = [win['start_idx'] for win in tranq_var_rat_pre]
ends = [win['end_idx'] for win in tranq_var_rat_pre]
mindiff = np.inf
for st in starts:
    for en in ends:
        if np.abs(en-st)<mindiff:
            mindiff = np.abs(en-st)

mindiff * np.diff(timearray_sliced)[0]

n_close_events = 0
for st in starts:
    for en in ends:
        if np.abs(en-st) < 4*7:
            n_close_events += 1
print(n_close_events)


7


In [None]:
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(rats_pre)
ax.axhline(y=1, label='unity', color='red', linestyle='--');
ax.set_title("Pre-During Variance with 14 day lookback")
ax.set_ylabel("Var(during) / Var(Pre)")
ax.set_xlabel("Event Index")
ax.grid(True)
plt.legend();

plt.savefig("../figures/sol_tranq_pre_var.png")
plt.savefig('../literature/our_work/sol_tranq_pre_var.png')

In [None]:
tranq_var_rat_post = angle_window_variance_ratio_post(
    timearray_sliced,
    angles_sliced,
    tranq_masked,
    86,
    94,
    12*7,
    inverse = False
)

# Drop NaN values (occurs usually when there is no sunspot data for that date)
tranq_var_rat_post_filt = [win for win in tranq_var_rat_post if not np.isnan(win['ratio']) ]
rats_post = [win['ratio'] for win in tranq_var_rat_post][:]
rats_post = np.delete(rats_post,5)
print(f"Number of events: {len(tranq_var_rat_post_filt)}")
print(f"Ratios: {rats_post}")
print(f"Mean: {np.nanmean(rats_post)}")

In [None]:
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(rats_post)
ax.axhline(y=1, label='unity', color='red', linestyle='--');
ax.set_title("During/Post Variance with 14 day lookforward")
ax.set_ylabel("Var(during) / Var(Post)")
ax.set_xlabel("Event Index")
ax.grid(True)
ax.set_ylim(0,np.max(rats_post))
plt.legend();
plt.savefig("../figures/sol_tranq_post_var.png")
plt.savefig('../literature/our_work/sol_tranq_post_var.png')

In [None]:
# Plot the mean pre ratio versus lookback time

def compute_pre_mean(lb_time):
    res = angle_window_variance_ratio(
        timearray_sliced,
        angles_sliced,
        tranq_masked,
        86,
        94,
        lb_time,
        inverse = False
    )

    # Drop NaN values (occurs usually when there is no sunspot data for that date)
    rats_pre = [win['ratio'] for win in res]
    return np.mean(rats_pre)

def compute_post_mean(lb_time):
    res = angle_window_variance_ratio_post(
        timearray_sliced,
        angles_sliced,
        tranq_masked,
        86,
        94,
        lb_time,
        inverse = False
    )

    # Drop NaN values (occurs usually when there is no sunspot data for that date)
    rats_pre = [win['ratio'] for win in res]
    return np.nanmean(rats_pre)

pre_means = [compute_pre_mean(lb) for lb in np.arange(4,28)]
post_means = [compute_post_mean(lb) for lb in np.arange(4,28)]

fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(pre_means, label="during-pre")
ax.plot(post_means, label="during-post")
ax.axhline(y=1, label='unity', color='red', linestyle='--');
ax.set_title("Pre- and Post- ratios versus lookback/lookforward time")
ax.set_ylabel("<Var(during) / Var(Pre/Post)>")
ax.set_xlabel("Lookback/forward time")
ax.grid(True)
ax.set_ylim(0,np.nanmax(post_means))
plt.legend();
plt.savefig("../figures/mean_ratio_vs_lookbackforward.png")
plt.savefig('../literature/our_work/mean_ratio_vs_lookbackforward.png')

### P-scores

In [None]:
# Compute number of pre-ratios that are below unity
n_below_pre = len([rat for rat in rats_pre if rat < 1])

# Compute number of post-ratios that are below unity
n_below_post = len([rat for rat in rats_post if rat < 1])

# Number of total events
n_events = len(rats_pre)

# Compute p-score, assuming the null-hypothesis is unity
pre_p_score = scipy.stats.binomtest(n_below_pre, n_events, p=0.5).pvalue
post_p_score = scipy.stats.binomtest(n_below_post, n_events, p=0.5).pvalue

print(f"""
    Number of events with lower Var during (pre): {n_below_pre}
    Number of events with lower Var during (post): {n_below_post}
    Events with lower Var during (pre) - p score: {pre_p_score}
    Events with lower Var during (post) - p score: {post_p_score}
""")

## Study B

### Load and process data

In [None]:
tranquility = sp_file['Tranquility']
use_roll_avg = False

if use_roll_avg:
    avg_x_days = 7
    print(f"{FancyColors.IMPORTANT}Using {avg_x_days}-day rolling average of Tranquility{FancyColors.RESET}")
    tranquility = tranquility.rolling(window=avg_x_days).mean()


sp_file_time = sp_file['jd'].values
ps_file_time = file['Earth']['jd'][:]

# Use entire aligned date range
time_lower = np.max([sp_file_time[0], ps_file_time[0]]) #Time('2015-06-05', format='iso').jd
time_higher = np.min([sp_file_time[-1], ps_file_time[-1]]) # Time('2019-12-05', format='iso').jd

# Generate the time mask
MASK = ( timearray >= time_lower ) & ( timearray <= time_higher)
SP_MASK = ( sp_file['jd'] >= time_lower ) & ( sp_file['jd'] <= time_higher )

# Slice the time array
timearray_sliced = timearray[ MASK ]
sp_time_sliced = sp_file_time[ SP_MASK ]

# Ensure time arrays are aligned
print(f"Time arrays are aligned: {np.all(timearray_sliced == sp_time_sliced)}")

# Slice the angle array
angles_sliced = angles[ MASK ]

# Create a boolean array to show where 90 degree events occur
events_bool = ( angles_sliced >= __ANGLE_LIMIT_LOWER__ ) & ( angles_sliced <= __ANGLE_LIMIT_HIGHER__)

# Slice the tranquility array to the times as well
tranq_masked = tranquility[SP_MASK].values

### Compute post X-day difference

First use the local-averaging approach

In [None]:
# 3 weeks after and 10 day local-averaging times span gives most positive differences, 22/26 -> pscore 0.0005335211753845215
# 5 weeks after and 4 day local-averaging times gives most negative differences, 15/26 -> pscore 0.557197093963623

weeks_after = 3
endof_points = 10 # Number of points to use for local averaging around end of event
after_points = 10 # Number of points to use for local averaging around x-weeks after

delta_info = angle_window_post_event_difference(
    timearray_sliced, 
    angles_sliced, 
    tranq_masked, 
    __ANGLE_LIMIT_LOWER__,
    __ANGLE_LIMIT_HIGHER__,
    weeks_after, # weeks after
    n_points_at_end = endof_points,
    n_points_after = after_points
)
print(f"Number of windows: {len(delta_info)}")

In [None]:
differences = [event['difference'] for event in delta_info]
errors = [event['error'] for event in delta_info]
event_times_jd = [event['end_time'] for event in delta_info]

# Convert Julian dates to datetime
event_dates = Time(event_times_jd, format='jd').to_datetime()

# Create the plot
fig, ax = plt.subplots(figsize=(14, 6))

# Plot with error bars
ax.errorbar(event_dates, differences, yerr=errors,
            fmt='o', markersize=8, capsize=5, capthick=2,
            color='steelblue', ecolor='lightsteelblue', elinewidth=2,
            label='Difference ± Error')

# Add horizontal line at zero for reference
ax.axhline(y=0, color='red', linestyle='--', linewidth=1.5, alpha=0.7, label='Zero line')

# Formatting
ax.set_xlabel('Event End Date', fontsize=12)
ax.set_ylabel('Difference (After - At End)', fontsize=12)
ax.set_title('Post-Event Difference in Data with Error Bars', fontsize=14)
ax.grid(True, alpha=0.3)
ax.legend(fontsize=10)

# Format x-axis to show dates nicely
import matplotlib.dates as mdates
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
ax.xaxis.set_major_locator(mdates.AutoDateLocator())
plt.xticks(rotation=45, ha='right')


plt.savefig("../figures/3weekdeltas.png")
plt.savefig('../literature/our_work/3weekdeltas.png')
plt.tight_layout()
plt.show()


Compute the number of events where the difference is negative

In [None]:
n_diff_decrease = len([info['difference'] for info in delta_info if info['difference']<0])
print(f"Number of events where the {weeks_after}-weeks after is less than the end-of-event: {FancyColors.YELLOW}{n_diff_decrease}{FancyColors.RESET}")
print(f"Associated binomial p-score: {binom_p(n_diff_decrease, len(delta_info))}")

Now use the gradient approach

In [None]:
weeks_after = 3

delta_grad_info = angle_window_post_event_gradient(
    timearray_sliced, 
    angles_sliced, 
    tranq_masked, 
    __ANGLE_LIMIT_LOWER__,
    __ANGLE_LIMIT_HIGHER__,
    weeks_after, # weeks after
)
print(f"Number of windows: {len(delta_grad_info)}")

In [None]:
differences = [event['gradient'] for event in delta_grad_info]
errors = [event['gradient_error'] for event in delta_grad_info]
event_times_jd = [event['end_time'] for event in delta_grad_info]

# Convert Julian dates to datetime
event_dates = Time(event_times_jd, format='jd').to_datetime()

# Create the plot
fig, ax = plt.subplots(figsize=(14, 6))

# Plot with error bars
ax.errorbar(event_dates, differences, yerr=errors,
            fmt='o', markersize=8, capsize=5, capthick=2,
            color='steelblue', ecolor='lightsteelblue', elinewidth=2,
            label='Gradient ± Error')

# Add horizontal line at zero for reference
ax.axhline(y=0, color='red', linestyle='--', linewidth=1.5, alpha=0.7, label='Zero line')

# Formatting
ax.set_xlabel('Event End Date', fontsize=12)
ax.set_ylabel('Gradient (After - At End)', fontsize=12)
ax.set_title('Post-Event Gradient in Data with Error Bars', fontsize=14)
ax.grid(True, alpha=0.3)
ax.legend(fontsize=10)

# Format x-axis to show dates nicely
import matplotlib.dates as mdates
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
ax.xaxis.set_major_locator(mdates.AutoDateLocator())
plt.xticks(rotation=45, ha='right')

plt.tight_layout()
plt.show()


In [None]:
n_grad_decrease = len([info['gradient'] for info in delta_grad_info if info['gradient']<0])
print(f"Number of events where the {weeks_after}-weeks after is less than the end-of-event: {FancyColors.YELLOW}{n_grad_decrease}{FancyColors.RESET}")
print(f"Associated binomial p-score: {binom_p(n_grad_decrease, len(delta_info))}")

In [None]:
diff_p_score = scipy.stats.binomtest(n_diff_decrease, len(delta_grad_info), p=0.5).pvalue
grad_p_score = scipy.stats.binomtest(n_grad_decrease, len(delta_grad_info), p=0.5).pvalue
diff_p_score_above = scipy.stats.binomtest(len(delta_grad_info) - n_diff_decrease, len(delta_grad_info), p=0.5).pvalue
grad_p_score_above = scipy.stats.binomtest(len(delta_grad_info) - n_grad_decrease, len(delta_grad_info), p=0.5).pvalue

print('='*10, ' for n points below ', '='*10)
print(f"Difference-based p-score (below) with {endof_points} local points for end-mean and {after_points} local points for after-mean: {diff_p_score}")
print(f"Gradient-based p-score: {grad_p_score}")
print('\n', '='*10, ' for n points above ', '='*10)
print(f"Difference-based p-score (above) with {endof_points} local points for end-mean and {after_points} local points for after-mean: {diff_p_score_above}")
print(f"Gradient-based p-score (above) : {grad_p_score_above}")

<span style="color: cyan;">
Now we analyze the change in the delta (using the local-averaged difference) versus lookforward time
</span>

In [None]:
lookforward = np.linspace(0,10,11, dtype=np.int64)
local_avg = np.linspace(1,10, 10, dtype=np.int64)
total_data = np.zeros((len(local_avg), len(lookforward), len(delta_info)))

for inx, loc_avg in enumerate(local_avg):
    mean_diffs = []
    for wk_after in np.linspace(0,10,11,dtype=np.int64):
        delta_info = angle_window_post_event_difference(
            timearray_sliced, 
            angles_sliced, 
            tranq_masked, 
            __ANGLE_LIMIT_LOWER__,
            __ANGLE_LIMIT_HIGHER__,
            wk_after, # weeks after
            n_points_at_end = loc_avg,
            n_points_after = loc_avg
        )
        diffs = [info['difference'] for info in delta_info]
        mean_diffs.append(diffs)
    total_data[inx] = mean_diffs

In [None]:
x_data = lookforward  # Your time array

# Create figure with vertically stacked subplots
n_plots = len(total_data)
fig, axes = plt.subplots(n_plots, 1, figsize=(14, 3*n_plots), sharex=True)

# If only one plot, axes won't be an array, so make it one
if n_plots == 1:
    axes = [axes]

# Plot each data array
for i, (ax, data) in enumerate(zip(axes, total_data)):
    # Convert time to datetime for nice formatting
    mdata = np.mean(data, axis=1)
    ax.plot(x_data, mdata, linewidth=1.5)
    ax.grid(True, alpha=0.3)
    ax.set_ylim(np.min(mdata), np.max(mdata))

    ax.set_ylabel(f"Local avg: {local_avg[i]}")

    # Only show x-label on bottom plot
    if i == n_plots - 1:
        ax.set_xlabel('Lookforward time', fontsize=12)

# Add overall title
fig.suptitle('Stacked Time Series Data', fontsize=14, y=0.995)
plt.tight_layout()
plt.subplots_adjust(hspace=0)

plt.show()


In [None]:
tdat = []
for inx, loc_avg in enumerate(local_avg):
    for linx, lkfor in enumerate(lookforward):
        dat = total_data[inx, linx]
        n_below = len([d for d in dat if d<0])
        total_points = len(dat)
        #print(f"Local avg points: {loc_avg}  look forward: {lkfor}   n_below: {n_below}/{total_points}")
        tdat.append( (loc_avg, lkfor, n_below) )
[print(tup) for tup in tdat if tup[2] >= 15]
print('\n')
print([tup for tup in tdat if 0<tup[2] <=5])
print(f" p-score: {binom_p(26-4, 26)}")

<span style="color: cyan;">
We see that to maximize the differences, we pick a 3-week lookforward time and a 10 day local averaging span.

This produces a pscore of 0.00053 for 4 / 26 days with negative difference

To show randomness in the tranquility data, we repeat this calculation for randomly selected 3-week spans
</span>

In [None]:
results = random_span_difference_null_distribution(
    tranq_masked,
    span_weeks = 3,
    n_local_points = 10,
    n_trials =5000
)

print(f"""
Percent number of positive differences: {results["percent_positive"]}
Percent number of negative differences: {results["percent_negative"]}
Percent number of zero differences: {results["percent_zero"]}
""")

# Study C

## Load Data

In [13]:
tranquility = sp_file['Tranquility']
use_roll_avg = False

if use_roll_avg:
    avg_x_days = 7
    print(f"{FancyColors.IMPORTANT}Using {avg_x_days}-day rolling average of Tranquility{FancyColors.RESET}")
    tranquility = tranquility.rolling(window=avg_x_days).mean()


sp_file_time = sp_file['jd'].values
ps_file_time = file['Earth']['jd'][:]

# Use entire aligned date range
time_lower = np.max([sp_file_time[0], ps_file_time[0]]) #Time('2015-06-05', format='iso').jd
time_higher = np.min([sp_file_time[-1], ps_file_time[-1]]) # Time('2019-12-05', format='iso').jd

# Generate the time mask
MASK = ( timearray >= time_lower ) & ( timearray <= time_higher)
SP_MASK = ( sp_file['jd'] >= time_lower ) & ( sp_file['jd'] <= time_higher )

# Slice the time array
timearray_sliced = timearray[ MASK ]
sp_time_sliced = sp_file_time[ SP_MASK ]

# Ensure time arrays are aligned
print(f"Time arrays are aligned: {np.all(timearray_sliced == sp_time_sliced)}")

# Slice the angle array
angles_sliced = angles[ MASK ]

# Create a boolean array to show where 90 degree events occur
events_bool = ( angles_sliced >= __ANGLE_LIMIT_LOWER__ ) & ( angles_sliced <= __ANGLE_LIMIT_HIGHER__)

# Slice the tranquility array to the times as well
tranq_masked = tranquility[SP_MASK].values

Time arrays are aligned: True


## Recompute tranquility decrease post-events

In [14]:
# 3 weeks after and 10 day local-averaging times span gives most positive differences, 22/26 -> pscore 0.0005335211753845215
# 5 weeks after and 4 day local-averaging times gives most negative differences, 15/26 -> pscore 0.557197093963623

weeks_after = 3
endof_points = 10 # Number of points to use for local averaging around end of event
after_points = 10 # Number of points to use for local averaging around x-weeks after

delta_info = angle_window_post_event_difference(
    timearray_sliced, 
    angles_sliced, 
    tranq_masked, 
    __ANGLE_LIMIT_LOWER__,
    __ANGLE_LIMIT_HIGHER__,
    weeks_after, # weeks after
    n_points_at_end = endof_points,
    n_points_after = after_points
)
print(f"Number of windows: {len(delta_info)}")
delta_info[0].keys()

Number of windows: 26


dict_keys(['end_idx', 'after_idx', 'end_time', 'mean_at_end', 'mean_after', 'difference', 'std_at_end', 'std_after', 'error', 'data_at_end', 'data_after'])

## Normalize planetary center vectors magnitudes to [0,1]

In [None]:
mags = np.linalg.norm(g2_barycenter, axis=1)
min_mag = min(mags)
max_mag = max(mags)
mags_normed = (mags - min_mag) / (max_mag- min_mag)
g2_center_normed = g2_barycenter * (mags_normed / mags)[:,None]

# Slice the normed centers to be aligned with the sunspot data
g2_center_normed = g2_center_normed[ MASK ]

## Compute the planetary center magnitude at the events and compare to the tranquility deltas for each event

In [None]:
data_pairs = []
for info in delta_info:
    delta = info['difference']
    center_mag_at_event_start = np.linalg.norm(g2_center_normed[info['end_idx']])
    center_mag_at_after_event = np.linalg.norm(g2_center_normed[info['after_idx']])
    mean_mag = np.mean([center_mag_at_after_event, center_mag_at_event_start])
    data_pairs.append(
        (delta, mean_mag)
    )


## Plot the magnitude of the second planetary group center versus the difference in solar tranquility

In [None]:
deltas = np.array([d[0] for d in data_pairs])
mags = np.array([d[1] for d in data_pairs])

fig, ax = plt.subplots(figsize=(16,8))
ax.plot(mags, deltas, 'o')
ax.set_xlabel(r'$||g2_{center}||$')
ax.set_ylabel("Tranquility Deltas")
ax.set_title("Magnitude of Tranquility difference 3-weeks post event versus magnitude of G2 center")



plt.savefig("../figures/3weekdeltas_g2pos.png")
plt.savefig('../literature/our_work/3weekdeltas_g2pos.png')

## Compute statistics

In [None]:
# Correlation coefficient
pear_corr, pscore = scipy.stats.pearsonr(mags, deltas)
n_sigma = scipy.stats.norm.ppf(1 - pscore/2)
print(f"Pearson correlation: {pear_corr}")
print(f"p-score: {pscore}")
print(f"Number of sigma: {n_sigma}")

The p-value roughly indicates the probability of an uncorrelated system producing datasets that have a Pearson correlation at least as extreme as the one computed from these datasets.