# Periodicity Analysis

Search for periodic signals and transits in cleaned TESS data.

**Methods:**
- Lomb-Scargle periodogram — find periodic variables
- BLS (Box Least Squares) — find transit-like signals
- Phase folding — visualize periodic signals

**Data:** Sector 61, Camera 4, CCD 2 (cleaned with DataCleaner)

## 1. Setup & Data Loading

In [None]:
import sys
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import lombscargle
from scipy.stats import median_abs_deviation

# Project setup
def find_project_root(start: Path) -> Path:
    for p in [start] + list(start.parents):
        if (p / "pyproject.toml").exists() or ((p / "src").exists() and (p / "data").exists()):
            return p
    return start

PROJECT_ROOT = find_project_root(Path.cwd())
DATA_DIR = PROJECT_ROOT / "data"
sys.path.insert(0, str(PROJECT_ROOT / "src"))

from tess.config import get_artifact_windows

# Plot settings
plt.rcParams["figure.figsize"] = (14, 5)
plt.rcParams["figure.dpi"] = 100

# Sector config
SECTOR = 61
CAMERA = "4"
CCD = "2"

print(f"PROJECT_ROOT: {PROJECT_ROOT}")
print(f"Analyzing: Sector {SECTOR}, Camera {CAMERA}, CCD {CCD}")

In [None]:
# Load CLEANED photometry (with common-mode correction and masks)
cleaned_path = DATA_DIR / f"tess/sector_{SECTOR:03d}/cam{CAMERA}_ccd{CCD}/cleaned/photometry_with_masks.parquet"

if cleaned_path.exists():
    df = pd.read_parquet(cleaned_path)
    print(f"Loaded cleaned data: {cleaned_path}")
    print(f"Shape: {df.shape}")
    print(f"Columns: {df.columns.tolist()}")
    USE_CLEANED = True
else:
    # Fallback to raw photometry
    raw_path = DATA_DIR / f"tess/sector_{SECTOR:03d}/cam{CAMERA}_ccd{CCD}/s{SECTOR:04d}_{CAMERA}-{CCD}_photometry.parquet"
    df = pd.read_parquet(raw_path)
    print(f"Loaded raw data: {raw_path}")
    print(f"Shape: {df.shape}")
    USE_CLEANED = False

print(f"\nUsing cleaned data: {USE_CLEANED}")
print(f"Stars: {df['star_id'].nunique()}")
print(f"Epochs: {df['epoch'].nunique() if 'epoch' in df.columns else 'N/A'}")

In [None]:
# Load ML features for star selection
ml_path = DATA_DIR / f"tess/sector_{SECTOR:03d}/cam{CAMERA}_ccd{CCD}/ml/ml_classification.parquet"

if ml_path.exists():
    ml_df = pd.read_parquet(ml_path)
    print(f"Loaded ML features: {ml_path}")
    print(f"Stars with features: {len(ml_df)}")
else:
    ml_df = None
    print("No ML features found")

In [None]:
# Mask bits explanation (if using cleaned data)
if USE_CLEANED and 'mask' in df.columns:
    print("Mask bits (uint16):")
    print("  Bit 0: quality    - TESS quality flag")
    print("  Bit 1: invalid    - NaN/negative flux")
    print("  Bit 2: bad_epoch  - High scatter epoch")
    print("  Bit 3: outlier_pos - Positive outlier (5σ)")
    print("  Bit 4: outlier_neg - Negative outlier (10σ)")
    print("  Bit 5: edge       - Star near CCD edge")
    print("  Bit 6: artifact   - Known artifact window")
    print("  Bit 7: low_snr    - Low signal-to-noise")
    
    # Count masked points
    print(f"\nTotal points: {len(df)}")
    print(f"Good points (mask=0): {(df['mask'] == 0).sum()} ({(df['mask'] == 0).mean()*100:.1f}%)")

## 2. Select Variable Candidates

In [None]:
# Select stars likely to be variable (high amplitude, high chi2)
if ml_df is not None:
    # Filter by variability metrics
    candidates = ml_df[
        (ml_df['amplitude_robust'] > 0.01) &  # >1% amplitude
        (ml_df['reduced_chi2'] > 1.0) &       # Significant variability
        (ml_df['snr'] > 10)                   # Good SNR
    ].copy()
    
    # Sort by amplitude
    candidates = candidates.sort_values('amplitude_robust', ascending=False)
    
    print(f"Variable candidates: {len(candidates)}")
    print(f"\nTop 10 by amplitude:")
    display(candidates[['star_id', 'tic_id', 'amplitude_robust', 'reduced_chi2', 'snr']].head(10))
else:
    # Use all stars if no ML features
    candidates = pd.DataFrame({'star_id': df['star_id'].unique()})
    print(f"Using all {len(candidates)} stars (no ML features for filtering)")

## 3. Lomb-Scargle Periodogram

In [None]:
def get_star_lightcurve(star_id, df, use_cleaned=True):
    """Extract clean lightcurve for a star."""
    star_df = df[df['star_id'] == star_id].copy()
    
    # Use flux_cm if available, else flux
    if use_cleaned and 'flux_cm' in star_df.columns:
        flux_col = 'flux_cm'
    else:
        flux_col = 'flux'
    
    # Apply mask if available
    if 'mask' in star_df.columns:
        star_df = star_df[star_df['mask'] == 0]
    elif 'quality' in star_df.columns:
        star_df = star_df[star_df['quality'] == 0]
    
    # Get time and flux
    time = star_df['btjd'].values
    flux = star_df[flux_col].values
    
    # Remove NaN
    mask = np.isfinite(time) & np.isfinite(flux)
    time, flux = time[mask], flux[mask]
    
    # Normalize
    if len(flux) > 0:
        median = np.median(flux)
        if median > 0:
            flux = flux / median
    
    return time, flux


def lomb_scargle_periodogram(time, flux, min_period=0.1, max_period=15.0, n_freqs=10000):
    """Compute Lomb-Scargle periodogram."""
    if len(time) < 50:
        return None, None, None, None
    
    # Shift time to start at 0 (numerical stability)
    t0 = time - time.min()
    
    # Center flux
    flux_centered = flux - np.mean(flux)
    
    # Frequency grid
    freqs = np.linspace(1/max_period, 1/min_period, n_freqs)
    angular_freqs = 2 * np.pi * freqs
    
    # Compute periodogram
    power = lombscargle(t0, flux_centered, angular_freqs, normalize=True)
    periods = 1 / freqs
    
    # Find best period
    best_idx = np.argmax(power)
    best_period = periods[best_idx]
    best_power = power[best_idx]
    
    return periods, power, best_period, best_power


print("Functions defined: get_star_lightcurve(), lomb_scargle_periodogram()")

In [None]:
# Run Lomb-Scargle on top candidates
N_ANALYZE = min(100, len(candidates))
print(f"Analyzing {N_ANALYZE} candidates...")

ls_results = []

for i, (_, row) in enumerate(candidates.head(N_ANALYZE).iterrows()):
    star_id = row['star_id']
    tic_id = row.get('tic_id', None)
    
    time, flux = get_star_lightcurve(star_id, df, USE_CLEANED)
    
    if len(time) < 50:
        continue
    
    periods, power, best_period, best_power = lomb_scargle_periodogram(time, flux)
    
    if best_period is not None:
        ls_results.append({
            'star_id': star_id,
            'tic_id': tic_id,
            'best_period': best_period,
            'ls_power': best_power,
            'n_points': len(time),
            'amplitude_robust': row.get('amplitude_robust', None)
        })
    
    if (i + 1) % 20 == 0:
        print(f"  Processed {i+1}/{N_ANALYZE}")

ls_df = pd.DataFrame(ls_results)
ls_df = ls_df.sort_values('ls_power', ascending=False)

print(f"\nCompleted. Found periods for {len(ls_df)} stars.")
print(f"\nTop 15 by Lomb-Scargle power:")
display(ls_df.head(15))

## 4. Phase Folding

In [None]:
def plot_star_analysis(star_id, tic_id, df, use_cleaned=True):
    """Plot raw lightcurve, periodogram, and phase-folded curve."""
    time, flux = get_star_lightcurve(star_id, df, use_cleaned)
    
    if len(time) < 50:
        print(f"Not enough data for {star_id}")
        return None
    
    # Compute periodogram
    periods, power, best_period, best_power = lomb_scargle_periodogram(time, flux)
    
    # Check double period
    period_2x = best_period * 2
    
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    
    # 1. Raw lightcurve
    axes[0, 0].scatter(time, flux, s=2, alpha=0.5)
    axes[0, 0].axhline(1.0, color='gray', ls='--', alpha=0.5)
    axes[0, 0].set_xlabel('BTJD')
    axes[0, 0].set_ylabel('Normalized Flux')
    axes[0, 0].set_title(f'{star_id} (TIC {tic_id}) - Raw Lightcurve')
    
    # 2. Periodogram
    axes[0, 1].plot(periods, power, 'b-', lw=0.5)
    axes[0, 1].axvline(best_period, color='red', ls='--', label=f'Best: {best_period:.4f} d')
    axes[0, 1].set_xlabel('Period (days)')
    axes[0, 1].set_ylabel('Lomb-Scargle Power')
    axes[0, 1].set_title('Periodogram')
    axes[0, 1].legend()
    axes[0, 1].set_xlim(0, 15)
    
    # 3. Phase-folded (best period)
    phase = ((time - time.min()) / best_period) % 1.0
    phase = np.where(phase > 0.5, phase - 1.0, phase)
    
    # Bin
    n_bins = 50
    bin_edges = np.linspace(-0.5, 0.5, n_bins + 1)
    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
    bin_means = [np.median(flux[(phase >= bin_edges[i]) & (phase < bin_edges[i+1])]) 
                 if ((phase >= bin_edges[i]) & (phase < bin_edges[i+1])).sum() > 0 else np.nan
                 for i in range(n_bins)]
    
    axes[1, 0].scatter(phase, flux, s=2, alpha=0.3)
    axes[1, 0].plot(bin_centers, bin_means, 'o-', color='orange', markersize=3)
    axes[1, 0].axhline(1.0, color='gray', ls='--', alpha=0.5)
    axes[1, 0].set_xlabel('Phase')
    axes[1, 0].set_ylabel('Normalized Flux')
    axes[1, 0].set_title(f'Phase-folded (P = {best_period:.4f} d)')
    
    # 4. Phase-folded (double period) - for eclipsing binaries
    phase_2x = ((time - time.min()) / period_2x) % 1.0
    phase_2x = np.where(phase_2x > 0.5, phase_2x - 1.0, phase_2x)
    
    bin_means_2x = [np.median(flux[(phase_2x >= bin_edges[i]) & (phase_2x < bin_edges[i+1])]) 
                    if ((phase_2x >= bin_edges[i]) & (phase_2x < bin_edges[i+1])).sum() > 0 else np.nan
                    for i in range(n_bins)]
    
    axes[1, 1].scatter(phase_2x, flux, s=2, alpha=0.3)
    axes[1, 1].plot(bin_centers, bin_means_2x, 'o-', color='orange', markersize=3)
    axes[1, 1].axhline(1.0, color='gray', ls='--', alpha=0.5)
    axes[1, 1].set_xlabel('Phase')
    axes[1, 1].set_ylabel('Normalized Flux')
    axes[1, 1].set_title(f'Phase-folded (P = {period_2x:.4f} d, x2)')
    
    plt.tight_layout()
    return fig, best_period


print("Function defined: plot_star_analysis()")

In [None]:
# Plot top 6 periodic candidates
N_PLOT = 6

for i, (_, row) in enumerate(ls_df.head(N_PLOT).iterrows()):
    fig, period = plot_star_analysis(row['star_id'], row['tic_id'], df, USE_CLEANED)
    plt.show()
    print(f"Star {i+1}: {row['star_id']} | TIC {row['tic_id']} | P = {period:.4f} d | LS Power = {row['ls_power']:.4f}")
    print()

## 5. BLS Transit Search

In [None]:
def bls_search(time, flux, periods, duration_frac=0.02):
    """Simple BLS-like transit search.
    
    For production use, prefer astropy.timeseries.BoxLeastSquares.
    """
    if len(time) < 50:
        return None, None, None
    
    t0 = time - time.min()
    
    best_power = 0
    best_period = None
    best_depth = 0
    powers = []
    
    for period in periods:
        phase = (t0 / period) % 1.0
        
        # Try different transit phases
        n_phase_bins = 20
        period_best_power = 0
        
        for phase_start in np.linspace(0, 1 - duration_frac, n_phase_bins):
            in_transit = (phase >= phase_start) & (phase < phase_start + duration_frac)
            out_transit = ~in_transit
            
            if in_transit.sum() < 3 or out_transit.sum() < 10:
                continue
            
            f_in = np.median(flux[in_transit])
            f_out = np.median(flux[out_transit])
            depth = f_out - f_in
            
            if depth > 0:
                n_in, n_out = in_transit.sum(), out_transit.sum()
                power = depth * np.sqrt(n_in * n_out / (n_in + n_out))
                
                period_best_power = max(period_best_power, power)
                
                if power > best_power:
                    best_power = power
                    best_period = period
                    best_depth = depth
        
        powers.append(period_best_power)
    
    return best_period, best_power, best_depth


print("Function defined: bls_search()")

In [None]:
# Search for transits in low-amplitude stars (potential planet hosts)
if ml_df is not None:
    transit_candidates = ml_df[
        (ml_df['amplitude_robust'] < 0.03) &  # Low amplitude (not eclipsing binary)
        (ml_df['amplitude_robust'] > 0.001) & # Some variation
        (ml_df['snr'] > 20)                   # Good SNR
    ].copy()
    transit_candidates = transit_candidates.sort_values('snr', ascending=False)
else:
    transit_candidates = candidates

print(f"Transit candidates: {len(transit_candidates)}")

# Run BLS
N_BLS = min(50, len(transit_candidates))
bls_periods = np.linspace(0.5, 10.0, 200)

print(f"\nRunning BLS on {N_BLS} stars...")

bls_results = []

for i, (_, row) in enumerate(transit_candidates.head(N_BLS).iterrows()):
    star_id = row['star_id']
    time, flux = get_star_lightcurve(star_id, df, USE_CLEANED)
    
    if len(time) < 100:
        continue
    
    period, power, depth = bls_search(time, flux, bls_periods)
    
    if period is not None and depth > 0.005:  # >0.5% depth
        bls_results.append({
            'star_id': star_id,
            'tic_id': row.get('tic_id', None),
            'period': period,
            'depth_pct': depth * 100,
            'bls_power': power,
            'snr': row.get('snr', None)
        })
    
    if (i + 1) % 10 == 0:
        print(f"  Processed {i+1}/{N_BLS}")

bls_df = pd.DataFrame(bls_results)
if len(bls_df) > 0:
    bls_df = bls_df.sort_values('bls_power', ascending=False)
    print(f"\nFound {len(bls_df)} potential transit signals:")
    display(bls_df.head(10))
else:
    print("\nNo significant transit signals found.")

## 6. Export Results

In [None]:
# Export periodicity results
output_dir = PROJECT_ROOT / f"variable_stars/sector_{SECTOR:03d}"
output_dir.mkdir(parents=True, exist_ok=True)

# Save Lomb-Scargle results
ls_path = output_dir / 'periodicity_lomb_scargle.csv'
ls_df.to_csv(ls_path, index=False)
print(f"Lomb-Scargle results saved to: {ls_path}")

# Save BLS results
if len(bls_df) > 0:
    bls_path = output_dir / 'periodicity_bls_transits.csv'
    bls_df.to_csv(bls_path, index=False)
    print(f"BLS results saved to: {bls_path}")

In [None]:
# Save plots for top candidates
plots_dir = output_dir / 'periodicity_plots'
plots_dir.mkdir(exist_ok=True)

N_SAVE = 10
print(f"Saving plots for top {N_SAVE} periodic candidates...")

for i, (_, row) in enumerate(ls_df.head(N_SAVE).iterrows()):
    fig, period = plot_star_analysis(row['star_id'], row['tic_id'], df, USE_CLEANED)
    
    filename = f"{row['star_id']}_TIC{row['tic_id']}_P{period:.4f}d.png"
    fig.savefig(plots_dir / filename, dpi=100, bbox_inches='tight')
    plt.close(fig)

print(f"Plots saved to: {plots_dir}")

## Summary

This notebook analyzed cleaned TESS data for periodic signals:

1. **Lomb-Scargle** — found periodic variables (eclipsing binaries, pulsators, rotators)
2. **BLS** — searched for transit-like signals (potential exoplanets)
3. **Phase folding** — visualized periodic signals with single and double periods

### Files Generated

- `variable_stars/sector_061/periodicity_lomb_scargle.csv` — Periods and LS power
- `variable_stars/sector_061/periodicity_bls_transits.csv` — Transit candidates
- `variable_stars/sector_061/periodicity_plots/` — Phase-folded plots

### Next Steps

1. Cross-match periodic stars with VSX
2. Verify transit candidates on ExoFOP
3. Submit new discoveries to VSX