### GALAXY PAIRS CATALOG

In [63]:
import numpy as np
import pandas as pd
from astropy.io import fits
from astropy.cosmology import Planck18 as cosmo
from tqdm.notebook import tqdm
from helper import preprocess_catalog_galactic
from helper import load_catalog
from multiprocessing import Pool, cpu_count
from functools import partial

In [64]:
# --- Settings ---
dataset = "BOSS"
if dataset == "eBOSS":
    catalog, region = "LRG", "NGC"
    real_file = f"data/eBOSS/eBOSS_{catalog}_clustering_data-{region}-vDR16.fits"
    rand_file = f"data/eBOSS/eBOSS_LRG_clustering_random-{region}-vDR16.fits"
elif dataset == "BOSS":
    catalog, region = "CMASS", "North"
    real_file = f"data/BOSS/galaxy_DR12v5_CMASS_{region}.fits"
    rand_file = f"data/BOSS/random0_DR12v5_CMASS_{region}.fits"
else:
    raise ValueError("dataset must be eBOSS or BOSS")

alm_file = "data/COM_Lensing_4096_R3.00/MV/dat_klm.fits"
mask_file = "data/COM_Lensing_4096_R3.00/mask.fits"

In [65]:
def compute_angle_cosine(l1, b1, l2, b2):
    """
    Compute cosine of angle θ between two directions (in degrees) in Galactic coordinates.
    """
    l1_rad, b1_rad = np.radians(l1), np.radians(b1)
    l2_rad, b2_rad = np.radians(l2), np.radians(b2)

    cos_theta = (
        np.cos(b1_rad) * np.cos(l1_rad) * np.cos(b2_rad) * np.cos(l2_rad) +
        np.cos(b1_rad) * np.sin(l1_rad) * np.cos(b2_rad) * np.sin(l2_rad) +
        np.sin(b1_rad) * np.sin(b2_rad)
    )
    return cos_theta

def galactic_to_cartesian(l, b, D):
    """
    Convert Galactic coordinates (l, b in degrees, D in Mpc/h) to 3D Cartesian.
    """
    l_rad, b_rad = np.radians(l), np.radians(b)
    x = D * np.cos(b_rad) * np.cos(l_rad)
    y = D * np.cos(b_rad) * np.sin(l_rad)
    z = D * np.sin(b_rad)
    return np.column_stack([x, y, z])

def create_distance_bins(D, r_par_max):
    """
    Create bins for parallel distance (comoving distance) to limit search space.
    """
    # Sort galaxies by distance and create bins
    sorted_indices = np.argsort(D)
    sorted_D = D[sorted_indices]
    
    # Create bins with width = r_par_max to limit parallel distance searches
    bins = {}
    current_bin = 0
    bin_edges = [0]
    
    for i, d in enumerate(sorted_D):
        if i == 0 or d - sorted_D[bin_edges[-1]] > r_par_max * 2:
            bin_edges.append(i)
            current_bin += 1
    
    bin_edges.append(len(D))
    
    # Store which galaxies are in searchable range for each bin
    for i in range(len(bin_edges) - 1):
        bin_start = bin_edges[i]
        bin_end = bin_edges[i + 1]
        bins[i] = sorted_indices[bin_start:bin_end]
    
    return bins, sorted_indices, sorted_D

def process_chunk_optimized(chunk_start_end, l, b, D, data_filtered, weights_valid, z, 
                            r_par_max, r_perp_min, r_perp_max, sorted_indices, sorted_D):
    """
    Process a chunk of galaxies using VECTORIZED distance-sorted approach.
    This is MUCH faster than the loop version.
    """
    import time
    chunk_start, chunk_end = chunk_start_end
    
    # Pre-allocate lists for vectorized operations
    i_list, j_list = [], []
    Dc1_list, Dc2_list = [], []
    
    start_time = time.time()
    last_print_time = start_time
    
    # First pass: collect candidate pairs using fast filtering
    for idx in range(chunk_start, chunk_end):
        i = sorted_indices[idx]
        Dc1 = D[i]
        
        # Debug: print progress every 1000 galaxies
        if (idx - chunk_start) > 0 and (idx - chunk_start) % 1000 == 0:
            current_time = time.time()
            elapsed = current_time - start_time
            interval_time = current_time - last_print_time
            galaxies_processed = idx - chunk_start
            rate = 1000 / interval_time if interval_time > 0 else 0
            pairs_found = len(i_list)
            print(f"  Chunk [{chunk_start}-{chunk_end}]: Processed {galaxies_processed}/{chunk_end-chunk_start} galaxies "
                  f"({rate:.1f} gal/s, {interval_time:.1f}s for last 1000, {pairs_found} pairs so far)")
            last_print_time = current_time
        
        # Binary search for galaxies within r_par_max
        left_bound = np.searchsorted(sorted_D, Dc1 - r_par_max, side='left')
        right_bound = np.searchsorted(sorted_D, Dc1 + r_par_max, side='right')
        search_start = max(idx + 1, left_bound)
        search_end = right_bound
        
        if search_end <= search_start:
            continue
        
        # Get all candidate j indices
        j_candidates = sorted_indices[search_start:search_end]
        Dc2_candidates = D[j_candidates]
        
        # Vectorized angular filtering
        l1, b1 = l[i], b[i]
        l2_candidates = l[j_candidates]
        b2_candidates = b[j_candidates]
        
        # Fast approximate angular separation
        dl = np.abs(l2_candidates - l1)
        db = np.abs(b2_candidates - b1)
        
        # Handle wrapping around 360 degrees for longitude
        dl = np.minimum(dl, 360 - dl)
        
        D_avg = (Dc1 + Dc2_candidates) / 2.0
        min_angle = r_perp_min / D_avg
        max_angle = r_perp_max / D_avg

        # apply cosb correction for proper angular separation approximation
        # without this, rough_angle overestimates true angle at hih latitidues, potentiallu causing valid pairs to be missed
        cosb1 = np.cos(np.radians(b1))
        rough_angle = np.sqrt((np.radians(dl*cosb1))**2 + (np.radians(db))**2)
        
        # Filter by approximate angle (with generous margins)
        angle_mask = (rough_angle >= min_angle * 0.7) & (rough_angle <= max_angle * 2)
        
        if not np.any(angle_mask):
            continue
        
        # Keep only candidates that pass angular filter
        j_filtered = j_candidates[angle_mask]
        
        # Store for vectorized processing
        i_list.extend([i] * len(j_filtered))
        j_list.extend(j_filtered)
    
    if len(i_list) == 0:
        return []
    
    # Convert to arrays for vectorized computation
    i_arr = np.array(i_list)
    j_arr = np.array(j_list)
    
    # Vectorized precise angle calculation
    l1_arr, b1_arr = l[i_arr], b[i_arr]
    l2_arr, b2_arr = l[j_arr], b[j_arr]
    
    l1_rad, b1_rad = np.radians(l1_arr), np.radians(b1_arr)
    l2_rad, b2_rad = np.radians(l2_arr), np.radians(b2_arr)
    
    cos_theta = (
        np.cos(b1_rad) * np.cos(l1_rad) * np.cos(b2_rad) * np.cos(l2_rad) +
        np.cos(b1_rad) * np.sin(l1_rad) * np.cos(b2_rad) * np.sin(l2_rad) +
        np.sin(b1_rad) * np.sin(b2_rad)
    )
    
    theta = np.arccos(np.clip(cos_theta, -1, 1))
    
    Dc1_arr = D[i_arr]
    Dc2_arr = D[j_arr]
    D_avg_arr = (Dc1_arr + Dc2_arr) / 2.0
    r_perp_arr = D_avg_arr * theta
    
    # Final filter
    final_mask = (r_perp_arr >= r_perp_min) & (r_perp_arr <= r_perp_max)
    
    # Build pairs for accepted candidates
    i_final = i_arr[final_mask]
    j_final = j_arr[final_mask]
    
    pairs = []
    for idx in range(len(i_final)):
        i = i_final[idx]
        j = j_final[idx]
        Dmid = cosmo.comoving_distance((z[i] + z[j]) / 2).value * cosmo.h
        pairs.append({
            'l1': l[i], 'b1': b[i], 'z1': z[i], 'w1': weights_valid[i], 'Dc1': D[i], 'ID1': data_filtered[i]['ID'],
            'l2': l[j], 'b2': b[j], 'z2': z[j], 'w2': weights_valid[j], 'Dc2': D[j], 'ID2': data_filtered[j]['ID'],
            'Dmid': Dmid
        })
    
    total_time = time.time() - start_time
    print(f"  Chunk [{chunk_start}-{chunk_end}] COMPLETE: {chunk_end-chunk_start} galaxies in {total_time:.1f}s, found {len(pairs)} pairs")
    
    return pairs

def build_galaxy_pair_catalog(data, weights, r_par_max=20, r_perp_min=18, r_perp_max=22,
                               n_processes=None, chunk_size=10000, save_intermediate=True):
    """
    Build galaxy pair catalog based on parallel and perpendicular distance criteria.
    Uses distance sorting and VECTORIZED operations for maximum speed.
    
    Parameters:
    -----------
    data : array
        Galaxy catalog data
    weights : array
        Galaxy weights
    r_par_max : float
        Maximum parallel distance (Mpc/h)
    r_perp_min : float
        Minimum perpendicular distance (Mpc/h)
    r_perp_max : float
        Maximum perpendicular distance (Mpc/h)
    n_processes : int or None
        Number of processes to use (default: cpu_count() - 1)
    chunk_size : int
        Number of galaxies to process per chunk
    save_intermediate : bool
        Save results every 10 chunks to avoid losing progress
    
    Returns:
    --------
    pairs : DataFrame
        Galaxy pair catalog
    """
    l, b, D, data_filtered, weights_valid = preprocess_catalog_galactic(data, weights)
    z = data_filtered['Z']
    
    n_galaxies = len(data_filtered)
    print(f"Building pair catalog for {n_galaxies} galaxies...")
    print(f"Search parameters: r_par_max={r_par_max}, r_perp_min={r_perp_min}, r_perp_max={r_perp_max}")
    
    # Sort galaxies by distance for efficient searching
    print("Sorting galaxies by comoving distance...")
    sorted_indices = np.argsort(D)
    sorted_D = D[sorted_indices]
    
    # Estimate search space
    avg_D = np.median(D)
    expected_pairs_per_galaxy = int((2 * r_par_max / avg_D) * n_galaxies * 0.01)
    print(f"Average distance: {avg_D:.1f} Mpc/h")
    print(f"Estimated search space per galaxy: ~{expected_pairs_per_galaxy} candidates")
    
    # Split into chunks for parallel processing
    n_chunks = int(np.ceil(n_galaxies / chunk_size))
    chunk_ranges = [(i * chunk_size, min((i + 1) * chunk_size, n_galaxies)) 
                    for i in range(n_chunks)]
    
    if n_processes is None:
        # Use 75% of available cores (better for Windows and memory usage)
        n_processes = max(1, int(cpu_count() * 0.75))
    
    # Cap at reasonable maximum (too many processes = overhead + memory issues)
    n_processes = min(n_processes, 12)
    
    print(f"Processing {n_chunks} chunks using {n_processes} processes...")
    print(f"NOTE: With r_perp_min={r_perp_min}, you may find MANY pairs. Consider r_perp_min > 5-10.")
    
    # Create partial function with fixed parameters
    process_func = partial(
        process_chunk_optimized,
        l=l, b=b, D=D, 
        data_filtered=data_filtered, weights_valid=weights_valid, z=z,
        r_par_max=r_par_max, r_perp_min=r_perp_min, r_perp_max=r_perp_max,
        sorted_indices=sorted_indices, sorted_D=sorted_D
    )
    
    # Process in parallel with intermediate saves
    all_pairs = []
    output_file = f"data/paircatalogs/galaxy_pairs_catalog_CMASS_{region}_{r_par_max}_{r_perp_min}_{r_perp_max}hmpc.csv"
    
    if n_processes > 1:
        import time
        print(f"\n{'='*60}")
        print(f"NOTE: Debug prints from worker processes won't appear here.")
        print(f"Progress tracked by main process after each chunk completes.")
        print(f"{'='*60}\n")
        
        chunk_start_time = time.time()
        with Pool(processes=n_processes) as pool:
            for i, result in enumerate(tqdm(
                pool.imap(process_func, chunk_ranges),
                total=len(chunk_ranges),
                desc="Finding pairs"
            )):
                chunk_time = time.time() - chunk_start_time
                all_pairs.extend(result)
                
                # Print summary after each chunk completes
                chunk_start, chunk_end = chunk_ranges[i]
                n_pairs_in_chunk = len(result)
                total_pairs_so_far = sum(len(p) if isinstance(p, list) else 1 for p in all_pairs)
                print(f"\n  Chunk {i+1}/{len(chunk_ranges)} [{chunk_start}-{chunk_end}]: "
                      f"Found {n_pairs_in_chunk:,} pairs in {chunk_time:.1f}s "
                      f"({(chunk_end-chunk_start)/chunk_time:.1f} gal/s) | "
                      f"Total: {total_pairs_so_far:,} pairs")
                
                # Save intermediate results every 10 chunks
                if save_intermediate and (i + 1) % 10 == 0:
                    temp_df = pd.DataFrame([p for chunk in all_pairs for p in [chunk]] if isinstance(all_pairs[0], dict) else all_pairs)
                    temp_file = output_file.replace('.csv', f'_temp_{i+1}.csv')
                    temp_df.to_csv(temp_file, index=False)
                    print(f"  ✓ Checkpoint: Saved {len(temp_df):,} pairs to {temp_file}")
                
                chunk_start_time = time.time()
    else:
        # Single process mode - debug prints WILL appear
        print(f"\n{'='*60}")
        print(f"Running in SINGLE PROCESS mode - all debug prints visible.")
        print(f"{'='*60}\n")
        
        for i, chunk in enumerate(tqdm(chunk_ranges, desc="Finding pairs")):
            result = process_func(chunk)
            all_pairs.extend(result)
            
            if save_intermediate and (i + 1) % 10 == 0:
                temp_df = pd.DataFrame(all_pairs)
                temp_file = output_file.replace('.csv', f'_temp_{i+1}.csv')
                temp_df.to_csv(temp_file, index=False)
                print(f"\n  Checkpoint: Saved {len(temp_df)} pairs to {temp_file}")
    
    # Flatten and convert to DataFrame
    pairs = [pair for chunk_pairs in all_pairs for pair in chunk_pairs] if all_pairs and isinstance(all_pairs[0], list) else all_pairs
    
    print(f"\nFound {len(pairs)} total pairs")
    
    pairs_df = pd.DataFrame(pairs)
    pairs_df.to_csv(output_file, index=False)
    print(f"Saved to {output_file}")
    
    return pairs_df

In [66]:
# --- Run all ---
if dataset == "BOSS":
    z_min = .4
    z_max = .7
    weight = "CMASS"
else: # eBOSS
    z_min = 0
    z_max = 10000
    weight = True

data_real, w_real = load_catalog(real_file, weights=weight, z_min=z_min, z_max=z_max)
# data_rand, w_rand = load_catalog(rand_file, random_fraction=0.10)

# Build galaxy pair catalog with VECTORIZED version
# This eliminates inner Python loops for massive speedup
pair_catalog = build_galaxy_pair_catalog(
    data_real, w_real, 
    r_par_max=20,
    r_perp_min=18,          # NOTE: r_perp_min=0 will find MANY pairs! Consider 5-10 for faster results
    r_perp_max=22,
    n_processes=1,      # Auto: uses 75% of cores (capped at 12). On 16-core: will use 12
    chunk_size=10000,      # Even larger chunks with vectorization
    save_intermediate=True # Saves progress every 10 chunks
)
print(f"Total valid pairs: {len(pair_catalog)}")

# Jackknife real galaxies
# kappa_real, sigma_real = jackknife_stack_healpix(data_real, w_real, data_rand, nside=10)
# sn_real = np.zeros_like(kappa_real)
# valid = sigma_real > 0
# sn_real[valid] = kappa_real[valid] / sigma_real[valid]
# kappa_rand, sigma_rand, sn_rand = stack_kappa(data_rand, w_rand, "Random")
# kappa_sub = kappa_real - kappa_rand
# kappa_smooth = gaussian_filter(kappa_sub, sigma=2)

Building pair catalog for 579089 galaxies...
Search parameters: r_par_max=20, r_perp_min=18, r_perp_max=22
Sorting galaxies by comoving distance...
Average distance: 1405.2 Mpc/h
Estimated search space per galaxy: ~164 candidates
Processing 58 chunks using 1 processes...
NOTE: With r_perp_min=18, you may find MANY pairs. Consider r_perp_min > 5-10.

Running in SINGLE PROCESS mode - all debug prints visible.



Finding pairs:   0%|          | 0/58 [00:00<?, ?it/s]

  Chunk [0-10000]: Processed 1000/10000 galaxies (12180.4 gal/s, 0.1s for last 1000, 4201 pairs so far)
  Chunk [0-10000]: Processed 2000/10000 galaxies (13153.6 gal/s, 0.1s for last 1000, 9490 pairs so far)
  Chunk [0-10000]: Processed 3000/10000 galaxies (8448.4 gal/s, 0.1s for last 1000, 15388 pairs so far)
  Chunk [0-10000]: Processed 4000/10000 galaxies (18246.5 gal/s, 0.1s for last 1000, 22339 pairs so far)
  Chunk [0-10000]: Processed 5000/10000 galaxies (15525.1 gal/s, 0.1s for last 1000, 30406 pairs so far)
  Chunk [0-10000]: Processed 6000/10000 galaxies (13501.3 gal/s, 0.1s for last 1000, 39564 pairs so far)
  Chunk [0-10000]: Processed 7000/10000 galaxies (14566.0 gal/s, 0.1s for last 1000, 49739 pairs so far)
  Chunk [0-10000]: Processed 8000/10000 galaxies (12135.0 gal/s, 0.1s for last 1000, 60873 pairs so far)
  Chunk [0-10000]: Processed 9000/10000 galaxies (9305.4 gal/s, 0.1s for last 1000, 72578 pairs so far)
  Chunk [0-10000] COMPLETE: 10000 galaxies in 1.2s, found 8