## Step 0: Global Configuration and Switches

In [1]:
import pandas as pd
import numpy as np
import os
import pickle
import time
import gc
import warnings
import importlib
from multiprocessing import Pool, cpu_count
import scipy.stats as stats  # For fast P-value calculation

# Try to import external worker, ignore if not found (inline logic provided below)
try:
    import algo_workers
    importlib.reload(algo_workers)
except ImportError:
    algo_workers = None

# =============================================================================
# 1. Global Configuration and Switches
# =============================================================================

# [Core Switch] True = Test Mode (Fast, 100 simulations); False = Production Mode (Rigorous, 1000/10000 simulations)
TEST_MODE = False 

if TEST_MODE:
    print(">>> [Mode] Test Mode")
    print("    - Algo 1/2 Simulation Count: 100")
    print("    - Algo 3 Simulation Count: 100 (or use binomial distribution)")
    print("    - Core Count: 1 (avoid debugging deadlocks)")
    N_SIM_ALGO12 = 100
    N_SIM_ALGO3 = 100
    N_WORKERS = 1
else:
    print(">>> [Mode] Production Mode")
    print("    - Algo 1/2 Simulation Count: 10000")
    print("    - Algo 3 Simulation Count: 1000")
    # Leave 2 cores for the system to prevent freezing
    N_WORKERS = max(1, cpu_count() - 2)
    N_SIM_ALGO12 = 10000
    N_SIM_ALGO3 = 1000
    print(f"    - Core Count: {N_WORKERS}")

# Path Configuration
BASE_INPUT_DIR = '../../data/ready'
BASE_OUTPUT_DIR = '../../results/04_conj_enh_opp_sup/sg'
CACHE_DIR = os.path.join(BASE_OUTPUT_DIR, 'cache_data')

# Original large file path (contains 781 bodies)
EPHEMERIS_FILE = os.path.join(BASE_INPUT_DIR, '781_planets_dwarfs_asteroids_lonlat.parquet')

# Output File Paths
OUTPUT_FILE_ALGO1 = os.path.join(BASE_OUTPUT_DIR, 'sg_algo1_total_pairs.csv')
OUTPUT_FILE_ALGO2 = os.path.join(BASE_OUTPUT_DIR, 'sg_algo2_at_least_one.csv')
OUTPUT_FILE_ALGO3 = os.path.join(BASE_OUTPUT_DIR, 'sg_algo3_single_body_781.csv')
OUTPUT_FILE_KUIPER = os.path.join(BASE_OUTPUT_DIR, 'sg_algo_kuiper_test.csv')

# 8 Major Planets column names (for Algo 1 & 2)
PLANET_COLS = ['199_lon', '299_lon', '399_lon', '499_lon', '599_lon', '699_lon', '799_lon', '899_lon']
THRESHOLDS = [1, 2, 3, 4, 5]

# Ensure directories exist
if not os.path.exists(BASE_OUTPUT_DIR): os.makedirs(BASE_OUTPUT_DIR)
if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
warnings.filterwarnings('ignore')

# =============================================================================
# 2. General Utility Functions
# =============================================================================

def categorize_area(area):
    """Sunspot area categorization"""
    if area < 100: return 'Small <100'
    elif 100 <= area < 500: return 'Medium 100-500'
    elif 500 <= area < 2000: return 'Large 500-2000'
    else: return 'XLarge >2000'

def interpolate_angle(angle1, angle2, fraction):
    """Vectorized angle interpolation (handles 0-360 wrap-around)"""
    rad1 = np.deg2rad(angle1)
    rad2 = np.deg2rad(angle2)
    # Calculate shortest path difference
    delta = (rad2 - rad1 + np.pi) % (2 * np.pi) - np.pi
    interpolated_rad = rad1 + fraction * delta
    return np.degrees(interpolated_rad) % 360.0

print("--- Configuration Loaded ---")

>>> [Mode] Production Mode
    - Algo 1/2 Simulation Count: 10000
    - Algo 3 Simulation Count: 1000
    - Core Count: 30
--- Configuration Loaded ---


## Step 1: Optimized Data Pre-processing

In [2]:
# =============================================================================
# Step 1: Optimized Data Pre-processing
# Strategy: Load full ephemeris -> Interpolate 'All' stage (deduplication) -> Lookup for other stages -> Save cache
# =============================================================================

def step1_data_preparation_optimized():
    print("\n" + "=" * 60)
    print("Step 1: Data Pre-processing (Full Align + Lookup)")
    print("=" * 60)
    
    start_time = time.time()
    
    # --- 1. Load Full Ephemeris (all 781 bodies) ---
    print(f"Loading FULL Ephemeris from: {EPHEMERIS_FILE} ...")
    if not os.path.exists(EPHEMERIS_FILE):
        print("Error: Ephemeris file not found!")
        return

    df_ephem = pd.read_parquet(EPHEMERIS_FILE)
    if 'date' in df_ephem.columns:
        df_ephem['date'] = pd.to_datetime(df_ephem['date'])
        df_ephem.set_index('date', inplace=True)
    
    # Normalize index (daily 00:00)
    df_ephem.index = df_ephem.index.normalize()
    df_ephem = df_ephem[~df_ephem.index.duplicated(keep='first')]
    df_ephem.sort_index(inplace=True)
    
    # Identify all longitude columns (assuming ending with _lon)
    all_body_cols = [c for c in df_ephem.columns if str(c).endswith('_lon')]
    print(f"Loaded Ephemeris: {len(df_ephem)} days, {len(all_body_cols)} bodies.")
    
    # Extract daily 00:00 data matrix (for interpolation)
    # Use float32 to save memory
    ephem_matrix_all = df_ephem[all_body_cols].values.astype(np.float32)
    
    # [Key] Save daily data for 8 major planets for Algo 1/2 CTS background simulation
    path_matrix_8p = os.path.join(CACHE_DIR, 'ephem_matrix_8p.npy')
    np.save(path_matrix_8p, df_ephem[PLANET_COLS].values.astype(np.float64))
    
    # [Key] Calculate Kepler Maps (only for 8 major planets, required by Algo 1/2)
    print("Calculating Kepler probability maps (for 8 planets)...")
    prob_maps = {}
    for col in PLANET_COLS:
        hist, _ = np.histogram(df_ephem[col], bins=360, range=(0, 360), density=True)
        prob_maps[col] = hist
    with open(os.path.join(CACHE_DIR, 'kepler_prob_maps.pkl'), 'wb') as f:
        pickle.dump(prob_maps, f)

    # --- 2. Process 'All' file (core interpolation) ---
    print("\nProcessing 'All' dataset (Heavy Interpolation)...")
    file_all = 'sg_1874_2025_all.csv'
    path_all = os.path.join(BASE_INPUT_DIR, file_all)
    
    if not os.path.exists(path_all):
        print(f"Error: {file_all} not found!")
        return

    df_sun_all = pd.read_csv(path_all, usecols=['date', 'hme_lon', 'area'])
    df_sun_all['date'] = pd.to_datetime(df_sun_all['date'])
    df_sun_all.dropna(subset=['hme_lon', 'area'], inplace=True)
    
    # [New] Core deduplication logic: based on test results, clear duplicates in 'All' to prevent data bloat
    initial_len = len(df_sun_all)
    df_sun_all.drop_duplicates(subset=['date', 'hme_lon', 'area'], inplace=True)
    dropped_len = initial_len - len(df_sun_all)
    if dropped_len > 0:
        print(f"  [Info] Dropped {dropped_len} duplicate records from All dataset.")

    # Filter time range
    min_date, max_date = df_ephem.index.min(), df_ephem.index.max()
    df_sun_all = df_sun_all[(df_sun_all['date'] >= min_date) & (df_sun_all['date'] <= max_date)].copy()
    
    # Interpolation calculation
    print(f"  Interpolating {len(df_sun_all)} records against {len(all_body_cols)} bodies...")
    
    day_t = df_sun_all['date'].dt.normalize()
    # Calculate time fraction
    fraction = (df_sun_all['date'] - day_t).dt.total_seconds() / 86400.0
    # Find index
    idx_t = df_ephem.index.searchsorted(day_t)
    
    # Boundary protection
    valid_mask = (idx_t < len(df_ephem) - 1)
    if not valid_mask.all():
        df_sun_all = df_sun_all[valid_mask]
        fraction = fraction[valid_mask]
        idx_t = idx_t[valid_mask]
        
    pos_t = ephem_matrix_all[idx_t]
    pos_t_plus_1 = ephem_matrix_all[idx_t + 1]
    fraction_vals = fraction.values[:, np.newaxis].astype(np.float32)
    
    # Vectorized interpolation
    interpolated_matrix = interpolate_angle(pos_t, pos_t_plus_1, fraction_vals)
    
    # Construct result DataFrame
    df_result_all = df_sun_all.copy()
    df_result_all['ephem_idx_daily'] = idx_t # Save index for CTS use
    df_result_all['Group'] = df_result_all['area'].apply(categorize_area)
    
    # Concatenate body data (this step may consume memory, can be chunked if needed)
    df_bodies = pd.DataFrame(interpolated_matrix, columns=all_body_cols, index=df_result_all.index)
    df_final_all = pd.concat([df_result_all, df_bodies], axis=1)
    
    # Save 'Ready' data for 'All'
    save_path_all = os.path.join(CACHE_DIR, 'ready_All.parquet')
    df_final_all.to_parquet(save_path_all)
    print(f"  Saved FULL aligned data to: {save_path_all}")
    
    # --- 3. Process other files (lookup method) ---
    other_files = {
        'sg_1874_2025_daily.csv': 'Daily',
        'sg_1874_2025_diss.csv': 'Dissipation',
        'sg_1874_2025_dur.csv': 'Duration',
        'sg_1874_2025_onset.csv': 'Onset'
    }
    
    print("\nProcessing subset files (Lookup method)...")
    for fname, stage_name in other_files.items():
        fpath = os.path.join(BASE_INPUT_DIR, fname)
        if not os.path.exists(fpath): continue
        
        print(f"  Extracting Stage: {stage_name} from All data...")
        df_sub = pd.read_csv(fpath, usecols=['date', 'hme_lon', 'area'])
        df_sub['date'] = pd.to_datetime(df_sub['date'])
        
        # Inner Join using pre-calculated 'All' data
        # Since 'All' is deduplicated, this is a lookup based on unique keys
        df_merged = pd.merge(
            df_sub, 
            df_final_all, 
            on=['date', 'hme_lon', 'area'], 
            how='inner'
        )
        
        save_path = os.path.join(CACHE_DIR, f'ready_{stage_name}.parquet')
        df_merged.to_parquet(save_path)
        print(f"    Saved {len(df_merged)} records to {save_path}")

    # Clean up memory
    del df_ephem, ephem_matrix_all, df_final_all, df_bodies
    gc.collect()
    
    print(f"Step 1 Completed. Total Time: {time.time() - start_time:.1f}s")

# Execute Step 1
if __name__ == '__main__':
    step1_data_preparation_optimized()


Step 1: Data Pre-processing (Full Align + Lookup)
Loading FULL Ephemeris from: ../../data/ready/781_planets_dwarfs_asteroids_lonlat.parquet ...
Loaded Ephemeris: 73780 days, 781 bodies.
Calculating Kepler probability maps (for 8 planets)...

Processing 'All' dataset (Heavy Interpolation)...
  [Info] Dropped 35 duplicate records from All dataset.
  Interpolating 256824 records against 781 bodies...
  Saved FULL aligned data to: ../../results/04_conj_enh_opp_sup/sg/cache_data/ready_All.parquet

Processing subset files (Lookup method)...
  Extracting Stage: Daily from All data...
    Saved 8301 records to ../../results/04_conj_enh_opp_sup/sg/cache_data/ready_Daily.parquet
  Extracting Stage: Dissipation from All data...
    Saved 27870 records to ../../results/04_conj_enh_opp_sup/sg/cache_data/ready_Dissipation.parquet
  Extracting Stage: Duration from All data...
    Saved 75678 records to ../../results/04_conj_enh_opp_sup/sg/cache_data/ready_Duration.parquet
  Extracting Stage: Onset f

## Step 2: Algorithm 1 (Total Pairs)

In [3]:
# =============================================================================
# Step 2: Algorithm 1 (Total Pairs)
# =============================================================================

def step2_run_algo1():
    print("\n" + "=" * 60)
    print(f"Step 2: Running Algo 1 | Sim: {N_SIM_ALGO12} | Workers: {N_WORKERS}")
    print("=" * 60)

    # [New] Prevent duplicate runs in Jupyter from causing append pollution
    if os.path.exists(OUTPUT_FILE_ALGO1):
        os.remove(OUTPUT_FILE_ALGO1)
        print(f"Warning: Removed existing output file {OUTPUT_FILE_ALGO1} to avoid duplicates.")
    
    # Load cache
    try:
        # Load daily position matrix for 8 major planets (for fast CTS lookup)
        ephem_matrix_daily = np.load(os.path.join(CACHE_DIR, 'ephem_matrix_8p.npy'))
        with open(os.path.join(CACHE_DIR, 'kepler_prob_maps.pkl'), 'rb') as f:
            prob_maps = pickle.load(f)
    except FileNotFoundError:
        print("Error: Cache files not found. Please run Step 1 first.")
        return

    results_buffer = []
    files = [f for f in os.listdir(CACHE_DIR) if f.startswith('ready_') and f.endswith('.parquet')]
    
    total_start_time = time.time()

    for f in files:
        stage_name = f.replace('ready_', '').replace('.parquet', '')
        print(f"Processing Stage: {stage_name} ...")
        
        # Read data (only need to read 8 major planet columns)
        cols_to_load = ['hme_lon', 'ephem_idx_daily', 'Group'] + PLANET_COLS
        df = pd.read_parquet(os.path.join(CACHE_DIR, f), columns=cols_to_load)
        
        groups = sorted(df['Group'].unique(), key=lambda x: ('SMLX'.find(x[0]), x))
        groups.append('Total')
        
        for group in groups:
            subset = df if group == 'Total' else df[df['Group'] == group]
            if subset.empty: continue
            
            # Prepare core data
            sun_lons = subset['hme_lon'].values.astype(np.float64)
            obs_planets_interp = subset[PLANET_COLS].values.astype(np.float64) # Interpolated high-precision data
            sun_idxs_daily = subset['ephem_idx_daily'].values.astype(int)      # Integer index for CTS
            n_recs = len(subset)
            
            print(f"  Group: {group} (N={n_recs})")
            
            for w in THRESHOLDS:
                for etype in ['Opposition', 'Conjunction']:
                    
                    # --- 1. Calculate observed value (k_obs) ---
                    # Call vectorized function in algo_workers
                    if algo_workers:
                        k_obs = algo_workers.count_events_vectorized(sun_lons, obs_planets_interp, w, etype)
                    else:
                        # Inline fallback logic
                        if etype == 'Conjunction':
                            delta = np.abs(sun_lons[:, None] - obs_planets_interp)
                            delta = np.where(delta > 180, 360 - delta, delta)
                            k_obs = np.sum(delta <= w)
                        else:
                            delta = np.abs(np.abs(sun_lons[:, None] - obs_planets_interp) - 180)
                            k_obs = np.sum(delta <= w)
                    
                    # --- 2. Prepare CTS simulation ---
                    seeds = np.random.randint(0, 1000000000, N_SIM_ALGO12)
                    args = [(seed, sun_lons, ephem_matrix_daily, sun_idxs_daily, w, etype) for seed in seeds]
                    
                    # --- 3. Execute simulation (k_sims) ---
                    # Must use algo_workers.cts_worker_algo1 to support multi-core pickle
                    if N_WORKERS > 1 and algo_workers:
                        with Pool(N_WORKERS) as pool:
                            k_sims = pool.starmap(algo_workers.cts_worker_algo1, args)
                    else:
                        # Use when single-core or no external worker
                        if algo_workers:
                             k_sims = [algo_workers.cts_worker_algo1(*a) for a in args]
                        else:
                             # Simple placeholder, cannot perform CTS if algo_workers file is missing
                             print("Warning: algo_workers.py missing, skipping simulations.")
                             k_sims = [k_obs] * N_SIM_ALGO12

                    k_sims = np.array(k_sims)
                    
                    # --- 4. Statistical calculation ---
                    # --- [Modified] P-value calculation (two-tailed + effect direction) ---
                    # Calculate left-tail (suppression) probability
                    p_left = (np.sum(k_sims <= k_obs) + 1) / (N_SIM_ALGO12 + 1)
                    # Calculate right-tail (enhancement) probability
                    p_right = (np.sum(k_sims >= k_obs) + 1) / (N_SIM_ALGO12 + 1)
                    
                    # Two-tailed P-value (twice the minimum, capped at 1.0)
                    p_val = 2 * min(p_left, p_right)
                    if p_val > 1.0: p_val = 1.0
                    
                    # Record effect direction
                    effect = 'Suppression' if k_obs < k_sims.mean() else 'Enhancement'

                    if k_sims.std() == 0: z_score = 0
                    else: z_score = (k_obs - k_sims.mean()) / k_sims.std()
                    
                    # --- 5. Theoretical Baseline (Kepler Baseline) ---
                    # Use Kepler Prob Maps to calculate precise expectation
                    lon_indices = np.floor(sun_lons).astype(int) % 360
                    target_indices = (lon_indices + 180) % 360 if etype == 'Opposition' else lon_indices
                    k_exp = 0
                    for col in PLANET_COLS:
                        k_exp += np.sum(prob_maps[col][target_indices] * (2 * w))
                    
                    ratio = (k_obs / k_exp * 100) if k_exp > 0 else 0
                    
                    # --- [Important] Remember to add 'Effect': effect to the dictionary ---
                    results_buffer.append({
                        'Stage': stage_name, 'Group': group, 'Window': w, 'Type': etype,
                        'N_Records': n_recs, 'k_obs': k_obs, 'k_exp': round(k_exp, 2),
                        'Ratio': round(ratio, 2), 
                        'p_val': p_val, 
                        'Z_score': round(z_score, 2),
                        'Effect': effect  # <--- Added this line!
                    })
        
        # Stage save
        if results_buffer:
            new_df = pd.DataFrame(results_buffer)
            write_header = not os.path.exists(OUTPUT_FILE_ALGO1)
            new_df.to_csv(OUTPUT_FILE_ALGO1, mode='a', header=write_header, index=False)
            results_buffer = []

    print(f"Step 2 Completed. Time: {time.time()-total_start_time:.1f}s")

if __name__ == '__main__':
    step2_run_algo1()


Step 2: Running Algo 1 | Sim: 10000 | Workers: 30
Processing Stage: Dissipation ...
  Group: Small <100 (N=27541)
  Group: Medium 100-500 (N=318)
  Group: Large 500-2000 (N=11)
  Group: Total (N=27870)
Processing Stage: Onset ...
  Group: Small <100 (N=31241)
  Group: Medium 100-500 (N=1984)
  Group: Large 500-2000 (N=53)
  Group: Total (N=33278)
Processing Stage: Duration ...
  Group: Small <100 (N=69759)
  Group: Medium 100-500 (N=5813)
  Group: Large 500-2000 (N=106)
  Group: Total (N=75678)
Processing Stage: All ...
  Group: Small <100 (N=151201)
  Group: Medium 100-500 (N=86883)
  Group: Large 500-2000 (N=18110)
  Group: XLarge >2000 (N=630)
  Group: Total (N=256824)
Processing Stage: Daily ...
  Group: Small <100 (N=8268)
  Group: Medium 100-500 (N=29)
  Group: Large 500-2000 (N=4)
  Group: Total (N=8301)
Step 2 Completed. Time: 1558.2s


## Step 3: Algorithm 2 (Trigger Mechanism - At Least One)

In [4]:
# =============================================================================
# Step 3: Algorithm 2 (Trigger Mechanism - At Least One)
# =============================================================================

def step3_run_algo2():
    print("\n" + "=" * 60)
    print(f"Step 3: Running Algo 2 | Sim: {N_SIM_ALGO12} | Workers: {N_WORKERS}")
    print("=" * 60)

    # [New] Prevent duplicate runs in Jupyter from causing append pollution (for Algo 2 file)
    if os.path.exists(OUTPUT_FILE_ALGO2):
        os.remove(OUTPUT_FILE_ALGO2)
        print(f"Warning: Removed existing output file {OUTPUT_FILE_ALGO2} to avoid duplicates.")
    
    try:
        ephem_matrix_daily = np.load(os.path.join(CACHE_DIR, 'ephem_matrix_8p.npy'))
        with open(os.path.join(CACHE_DIR, 'kepler_prob_maps.pkl'), 'rb') as f:
            prob_maps = pickle.load(f)
    except FileNotFoundError: return
            
    results_buffer = []
    files = [f for f in os.listdir(CACHE_DIR) if f.startswith('ready_') and f.endswith('.parquet')]
    
    total_start_time = time.time()
    
    for f in files:
        stage_name = f.replace('ready_', '').replace('.parquet', '')
        print(f"Processing Stage: {stage_name} ...")
        
        cols_to_load = ['hme_lon', 'ephem_idx_daily', 'Group'] + PLANET_COLS
        df = pd.read_parquet(os.path.join(CACHE_DIR, f), columns=cols_to_load)
        
        groups = sorted(df['Group'].unique(), key=lambda x: ('SMLX'.find(x[0]), x))
        groups.append('Total')
        
        for group in groups:
            subset = df if group == 'Total' else df[df['Group'] == group]
            if subset.empty: continue
            
            sun_lons = subset['hme_lon'].values.astype(np.float64)
            obs_planets_interp = subset[PLANET_COLS].values.astype(np.float64)
            sun_idxs_daily = subset['ephem_idx_daily'].values.astype(int)
            n_recs = len(subset)
            
            print(f"  Group: {group} (N={n_recs})")
            
            for w in THRESHOLDS:
                for etype in ['Opposition', 'Conjunction']:
                    
                    # --- 1. Calculate observed value (k_obs) ---
                    if algo_workers:
                        k_obs = algo_workers.count_events_at_least_once(sun_lons, obs_planets_interp, w, etype)
                    else:
                        # Inline fallback
                        if etype == 'Conjunction':
                            delta = np.abs(sun_lons[:, None] - obs_planets_interp)
                            delta = np.where(delta > 180, 360 - delta, delta)
                            is_event = np.any(delta <= w, axis=1)
                        else:
                            delta = np.abs(np.abs(sun_lons[:, None] - obs_planets_interp) - 180)
                            is_event = np.any(delta <= w, axis=1)
                        k_obs = np.sum(is_event)
                    
                    # --- 2. Simulation ---
                    seeds = np.random.randint(0, 1000000000, N_SIM_ALGO12)
                    args = [(seed, sun_lons, ephem_matrix_daily, sun_idxs_daily, w, etype) for seed in seeds]
                    
                    if N_WORKERS > 1 and algo_workers:
                        with Pool(N_WORKERS) as pool:
                            k_sims = pool.starmap(algo_workers.cts_worker_algo2, args)
                    elif algo_workers:
                        k_sims = [algo_workers.cts_worker_algo2(*arg) for arg in args]
                    else:
                        k_sims = [k_obs] * N_SIM_ALGO12
                            
                    k_sims = np.array(k_sims)
                    
                    # --- 3. Statistics ---
                    # --- [Modified] P-value calculation (two-tailed + effect direction) ---
                    # Calculate left-tail (suppression) probability
                    p_left = (np.sum(k_sims <= k_obs) + 1) / (N_SIM_ALGO12 + 1)
                    # Calculate right-tail (enhancement) probability
                    p_right = (np.sum(k_sims >= k_obs) + 1) / (N_SIM_ALGO12 + 1)
                    
                    # Two-tailed P-value
                    p_val = 2 * min(p_left, p_right)
                    if p_val > 1.0: p_val = 1.0
                    
                    # Record effect direction
                    effect = 'Suppression' if k_obs < k_sims.mean() else 'Enhancement'

                    if k_sims.std() == 0: z_score = 0
                    else: z_score = (k_obs - k_sims.mean()) / k_sims.std()
                    
                    # --- 4. Theoretical Baseline (Prob At Least One) ---
                    lon_indices = np.floor(sun_lons).astype(int) % 360
                    target_indices = (lon_indices + 180) % 360 if etype == 'Opposition' else lon_indices
                    
                    p_mat = np.zeros((n_recs, len(PLANET_COLS)))
                    for i, col in enumerate(PLANET_COLS):
                        p_mat[:, i] = prob_maps[col][target_indices] * (2 * w)
                    
                    # P(At Least One) = 1 - Prod(1 - P_i)
                    p_at_least_one = 1.0 - np.prod(1.0 - p_mat, axis=1)
                    k_exp = np.sum(p_at_least_one)
                    
                    ratio = (k_obs / k_exp * 100) if k_exp > 0 else 0
                 
                    # --- [Important] Remember to add 'Effect': effect to the dictionary ---
                    results_buffer.append({
                        'Stage': stage_name, 'Group': group, 'Window': w, 'Type': etype,
                        'N_Records': n_recs, 'k_obs': k_obs, 'k_exp': round(k_exp, 2),
                        'Ratio': round(ratio, 2), 
                        'p_val': p_val, 
                        'Z_score': round(z_score, 2),
                        'Effect': effect  # <--- Added this line!
                    })
        
        if results_buffer:
            new_df = pd.DataFrame(results_buffer)
            write_header = not os.path.exists(OUTPUT_FILE_ALGO2)
            new_df.to_csv(OUTPUT_FILE_ALGO2, mode='a', header=write_header, index=False)
            results_buffer = []

    print(f"Step 3 Completed. Time: {time.time()-total_start_time:.1f}s")

if __name__ == '__main__':
    step3_run_algo2()


Step 3: Running Algo 2 | Sim: 10000 | Workers: 30
Processing Stage: Dissipation ...
  Group: Small <100 (N=27541)
  Group: Medium 100-500 (N=318)
  Group: Large 500-2000 (N=11)
  Group: Total (N=27870)
Processing Stage: Onset ...
  Group: Small <100 (N=31241)
  Group: Medium 100-500 (N=1984)
  Group: Large 500-2000 (N=53)
  Group: Total (N=33278)
Processing Stage: Duration ...
  Group: Small <100 (N=69759)
  Group: Medium 100-500 (N=5813)
  Group: Large 500-2000 (N=106)
  Group: Total (N=75678)
Processing Stage: All ...
  Group: Small <100 (N=151201)
  Group: Medium 100-500 (N=86883)
  Group: Large 500-2000 (N=18110)
  Group: XLarge >2000 (N=630)
  Group: Total (N=256824)
Processing Stage: Daily ...
  Group: Small <100 (N=8268)
  Group: Medium 100-500 (N=29)
  Group: Large 500-2000 (N=4)
  Group: Total (N=8301)
Step 3 Completed. Time: 1573.2s


## Step 4: Algorithm 3 (Single Body Full Version)

In [5]:
# =============================================================================
# Step 4: Algorithm 3 (Single Body Full Version) - Frequency + Amplitude + P-value
# =============================================================================

def worker_algo3_final(args):
    """
    Final Worker (Fixed Version): 
    1. Dynamically calculate Kepler probability (solve eccentricity issue)
    2. Two-tailed P-value test
    3. Record Enhancement/Suppression direction
    """
    body_name, body_lons, sun_lons, sun_areas, thresholds, n_sims = args
    results = []
    n_recs = len(sun_lons)

    # Force single-core execution
    N_WORKERS_STEP4 = 1
    
    # Global average area (for calculating amplitude ratio)
    global_avg_area = np.mean(sun_areas) if n_recs > 0 else 0
    
    # --- [Key Fix 1] Calculate the Kepler probability distribution (histogram) of the body on the fly ---
    # Count the frequency (density) of the body in each degree from 0-360, reflecting its residence time in various parts of the orbit
    # density=True ensures sum * bin_width = 1
    hist_prob, _ = np.histogram(body_lons, bins=360, range=(0, 360), density=True)
    
    # Pre-calculate integer indices of sun positions for fast lookup
    sun_idx_conj = np.floor(sun_lons).astype(int) % 360
    sun_idx_opp  = (sun_idx_conj + 180) % 360
    
    for w in thresholds:
        # [Removed] Old uniform distribution assumption: p0 = (2 * w) / 360.0 
        
        for etype in ['Opposition', 'Conjunction']:
            # --- A. Basic calculation (remains unchanged) ---
            if etype == 'Conjunction':
                d = np.abs(sun_lons - body_lons)
                d = np.where(d > 180, 360 - d, d)
                is_event = (d <= w)
                
                # [Key Fix 1] Get background probability only for coincidence moments
                target_indices = sun_idx_conj
                
            else: # Opposition
                d = np.abs(np.abs(sun_lons - body_lons) - 180)
                is_event = (d <= w)
                
                target_indices = sun_idx_opp
            
            k_obs = np.sum(is_event)
            
            # --- B. Calculate precise expectation (k_exp) based on Kepler distribution ---
            # Lookup: hist_prob[idx] is the probability density within 1 degree
            # Multiply by (2*w) to get the total probability within the window
            # Sum over all records to get the expected frequency
            daily_probs = hist_prob[target_indices] * (2 * w)
            k_exp = np.sum(daily_probs)
            
            # Calculate frequency ratio
            ratio_freq = (k_obs / k_exp * 100) if k_exp > 0 else 0
            
            # --- C. Amplitude metrics (remains unchanged) ---
            if k_obs > 0:
                event_avg_area = np.mean(sun_areas[is_event])
                ratio_amp = (event_avg_area / global_avg_area * 100) if global_avg_area > 0 else 0
            else:
                event_avg_area = 0
                ratio_amp = 0
            
            # --- D. Significance P-value (Two-tailed fixed version) ---
            # Since the probability of each trial is different, the binomial distribution is strictly not applicable.
            # But as a robust approximation, we substitute the average probability p_avg.
            p_avg = k_exp / n_recs if n_recs > 0 else 0
            
            # 1. Calculate two-tailed probability
            p_left = stats.binom.cdf(k_obs, n_recs, p_avg)
            p_right = stats.binom.sf(k_obs - 1, n_recs, p_avg)
            p_val = 2 * min(p_left, p_right)
            if p_val > 1.0: p_val = 1.0
            
            # 2. [Key Fix 2] Record effect direction
            effect = 'Suppression' if k_obs < k_exp else 'Enhancement'
              
            results.append({
                'Body': body_name,
                'Window': w,
                'Type': etype,
                'k_obs': k_obs,
                'k_exp': round(k_exp, 2),
                'Ratio_Freq': round(ratio_freq, 2),
                'Avg_Area': round(event_avg_area, 2),
                'Ratio_Amp': round(ratio_amp, 2),
                'p_val': p_val,
                'Effect': effect  # Must include this column
            })
    return results

def step4_run_algo3_final():
    print("\n" + "=" * 60)
    print("Step 4: Running Algo 3 (Final) - FORCE SINGLE CORE MODE")
    print("=" * 60)

    # [CRITICAL CONFIG] Force single-core execution
    # We explicitly set this to 1 to bypass global settings.
    # This avoids Windows multiprocessing pickling errors and IPC deadlocks.
    # Since the calculation is vector-based and fast, serial execution is optimal.
    FORCED_WORKERS = 1 
    
    # Remove existing output file to prevent duplicate appending from previous runs
    if os.path.exists(OUTPUT_FILE_ALGO3):
        os.remove(OUTPUT_FILE_ALGO3)
        print(f"Warning: Removed existing output file {OUTPUT_FILE_ALGO3} to avoid duplicates.")
    
    # Get list of input files from cache directory
    files = [f for f in os.listdir(CACHE_DIR) if f.startswith('ready_') and f.endswith('.parquet')]
    
    total_start = time.time()
    
    for f in files:
        stage_name = f.replace('ready_', '').replace('.parquet', '')
        print(f"\nProcessing Stage: {stage_name} ...")
        
        # Read data
        df = pd.read_parquet(os.path.join(CACHE_DIR, f))
        
        # Identify columns: exclude metadata, keep planetary/body columns
        meta_cols = {'date', 'hme_lon', 'area', 'ephem_idx_daily', 'Group', 'lat_lon', 'hg_lon', 'hgc_lon', 'lon', 'lat'}
        body_cols = [c for c in df.columns if c not in meta_cols and c.endswith('_lon')]
        
        print(f"  Analyzing {len(body_cols)} bodies against {len(df)} records...")
        
        # Prepare data arrays
        sun_lons = df['hme_lon'].values.astype(np.float32)
        sun_areas = df['area'].values.astype(np.float32)
        
        # Create task list
        # Note: Passing large arrays (sun_lons) in single-core mode is efficient 
        # because Python passes by reference (no memory copying/pickling involved).
        tasks = []
        for col in body_cols:
            body_data = df[col].values.astype(np.float32)
            tasks.append((col, body_data, sun_lons, sun_areas, THRESHOLDS, N_SIM_ALGO3))
            
        results_flat = []
        
        # [CORE LOGIC] Execute in Serial Mode
        # We loop directly in the main process. No overhead, no deadlocks.
        print("  [Mode] Executing in Serial Mode (No Multiprocessing)...")
        
        for i, t in enumerate(tasks):
            # Execute the worker function directly
            results_flat.extend(worker_algo3_final(t))
            
        print(f"  Processed {len(tasks)}/{len(tasks)} bodies. Done.")

        # Save results to CSV
        if results_flat:
            df_res = pd.DataFrame(results_flat)
            df_res['Stage'] = stage_name
            
            # Write header only if file does not exist
            hdr = not os.path.exists(OUTPUT_FILE_ALGO3)
            df_res.to_csv(OUTPUT_FILE_ALGO3, mode='a', header=hdr, index=False)
            print(f"  Saved {len(df_res)} rows to {OUTPUT_FILE_ALGO3}")
            
    print(f"Step 4 Completed. Time: {time.time() - total_start:.1f}s")

if __name__ == '__main__':
    step4_run_algo3_final()


Step 4: Running Algo 3 (Final) - FORCE SINGLE CORE MODE

Processing Stage: Dissipation ...
  Analyzing 781 bodies against 27870 records...
  [Mode] Executing in Serial Mode (No Multiprocessing)...
  Processed 781/781 bodies. Done.
  Saved 7810 rows to ../../results/04_conj_enh_opp_sup/sg/sg_algo3_single_body_781.csv

Processing Stage: Onset ...
  Analyzing 781 bodies against 33278 records...
  [Mode] Executing in Serial Mode (No Multiprocessing)...
  Processed 781/781 bodies. Done.
  Saved 7810 rows to ../../results/04_conj_enh_opp_sup/sg/sg_algo3_single_body_781.csv

Processing Stage: Duration ...
  Analyzing 781 bodies against 75678 records...
  [Mode] Executing in Serial Mode (No Multiprocessing)...
  Processed 781/781 bodies. Done.
  Saved 7810 rows to ../../results/04_conj_enh_opp_sup/sg/sg_algo3_single_body_781.csv

Processing Stage: All ...
  Analyzing 781 bodies against 256824 records...
  [Mode] Executing in Serial Mode (No Multiprocessing)...
  Processed 781/781 bodies. Done

## Step 5: physical trend analysis

In [6]:
import pandas as pd
import numpy as np
import os
from statsmodels.stats.multitest import multipletests

# =============================================================================
# Configuration Paths
# =============================================================================
BASE_DIR = '../../results/04_conj_enh_opp_sup/sg'
FILE_ALGO1 = os.path.join(BASE_DIR, 'sg_algo1_total_pairs.csv')
FILE_ALGO2 = os.path.join(BASE_DIR, 'sg_algo2_at_least_one.csv')
FILE_ALGO3 = os.path.join(BASE_DIR, 'sg_algo3_single_body_781.csv')

def analyze_physics_trend(csv_file, algo_name):
    """
    For Algo 1 & 2: 
    Do not use FDR (treated as N=1 sensitivity analysis), but check for consistency in physical trends.
    """
    if not os.path.exists(csv_file):
        print(f"File not found: {csv_file}")
        return

    print(f"\n{'='*80}")
    print(f"[{algo_name}] Physical Trend Verification Report (No FDR Penalty)")
    print(f"Core Logic: Windows are nested (N=1), focus on checking the 'Conjunction-Enhancement, Opposition-Suppression' pattern for Large/XLarge.")
    print(f"{'='*80}")
    
    df = pd.read_csv(csv_file)
    
    # 1. Physical Filtering: Focus only on Large and XLarge (exclude small sunspot noise)
    target_groups = ['Large 500-2000', 'XLarge >2000']
    # Check groups actually present in the data
    available_groups = [g for g in target_groups if g in df['Group'].unique()]
    
    if not available_groups:
        print("No Large/XLarge group data found.")
        return
        
    df_target = df[df['Group'].isin(available_groups)].copy()

    # 2. Construct Pivot Table: Display 1-5 degree windows horizontally to intuitively judge robustness
    # Target format: "Ratio% (P-value)"
    def format_cell(row):
        # Markers: ** p<0.01, * p<0.05
        p = row['p_val']
        mark = ""
        if p < 0.01: mark = "**"
        elif p < 0.05: mark = "*"
        
        # Physical direction check
        ratio = row['Ratio']
        # Conjunction expectation > 100, Opposition expectation < 100
        is_consistent = False
        if row['Type'] == 'Conjunction' and ratio > 100: is_consistent = True
        elif row['Type'] == 'Opposition' and ratio < 100: is_consistent = True
        
        # If significant but direction is reversed, mark as anomalous
        if mark and not is_consistent:
            return f"{ratio:.0f}%(Anomalous{mark})"
        
        return f"{ratio:.0f}% ({p:.3f}{mark})"

    df_target['Result'] = df_target.apply(format_cell, axis=1)
    
    # Create pivot table
    # Rows: Group, Type
    # Columns: Window
    try:
        pivot = df_target.pivot_table(
            index=['Group', 'Type'], 
            columns='Window', 
            values='Result', 
            aggfunc='first'
        )
        
        # Adjust column order
        cols = sorted(pivot.columns)
        pivot = pivot[cols]
        
        pd.set_option('display.max_rows', None)
        pd.set_option('display.width', 1000)
        print(pivot)
        
        print("\n[Interpretation Guide]")
        print("  1. Look for cells with * or ** (indicating Raw P < 0.05).")
        print("  2. Check horizontal trends: Is the Ratio on the same side of 100 from 1 to 5 degrees?")
        print("     - Conjunction: Expected Ratio > 100")
        print("     - Opposition: Expected Ratio < 100")
        print("  3. If the above two points are met, it can be judged as physically significant.")
        
    except Exception as e:
        print(f"Failed to generate pivot table: {e}")


def process_algo3_fair_fdr(csv_file):
    """
    For Algo 3 (Single Body Scan):
    FDR must be retained because 781 bodies were tested (N=781).
    """
    if not os.path.exists(csv_file): return

    print(f"\n{'='*80}")
    print(f"[Algo 3] Performing Fair FDR Correction (for 781 bodies)")
    print(f"{'='*80}")
    
    df = pd.read_csv(csv_file)
    
    # Must group by environment (Stage + Window + Type)
    # Compare 781 bodies under identical conditions
    group_cols = ['Stage', 'Window', 'Type']
    actual_cols = [c for c in group_cols if c in df.columns]
    
    df['p_adj_bh'] = np.nan
    df['sig_fdr'] = False
    
    total_sig = 0
    
    for name, group_data in df.groupby(actual_cols):
        # N = 781
        p_vals = group_data['p_val'].values
        idx = group_data.index
        
        # BH Correction
        reject, p_adj, _, _ = multipletests(p_vals, alpha=0.05, method='fdr_bh')
        
        df.loc[idx, 'p_adj_bh'] = p_adj
        df.loc[idx, 'sig_fdr'] = reject
        total_sig += reject.sum()
            
    # Save results
    df.to_csv(csv_file, index=False)
    print(f"Correction completed. A total of {total_sig} significant body records passing FDR were found across all group scans.")
    if total_sig > 0:
        print("Please open the CSV to view rows where 'sig_fdr' is True, focusing on results corresponding to Large/XLarge sunspots.")

if __name__ == '__main__':
    # 1. Run physical trend analysis for Algo 1 (Most important)
    analyze_physics_trend(FILE_ALGO1, "Algo 1: Total Pairs")
    
    # 2. Run physical trend analysis for Algo 2
    analyze_physics_trend(FILE_ALGO2, "Algo 2: At Least One")
    
    # 3. Run FDR correction for Algo 3 (Needle in a haystack mode)
    process_algo3_fair_fdr(FILE_ALGO3)
    
    print("\n>>> Analysis completed. Please write physical conclusions based on Algo 1 pivot table results. <<<")



[Algo 1: Total Pairs] Physical Trend Verification Report (No FDR Penalty)
Core Logic: Windows are nested (N=1), focus on checking the 'Conjunction-Enhancement, Opposition-Suppression' pattern for Large/XLarge.
Window                                 1             2              3             4              5
Group          Type                                                                               
Large 500-2000 Conjunction    0% (1.000)    0% (0.735)    68% (1.000)   51% (0.849)    82% (1.000)
               Opposition     0% (1.000)    0% (0.768)     0% (0.464)    0% (0.312)     0% (0.187)
XLarge >2000   Conjunction  121% (0.416)  132% (0.132)  138% (0.029*)  129% (0.099)  133% (0.035*)
               Opposition    71% (0.255)   84% (0.484)    80% (0.270)   76% (0.096)  69% (0.003**)

[Interpretation Guide]
  1. Look for cells with * or ** (indicating Raw P < 0.05).
  2. Check horizontal trends: Is the Ratio on the same side of 100 from 1 to 5 degrees?
     - Conjunction: Exp

## Step 6: Kuiper Test

In [7]:
# =============================================================================
# Step 6: Kuiper Test (Verify Morphology/Bimodal Distribution)
# =============================================================================
try:
    from astropy.stats import kuiper
except ImportError:
    print("Warning: astropy not installed. Skipping Kuiper test.")
    kuiper = None
    
def step5_run_kuiper_test_full():
    print("\n" + "=" * 60)
    print("Step 5: Kuiper Test (Full Coverage)")
    print("=" * 60)
    
    if kuiper is None: 
        print("Astropy missing, skipping.")
        return

    # Prepare results container
    results_kuiper = []
    
    # Get all stage files in the cache directory
    files = [f for f in os.listdir(CACHE_DIR) if f.startswith('ready_') and f.endswith('.parquet')]
    
    for f in files:
        stage_name = f.replace('ready_', '').replace('.parquet', '')
        print(f"Processing Stage: {stage_name} ...")
        
        # Read data
        df = pd.read_parquet(os.path.join(CACHE_DIR, f))
        
        # Determine grouping: besides Total, also look at specific Groups
        # For convenience, process Total first
        groups_dict = {'Total': df}
        
        # If Group column exists, split and process
        if 'Group' in df.columns:
            unique_groups = df['Group'].unique()
            for g in unique_groups:
                groups_dict[g] = df[df['Group'] == g]
        
        for group_name, subset_df in groups_dict.items():
            if len(subset_df) < 10: continue # Skip statistics if sample size is too small
            
            for planet in PLANET_COLS:
                # 1. Calculate phase angle (0-360)
                # Assume hme_lon is the longitude of Earth/observation point, planet is the planet's longitude
                # Phase = (Planet - Observation Point) % 360
                phases = (subset_df[planet] - subset_df['hme_lon']) % 360.0
                phases = phases.dropna().values
                
                if len(phases) == 0: continue

                # 2. Normalize to [0, 1]
                data = phases / 360.0
                
                # 3. Execute Kuiper test
                try:
                    V, p_val = kuiper(data)
                    
                    results_kuiper.append({
                        'Stage': stage_name,
                        'Group': group_name,
                        'Planet': planet,
                        'N': len(phases),
                        'V_statistic': round(V, 4),
                        'p_value': p_val, # Keep original precision
                        'Sig': '**' if p_val < 0.01 else ('*' if p_val < 0.05 else '')
                    })
                except Exception as e:
                    print(f"Error in Kuiper for {stage_name}-{group_name}-{planet}: {e}")

    # Save
    if results_kuiper:
        df_k = pd.DataFrame(results_kuiper)
        # Sort for easy viewing
        df_k.sort_values(by=['p_value'], inplace=True)
        df_k.to_csv(OUTPUT_FILE_KUIPER, index=False)
        print(f"Kuiper Test results saved to {OUTPUT_FILE_KUIPER}")
        print("Top 5 Significant Deviations:")
        print(df_k.head(5))
    else:
        print("No Kuiper results generated.")

if __name__ == '__main__':
    step5_run_kuiper_test_full()



Step 5: Kuiper Test (Full Coverage)
Processing Stage: Dissipation ...
Processing Stage: Onset ...
Processing Stage: Duration ...
Processing Stage: All ...
Processing Stage: Daily ...
Kuiper Test results saved to ../../results/04_conj_enh_opp_sup/sg/sg_algo_kuiper_test.csv
Top 5 Significant Deviations:
          Stage           Group   Planet      N  V_statistic  p_value Sig
2   Dissipation           Total  399_lon  27870       0.6090      0.0  **
18  Dissipation      Small <100  399_lon  27541       0.6096      0.0  **
42        Onset      Small <100  399_lon  31241       0.5825      0.0  **
34        Onset           Total  399_lon  33278       0.5823      0.0  **
50        Onset  Medium 100-500  399_lon   1984       0.6277      0.0  **
