In [1]:
pip install -q astroquery

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.1/11.1 MB[0m [31m25.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m11.7 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
# ============================================================================#
# RADIO GALAXY SIMBAD QUERY SCRIPT                                  #
# ============================================================================#
# Features:
# - Concurrent SIMBAD queries with progress tracking
# - Configuration-driven parameters
# - Robust error handling with retry logic
# - Checkpoint system for crash recovery
# - Proper logging framework
# - Type hints and comprehensive documentation
# ============================================================================#

import pandas as pd
import logging
import os
import requests
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import time
import warnings

warnings.filterwarnings('ignore')

from astroquery.simbad import Simbad
from astropy.coordinates import SkyCoord
from astropy import units as u
from tqdm import tqdm


# ============================================================================#
# CONFIGURATION                                                              #
# ============================================================================#

CONFIG = {
    # File paths
    'drive_url': "https://drive.google.com/file/d/1owgNHA6x1LMgB7uCvueUQYoYxseXJfzD/view?usp=sharing",
    'local_filename': "rgz_data.csv",
    'output_filename': "simbad_query_results.csv",
    'checkpoint_dir': "checkpoints",
    'log_filename': "simbad_query.log",

    # Query parameters
    'search_radius_deg': 0.1,
    'max_objects': 100,
    'batch_size': 10,
    'checkpoint_interval': 20,

    # Network parameters
    'timeout_seconds': 500,
    'rate_limit_delay': 1.0,
    'max_retries': 3,
    'retry_backoff': 2.0,  # exponential backoff multiplier
    'max_workers': 5,  # concurrent workers

    # Validation
    'min_ra': 0,
    'max_ra': 360,
    'min_dec': -90,
    'max_dec': 90,
}

# Column name mapping for standardization
COLUMN_MAPPING = {
    'rgzid': 'RGZ_ID',
    'rgz_id': 'RGZ_ID',
    'id': 'RGZ_ID',
    'ra': 'RA',
    'ra_deg': 'RA',
    'dec': 'Dec',
    'dec_deg': 'Dec',
    'declination': 'Dec',
}


# ============================================================================#
# LOGGING SETUP                                                              #
# ============================================================================#

def setup_logging(log_file: str) -> logging.Logger:
    """
    Configure logging to both console and file.

    Args:
        log_file: Path to log file

    Returns:
        Configured logger instance
    """
    logger = logging.getLogger('SIMBAD_Query')
    logger.setLevel(logging.DEBUG)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    # File handler
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(logging.DEBUG)

    # Formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    console_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)

    logger.addHandler(console_handler)
    logger.addHandler(file_handler)

    return logger


# ============================================================================#
# DATA LOADING AND CLEANING                                                  #
# ============================================================================#

def extract_file_id(drive_url: str) -> str:
    """
    Extract Google Drive file ID from sharing URL.

    Args:
        drive_url: Google Drive sharing URL

    Returns:
        File ID string

    Raises:
        ValueError: If file ID cannot be extracted
    """
    try:
        file_id = drive_url.split("/d/")[1].split("/")[0]
        if not file_id:
            raise ValueError("Empty file ID extracted")
        return file_id
    except (IndexError, ValueError) as e:
        raise ValueError(f"Cannot extract file ID from URL: {e}")


def download_data(drive_url: str, local_filename: str, logger: logging.Logger) -> bool:
    """
    Download CSV file from Google Drive with validation.

    Args:
        drive_url: Google Drive sharing URL
        local_filename: Local path to save file
        logger: Logger instance

    Returns:
        True if download successful, False otherwise
    """
    if os.path.exists(local_filename):
        logger.info(f"✓ Using cached file: {local_filename}")
        return True

    try:
        logger.info("Downloading CSV from Google Drive...")
        file_id = extract_file_id(drive_url)
        download_url = f"https://drive.google.com/uc?id={file_id}"

        response = requests.get(download_url, timeout=30)
        response.raise_for_status()

        with open(local_filename, "wb") as f:
            f.write(response.content)

        logger.info(f"✓ Downloaded file saved as {local_filename}")
        logger.debug(f"File size: {os.path.getsize(local_filename)} bytes")
        return True

    except requests.RequestException as e:
        logger.error(f"Failed to download file: {e}")
        return False


def standardize_columns(df: pd.DataFrame, logger: logging.Logger) -> pd.DataFrame:
    """
    Standardize column names to expected format.

    Args:
        df: Input DataFrame
        logger: Logger instance

    Returns:
        DataFrame with standardized column names
    """
    df.columns = df.columns.str.strip().str.lower()

    rename_dict = {}
    for old_col in df.columns:
        if old_col in COLUMN_MAPPING:
            rename_dict[old_col] = COLUMN_MAPPING[old_col]

    if rename_dict:
        df.rename(columns=rename_dict, inplace=True)
        logger.debug(f"Renamed columns: {rename_dict}")

    return df


def load_and_clean_data(
    local_filename: str,
    config: Dict,
    logger: logging.Logger
) -> pd.DataFrame:
    """
    Load and clean astronomical data from CSV.

    Args:
        local_filename: Path to CSV file
        config: Configuration dictionary
        logger: Logger instance

    Returns:
        Cleaned DataFrame

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If required columns are missing or data is invalid
    """
    if not os.path.exists(local_filename):
        raise FileNotFoundError(f"File not found: {local_filename}")

    logger.info(f"Loading data from {local_filename}...")
    df = pd.read_csv(local_filename)
    logger.info(f"✓ Loaded {len(df):,} rows")
    logger.debug(f"Columns: {list(df.columns)}")

    # Standardize column names
    df = standardize_columns(df, logger)

    # Verify required columns
    required_cols = ['RGZ_ID', 'RA', 'Dec']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")

    logger.debug(f"Before cleaning: {len(df)} rows")

    # Clean data
    df = df.dropna(subset=required_cols)
    logger.debug(f"After removing NaN: {len(df)} rows")

    # Validate coordinate ranges
    initial_len = len(df)
    df = df[
        (df['RA'] >= config['min_ra']) &
        (df['RA'] <= config['max_ra']) &
        (df['Dec'] >= config['min_dec']) &
        (df['Dec'] <= config['max_dec'])
    ].reset_index(drop=True)

    removed = initial_len - len(df)
    if removed > 0:
        logger.warning(f"Removed {removed} rows with invalid coordinates")

    # Check for duplicates
    duplicates = df['RGZ_ID'].duplicated().sum()
    if duplicates > 0:
        logger.warning(f"Found {duplicates} duplicate RGZ_IDs")

    logger.info(f"✓ {len(df):,} valid rows after cleaning")
    return df


# ============================================================================#
# CHECKPOINT SYSTEM                                                          #
# ============================================================================#

def load_checkpoint(
    checkpoint_dir: str,
    logger: logging.Logger
) -> Tuple[List[Dict], int]:
    """
    Load previous progress from checkpoint.

    Args:
        checkpoint_dir: Directory containing checkpoint files
        logger: Logger instance

    Returns:
        Tuple of (results list, processed count)
    """
    checkpoint_file = os.path.join(checkpoint_dir, "checkpoint.json")
    results_file = os.path.join(checkpoint_dir, "partial_results.csv")

    if os.path.exists(checkpoint_file):
        try:
            with open(checkpoint_file, 'r') as f:
                checkpoint = json.load(f)

            processed_count = checkpoint.get('processed_count', 0)
            logger.info(f"✓ Loaded checkpoint: {processed_count} objects already processed")

            if os.path.exists(results_file):
                results_df = pd.read_csv(results_file)
                results = results_df.to_dict('records')
                logger.info(f"✓ Loaded {len(results)} partial results")
                return results, processed_count
        except Exception as e:
            logger.warning(f"Failed to load checkpoint: {e}")

    return [], 0


def save_checkpoint(
    checkpoint_dir: str,
    processed_count: int,
    results: List[Dict],
    logger: logging.Logger
) -> None:
    """
    Save progress to checkpoint.

    Args:
        checkpoint_dir: Directory to store checkpoint
        processed_count: Number of objects processed
        results: List of results collected so far
        logger: Logger instance
    """
    os.makedirs(checkpoint_dir, exist_ok=True)

    checkpoint_file = os.path.join(checkpoint_dir, "checkpoint.json")
    results_file = os.path.join(checkpoint_dir, "partial_results.csv")

    try:
        # Save checkpoint metadata
        checkpoint = {
            'timestamp': datetime.now().isoformat(),
            'processed_count': processed_count
        }
        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint, f, indent=2)

        # Save partial results
        if results:
            results_df = pd.DataFrame(results)
            results_df.to_csv(results_file, index=False)

        logger.debug(f"✓ Checkpoint saved: {processed_count} processed")
    except Exception as e:
        logger.error(f"Failed to save checkpoint: {e}")


# ============================================================================#
# SIMBAD QUERYING                                                            #
# ============================================================================#

def query_simbad_single(
    row: pd.Series,
    config: Dict,
    logger: logging.Logger,
    retry_count: int = 0
) -> Optional[List[Dict]]:
    """
    Query SIMBAD for a single object with retry logic.

    Args:
        row: DataFrame row containing RGZ_ID, RA, Dec
        config: Configuration dictionary
        logger: Logger instance
        retry_count: Current retry attempt

    Returns:
        List of result dictionaries or None if failed
    """
    rgz_id = row['RGZ_ID']
    ra = row['RA']
    dec = row['Dec']

    try:
        coord = SkyCoord(ra, dec, unit=(u.deg, u.deg), frame='icrs')
        simbad = Simbad()
        simbad.TIMEOUT = config['timeout_seconds']

        result_table = simbad.query_region(
            coord,
            radius=config['search_radius_deg'] * u.deg
        )

        if result_table is None:
            logger.debug(f"No SIMBAD matches for RGZ_ID {rgz_id}")
            return []

        results = []
        for record in result_table:
            result_dict = {
                'RGZ_ID': rgz_id,
                'RA_input': ra,
                'Dec_input': dec,
                'Main_ID': record['main_id'],
                'RA_simbad': record['ra'],
                'Dec_simbad': record['dec'],
                'Error_Maj': record['coo_err_maj'],
                'Error_Min': record['coo_err_min'],
                'Error_Angle': record['coo_err_angle'],
                'Wavelength': record['coo_wavelength'],
                'Bibcode': record['coo_bibcode'],
                'Query_Timestamp': datetime.now().isoformat(),
                'Match_Status': 'Success'
            }
            results.append(result_dict)

        logger.debug(f"RGZ_ID {rgz_id}: Found {len(results)} SIMBAD match(es)")
        return results

    except Exception as e:
        logger.debug(f"Query error for RGZ_ID {rgz_id}: {type(e).__name__}: {e}")

        # Retry with exponential backoff
        if retry_count < config['max_retries']:
            wait_time = config['rate_limit_delay'] * (config['retry_backoff'] ** retry_count)
            logger.debug(f"Retrying RGZ_ID {rgz_id} in {wait_time:.1f}s (attempt {retry_count + 1})")
            time.sleep(wait_time)
            return query_simbad_single(row, config, logger, retry_count + 1)
        else:
            logger.warning(f"Failed to query RGZ_ID {rgz_id} after {config['max_retries']} retries")
            return None


def query_simbad_batch(
    df: pd.DataFrame,
    config: Dict,
    logger: logging.Logger,
    start_idx: int = 0
) -> List[Dict]:
    """
    Query SIMBAD for multiple objects using concurrent requests.

    Args:
        df: DataFrame with coordinates
        config: Configuration dictionary
        logger: Logger instance
        start_idx: Starting index for processing

    Returns:
        List of all results
    """
    max_objects = min(config['max_objects'], len(df))
    df_subset = df.iloc[start_idx:start_idx + max_objects].reset_index(drop=True)

    all_results = []

    logger.info(f"Querying SIMBAD for {len(df_subset)} objects (workers: {config['max_workers']})")

    with ThreadPoolExecutor(max_workers=config['max_workers']) as executor:
        # Submit all tasks
        future_to_idx = {
            executor.submit(query_simbad_single, row, config, logger): idx
            for idx, (_, row) in enumerate(df_subset.iterrows())
        }

        # Process completed tasks with progress bar
        with tqdm(total=len(df_subset), desc="SIMBAD Queries") as pbar:
            for future in as_completed(future_to_idx):
                idx = future_to_idx[future]
                try:
                    results = future.result()
                    if results is not None:
                        all_results.extend(results)
                except Exception as e:
                    logger.error(f"Worker error at index {idx}: {e}")

                pbar.update(1)

                # Save checkpoint periodically
                if (idx + 1) % config['checkpoint_interval'] == 0:
                    save_checkpoint(
                        config['checkpoint_dir'],
                        start_idx + idx + 1,
                        all_results,
                        logger
                    )

    logger.info(f"✓ Completed {len(df_subset)} queries, found {len(all_results)} SIMBAD matches")
    return all_results


# ============================================================================#
# RESULTS PROCESSING AND OUTPUT                                              #
# ============================================================================#

def generate_summary_statistics(
    df_input: pd.DataFrame,
    results: List[Dict],
    logger: logging.Logger
) -> Dict:
    """
    Generate summary statistics for query results.

    Args:
        df_input: Original input DataFrame
        results: List of query results
        logger: Logger instance

    Returns:
        Dictionary of statistics
    """
    if not results:
        stats = {
            'total_objects_queried': len(df_input),
            'total_matches_found': 0,
            'match_rate': 0.0,
            'avg_matches_per_object': 0.0,
            'objects_with_matches': 0,
            'objects_without_matches': len(df_input)
        }
    else:
        results_df = pd.DataFrame(results)
        matched_objects = results_df['RGZ_ID'].nunique()

        stats = {
            'total_objects_queried': len(df_input),
            'total_matches_found': len(results),
            'match_rate': 100.0 * matched_objects / len(df_input),
            'avg_matches_per_object': len(results) / matched_objects if matched_objects > 0 else 0,
            'objects_with_matches': matched_objects,
            'objects_without_matches': len(df_input) - matched_objects
        }

    logger.info("\n" + "="*60)
    logger.info("QUERY SUMMARY STATISTICS")
    logger.info("="*60)
    for key, value in stats.items():
        if 'rate' in key or 'avg' in key:
            logger.info(f"{key}: {value:.2f}%") if 'rate' in key else logger.info(f"{key}: {value:.2f}")
        else:
            logger.info(f"{key}: {value}")
    logger.info("="*60 + "\n")

    return stats


def save_results(
    results: List[Dict],
    output_filename: str,
    logger: logging.Logger
) -> bool:
    """
    Save query results to CSV file.

    Args:
        results: List of result dictionaries
        output_filename: Output file path
        logger: Logger instance

    Returns:
        True if save successful, False otherwise
    """
    if not results:
        logger.warning("No results to save")
        return False

    try:
        results_df = pd.DataFrame(results)
        results_df.to_csv(output_filename, index=False)
        logger.info(f"✓ Results saved to {output_filename}")
        logger.debug(f"Shape: {results_df.shape}, Columns: {list(results_df.columns)}")

        # Clean up checkpoint after successful completion
        checkpoint_file = os.path.join(CONFIG['checkpoint_dir'], "checkpoint.json")
        if os.path.exists(checkpoint_file):
            os.remove(checkpoint_file)
            logger.debug("✓ Checkpoint cleaned up after successful completion")

        return True
    except Exception as e:
        logger.error(f"Failed to save results: {e}")
        return False


# ============================================================================#
# MAIN EXECUTION                                                             #
# ============================================================================#

def main():
    """Main execution function."""

    # Setup
    print("="*80)
    print("RADIO GALAXY SIMBAD QUERY - IMPROVED VERSION")
    print("="*80 + "\n")

    logger = setup_logging(CONFIG['log_filename'])
    logger.info("Script started")
    logger.debug(f"Configuration: {json.dumps(CONFIG, indent=2)}")

    try:
        # Download data
        if not download_data(CONFIG['drive_url'], CONFIG['local_filename'], logger):
            raise RuntimeError("Failed to download data file")

        # Load and clean data
        df = load_and_clean_data(CONFIG['local_filename'], CONFIG, logger)

        # Load checkpoint if available
        result_data, processed_count = load_checkpoint(CONFIG['checkpoint_dir'], logger)

        # Query SIMBAD
        new_results = query_simbad_batch(df, CONFIG, logger, start_idx=processed_count)
        result_data.extend(new_results)

        # Generate statistics
        stats = generate_summary_statistics(df, result_data, logger)

        # Save results
        if save_results(result_data, CONFIG['output_filename'], logger):
            logger.info("✓ Script completed successfully")
        else:
            logger.error("✗ Script failed to save results")

    except Exception as e:
        logger.exception(f"Script failed with error: {e}")
        raise


if __name__ == "__main__":
    main()

2025-11-01 18:11:53,760 - SIMBAD_Query - INFO - Script started
INFO:SIMBAD_Query:Script started
DEBUG:SIMBAD_Query:Configuration: {
  "drive_url": "https://drive.google.com/file/d/1owgNHA6x1LMgB7uCvueUQYoYxseXJfzD/view?usp=sharing",
  "local_filename": "rgz_data.csv",
  "output_filename": "simbad_query_results.csv",
  "checkpoint_dir": "checkpoints",
  "log_filename": "simbad_query.log",
  "search_radius_deg": 0.1,
  "max_objects": 100,
  "batch_size": 10,
  "checkpoint_interval": 20,
  "timeout_seconds": 500,
  "rate_limit_delay": 1.0,
  "max_retries": 3,
  "retry_backoff": 2.0,
  "max_workers": 5,
  "min_ra": 0,
  "max_ra": 360,
  "min_dec": -90,
  "max_dec": 90
}
2025-11-01 18:11:53,773 - SIMBAD_Query - INFO - Downloading CSV from Google Drive...
INFO:SIMBAD_Query:Downloading CSV from Google Drive...


RADIO GALAXY SIMBAD QUERY - IMPROVED VERSION



2025-11-01 18:11:55,181 - SIMBAD_Query - INFO - ✓ Downloaded file saved as rgz_data.csv
INFO:SIMBAD_Query:✓ Downloaded file saved as rgz_data.csv
DEBUG:SIMBAD_Query:File size: 5799087 bytes
2025-11-01 18:11:55,193 - SIMBAD_Query - INFO - Loading data from rgz_data.csv...
INFO:SIMBAD_Query:Loading data from rgz_data.csv...
2025-11-01 18:11:55,549 - SIMBAD_Query - INFO - ✓ Loaded 100,185 rows
INFO:SIMBAD_Query:✓ Loaded 100,185 rows
DEBUG:SIMBAD_Query:Columns: ['CatID', 'RGZID', 'ZooniverseID', 'RA', 'Dec']
DEBUG:SIMBAD_Query:Renamed columns: {'rgzid': 'RGZ_ID', 'ra': 'RA', 'dec': 'Dec'}
DEBUG:SIMBAD_Query:Before cleaning: 100185 rows
DEBUG:SIMBAD_Query:After removing NaN: 100185 rows
2025-11-01 18:11:55,714 - SIMBAD_Query - INFO - ✓ 100,185 valid rows after cleaning
INFO:SIMBAD_Query:✓ 100,185 valid rows after cleaning
2025-11-01 18:11:55,718 - SIMBAD_Query - INFO - Querying SIMBAD for 100 objects (workers: 5)
INFO:SIMBAD_Query:Querying SIMBAD for 100 objects (workers: 5)
SIMBAD Queries: