In [1]:
import sxobsplan

from pathlib import Path
import pandas as pd
import astropy.units as u
from astropy.time import Time

import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from mpl_toolkits.axes_grid1 import make_axes_locatable
import _rcparams

In [2]:
from __future__ import annotations
import numpy as np
import astropy.units as u
from astropy.time import Time
from astropy.coordinates import SkyCoord, EarthLocation, AltAz
from astroplan import Observer
from typing import Union, List

# __all__ and resolve_location, _normalize_date_to_noon_utc functions remain the same
__all__ = [
    "is_target_visible",
    "resolve_location",
]

LocationLike = Union[EarthLocation, str]

import warnings
from astropy.utils.exceptions import AstropyWarning
warnings.filterwarnings(
    "ignore",
    category=AstropyWarning,
    message="Tried to get polar motions"
)

def resolve_location(location: LocationLike) -> EarthLocation:
    if isinstance(location, EarthLocation):
        return location
    if isinstance(location, str):
        try:
            return EarthLocation.of_site(location)
        except Exception as e:
            raise ValueError(f"Could not resolve observatory code '{location}': {e}")
    raise TypeError("location must be EarthLocation or str (observatory name).")

def _normalize_date_to_noon_utc(date) -> Time:
    t0 = Time(date).utc
    midnight_jd = np.floor(t0.jd)
    midnight = Time(midnight_jd, format='jd', scale='utc')
    is_at_midnight = np.abs((t0 - midnight).to(u.s)) < 1.0 * u.s
    noon_jd = midnight_jd + 0.5
    noon = Time(noon_jd, format='jd', scale='utc')
    result_jd = np.where(is_at_midnight, noon.jd, t0.jd)
    return Time(result_jd, format='jd', scale='utc')


def is_target_visible(
    ra: u.Quantity | np.ndarray,
    dec: u.Quantity | np.ndarray,
    date: List | np.ndarray,
    location: LocationLike,
    *,
    elev_min: u.Quantity = 30 * u.deg,
    duration: u.Quantity = 1 * u.hour,
    dt_step: u.Quantity = 2 * u.min,
    return_block: bool = True,
):
    """
    Determine target visibility using fully vectorized calculations.
    Assumes all dates in the input array are unique.

    (Docstring parameters and returns are the same as the previous version)
    """
    # --- 1. 입력 데이터 처리 및 검증 ---
    ra_arr = u.Quantity(ra)
    dec_arr = u.Quantity(dec)
    dates_arr = np.atleast_1d(date)
    
    if not (ra_arr.shape == dec_arr.shape == dates_arr.shape):
        raise ValueError("ra, dec, and date must have the same shape.")

    input_shape = ra_arr.shape
    num_inputs = ra_arr.size
    
    ra_flat = ra_arr.flatten()
    dec_flat = dec_arr.flatten()
    dates_flat = dates_arr.flatten()

    # --- 2. 관측자 및 시간 관련 변수 설정 (벡터화) ---
    loc = resolve_location(location)
    obs = Observer(location=loc, name=str(location), timezone="UTC")

    # 모든 날짜를 정규화하여 기준 시간 배열(t0_arr) 생성
    t0_arr = _normalize_date_to_noon_utc(dates_flat)
    
    # 모든 날짜에 대한 박명 시간을 한 번에 계산
    try:
        e_twilight_arr = obs.twilight_evening_astronomical(t0_arr, which="next")
        m_twilight_arr = obs.twilight_morning_astronomical(e_twilight_arr, which="next")
    except Exception:
        # 하나라도 실패하면 빈 결과 반환 (혹은 더 정교한 예외 처리 가능)
        is_visible = np.zeros(input_shape, dtype=bool)
        blocks = np.empty(input_shape, dtype=object)
        for i in np.ndindex(blocks.shape): blocks[i] = []
        return (is_visible, blocks) if return_block else is_visible

    # --- 3. 2D 시간 그리드 생성 ---
    # 1D 시간 스텝 배열
    time_steps = np.arange(0, 24 * 60, dt_step.to(u.min).value) * u.min
    # Broadcasting을 이용해 2D 시간 그리드 생성. Shape: (num_inputs, num_steps)
    time_grid_2d = t0_arr[:, np.newaxis] + time_steps[np.newaxis, :]

    # --- 4. 고도 계산 (AstroPy Core 기능 사용) ---
    coords = SkyCoord(ra=ra_flat, dec=dec_flat)
    # AltAz 프레임에 2D 시간 그리드를 전달하여 모든 시점의 좌표 변환
    altaz_frame = AltAz(obstime=time_grid_2d, location=loc)
    # coords를 (N, 1) 형태로 바꿔 Broadcasting이 가능하도록 함
    altaz_coords = coords[:, np.newaxis].transform_to(altaz_frame)
    altitudes = altaz_coords.alt # Shape: (num_inputs, num_steps)
    
    # --- 5. 관측 가능 조건 마스크 생성 (벡터화) ---
    A_high = altitudes >= elev_min
    # 박명 시간 배열을 (N, 1) 형태로 바꿔 Broadcasting
    A_dark = (time_grid_2d >= e_twilight_arr[:, np.newaxis]) & \
             (time_grid_2d <= m_twilight_arr[:, np.newaxis])
    A_vis = A_high & A_dark
    
    # --- 6. 관측 가능 블록 탐색 (벡터화) ---
    # 각 행(대상)의 시작과 끝에 False를 추가하여 경계를 명확히 함
    padded = np.pad(A_vis, ((0, 0), (1, 1)), constant_values=False)
    # diff를 이용해 값이 바뀌는 지점(True->False or False->True)을 찾음
    changes = np.diff(padded.astype(np.int8), axis=1)
    
    # starts는 1, stops는 -1이 됨. np.where로 모든 시작/종료 지점의 인덱스를 찾음
    starts_row, starts_col = np.where(changes == 1)
    stops_row, stops_col = np.where(changes == -1)

    # --- 7. 결과 조합 및 필터링 ---
    is_visible_flat = np.zeros(num_inputs, dtype=bool)
    blocks_flat = np.empty(num_inputs, dtype=object)
    for i in range(num_inputs): blocks_flat[i] = []

    # 각 대상(row)별로 결과 조합
    for i in range(num_inputs):
        s_cols = starts_col[starts_row == i]
        e_cols = stops_col[stops_row == i]
        
        target_blocks = []
        for s, e in zip(s_cols, e_cols):
            # 2D 시간 그리드의 해당 행에서 시간 정보 추출
            start_time = time_grid_2d[i, s]
            end_time = time_grid_2d[i, e-1]
            dur = (end_time - start_time).to(u.hour)
            
            if dur >= duration:
                target_blocks.append({
                    "start": start_time,
                    "end": end_time,
                    "duration": dur
                })
        
        if len(target_blocks) > 0:
            is_visible_flat[i] = True
            blocks_flat[i] = target_blocks

    # --- 8. 결과를 원래 입력 형태에 맞게 재구성 ---
    is_visible = is_visible_flat.reshape(input_shape)
    blocks = blocks_flat.reshape(input_shape)
    
    return (is_visible, blocks) if return_block else is_visible

In [3]:
# SBDB
fpath_sbdb = Path("../data/sbdb_query_results_comet_ver250926.csv")
df_sbdb = pd.read_csv(fpath_sbdb)
df_sbdb.head()

Unnamed: 0,spkid,full_name,pdes,name,prefix,neo,pha,sats,H,G,...,rms,two_body,A1,A1_sigma,A2,A2_sigma,A3,A3_sigma,DT,DT_sigma
0,1000036,1P/Halley,1P,Halley,P,Y,,0,,,...,0.61052,,4.9e-10,4e-11,1.6e-10,4.6e-15,,,,
1,1000025,2P/Encke,2P,Encke,P,Y,,0,,,...,0.4526,,2.5e-10,6.4e-11,3.5e-13,4.1e-12,,,,
2,1000504,3D/Biela,3D,Biela,D,Y,,0,,,...,,,3.9e-09,,-2.5e-10,,,,,
3,1000026,4P/Faye,4P,Faye,P,,,0,,,...,0.54329,,4e-09,1.2e-10,3.2e-10,6.3e-11,-7.1e-10,4.2e-11,-37.7,2.17
4,1000505,5D/Brorsen,5D,Brorsen,D,Y,,0,,,...,,,1.3e-08,,1.3e-09,,,,,


In [5]:
import pandas as pd
from pathlib import Path
import astropy.units as u
from astropy.time import Time
# sxobsplan 모듈과 벡터화된 is_target_visible 함수가 있다고 가정합니다.
import sxobsplan 

# --- 디렉토리 설정 (기존과 동일) ---
VISDIR = Path("../visibility")
VISDIR.mkdir(exist_ok=True, parents=True)
EPHDIR = Path("../eph") # 예시 EPHDIR 경로

# df_sbdb DataFrame이 이미 로드되어 있다고 가정
# df_sbdb = pd.read_csv(...) 

# --- 메인 루프 (외부 루프는 유지) ---
for idx, row in df_sbdb.iterrows():
    pdes = row.pdes
    fpath_eph = EPHDIR / f"{''.join(pdes.split())}.csv"
    fpath_vis = VISDIR / f"{''.join(pdes.split())}.csv"

    if not fpath_eph.exists():
        print(f"{fpath_eph.name} does not exist. Skipping.")
        continue

    # target 및 ephemeris 정보 로드 (기존과 동일)
    target = row
    eph = pd.read_csv(fpath_eph)

    if (eph.r > 10).all() or (eph.Tmag > 23).all():
        print(f"{fpath_eph.name}: all r > 10 or all Tmag > 23. Skipping.")
        continue

    # ================================================================== #
    # =================== 코드 수정의 핵심 부분 시작 =================== #
    # ================================================================== #

    # 1. 벡터화 함수에 전달할 입력 배열 준비
    # pandas Series에서 NumPy 배열을 직접 추출하여 Astropy 객체 생성
    ra_arr = eph.RA.values * u.deg
    dec_arr = eph.DEC.values * u.deg
    dates_arr = Time(eph.datetime_jd.values, format="jd")

    # 2. 결과 저장을 위한 DataFrame 생성
    # 기존 `eph` 데이터프레임을 복사하여 효율적으로 구성
    df_visible = eph.copy()
    df_visible.rename(columns={"RA": "ra", "DEC": "dec", "GlxLat": "glxlat"}, inplace=True)
    df_visible["pdes"] = target["pdes"]
    df_visible["full_name"] = target["full_name"]
    df_visible["name"] = target["name"]
    df_visible["class"] = target["class"].upper()

    # 3. 각 천문대에 대해 벡터화 함수를 단 한 번씩만 호출
    for observatory in ["gemini_north", "gemini_south"]:
        
        # 벡터화 함수 호출: 모든 날짜에 대한 가시성을 한 번에 계산
        is_visible, blocks = is_target_visible(
            ra=ra_arr,
            dec=dec_arr,
            date=dates_arr,
            location=observatory,
            elev_min=30 * u.deg,
            duration=1 * u.hour
        )

        # 4. 반환된 배열 형태의 결과를 DataFrame에 추가
        df_visible[f"is_visible_{observatory}"] = is_visible
        
        # `blocks` 배열을 처리하여 duration 정보 추출 (List Comprehension 사용)
        # `blocks`의 각 요소(b)는 visibility block 딕셔너리의 리스트이거나 빈 리스트임
        durations = [
            b[0]["duration"].to_value("hour") if b else None for b in blocks
        ]
        df_visible[f"duration_{observatory}"] = durations

    # ================================================================ #
    # =================== 코드 수정의 핵심 부분 종료 =================== #
    # ================================================================ #

    df_visible.to_csv(fpath_vis, index=False)
    print(f"Visibility saved: {fpath_vis}")

1P.csv: all r > 10 or all Tmag > 23. Skipping.




Visibility saved: ../visibility/2P.csv
3D.csv does not exist. Skipping.




Visibility saved: ../visibility/4P.csv
5D.csv does not exist. Skipping.


KeyboardInterrupt: 