In [None]:
from IPython.display import clear_output
!pip install astroquery photutils
# !conda install -c conda-forge astroquery phototils
# # If you do not:
# !git clone https://github.com/astropy/astroquery.git
# !cd astroquery phototils
# !python setup.py install
clear_output()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import polars as pl
from scipy.spatial import KDTree
from itertools import combinations
import random

from astroquery.skyview import SkyView
from astropy.io import fits
from astropy import units as u
from astropy.wcs import WCS

from astropy.stats import sigma_clipped_stats
from photutils.detection import IRAFStarFinder
from photutils.aperture import CircularAperture

import os
import time
import warnings
from astropy.wcs import FITSFixedWarning
warnings.filterwarnings('ignore')

In [None]:
# Загрузка Parquet-файлов каталогов

def upload_file(filename, prompt, columns_names):
    """Read a catalog Parquet file, rename its columns and cast them to Float64.

    Parameters
    ----------
    filename : path of the Parquet file passed to ``pl.read_parquet``.
    prompt : message printed before the file is read.
    columns_names : new column names, applied positionally to the file's columns.
        Only 9 names (coordinates catalog) or 5 names (distances catalog)
        are supported.

    Returns
    -------
    (DataFrame, filename) : the renamed, Float64-cast frame and the input path.

    Raises
    ------
    ValueError : if the number of columns in the file differs from
        ``len(columns_names)``, or if that number is neither 9 nor 5.
    """
    print(prompt)

    frame = pl.read_parquet(filename)

    if len(frame.columns) != len(columns_names):
        raise ValueError(f"Число колонок в файле ({len(frame.columns)}) не совпадает с числом имен ({len(columns_names)})")

    # Positional rename: i-th column of the file gets the i-th requested name.
    frame = frame.rename(dict(zip(frame.columns, columns_names)))

    # Columns to cast, selected by how many names were requested.
    float_columns_by_count = {
        # coords_catalog
        9: ('X', 'Y', 'Gmag', 'RA', 'Dec', 'RA_IRAF', 'Dec_IRAF', 'Name_a', 'Name_b'),
        # dists_catalog
        5: ('Distance', 'Name1_a', 'Name1_b', 'Name2_a', 'Name2_b'),
    }
    if len(columns_names) not in float_columns_by_count:
        raise ValueError(f"Неподдерживаемое число колонок: {len(columns_names)}")

    casts = [
        pl.col(name).cast(pl.Float64)
        for name in float_columns_by_count[len(columns_names)]
    ]
    frame = frame.with_columns(casts)

    return frame, filename

In [None]:
# Функция расстояния в пикселях
def calculate_distance(x1_pix, y1_pix, x2_pix, y2_pix):
    """Return the Euclidean distance between (x1, y1) and (x2, y2).

    Works element-wise on numpy arrays as well as on scalars.
    """
    dx = x1_pix - x2_pix
    dy = y1_pix - y2_pix
    return np.sqrt(dx**2 + dy**2)

def get_random_in_square(x_min, x_max, y_min, y_max):
    """Draw a uniformly random point inside the given rectangle.

    Each coordinate is drawn with ``random.uniform`` (x first, then y) and
    rounded to one decimal place. Returns the tuple ``(x, y)``.
    """
    bounds = ((x_min, x_max), (y_min, y_max))
    drawn = [round(random.uniform(low, high), 1) for low, high in bounds]
    return drawn[0], drawn[1]

# Конвертация Astropy Table в Polars DF
def qtable_to_polars(qtable):
    """Convert a table exposing ``colnames`` and column indexing into a Polars DataFrame.

    For every column the raw values are taken from ``.value`` when the column
    has that attribute, otherwise from ``.data``. Table metadata is not
    carried over.
    """
    def _raw_values(column):
        return column.value if hasattr(column, 'value') else column.data

    return pl.DataFrame({name: _raw_values(qtable[name]) for name in qtable.colnames})

# Загрузка изображения
def get_image(ra, dec, rad, pix, cat):
    """Download one image from SkyView and return its first HDU.

    Parameters
    ----------
    ra, dec : position, formatted into the query string as ``"ra, dec"``.
    rad : radius, passed as ``rad * u.deg``.
    pix : value passed as the ``pixels`` argument.
    cat : survey name; queried as a single-element survey list.

    Returns the first HDU of the first HDU list returned by
    ``SkyView.get_images``.
    """
    hdu_lists = SkyView.get_images(
        position=f"{ra}, {dec}",
        survey=[cat],
        pixels=pix,
        projection='Tan',
        radius=rad * u.deg,
    )
    first_hdu_list = hdu_lists[0]
    return first_hdu_list[0]


In [None]:
# Поиск фотоцентров на изображении

def find_stars_on_image(image, fwhm=10.0, threshold_factor=3.0,  some_flux_threshold=50, plot_results=False):
    """Detect sources on an image and return the brightest ones.

    Parameters
    ----------
    image : object with a ``data`` attribute holding the pixel array.
    fwhm : FWHM passed to ``IRAFStarFinder``.
    threshold_factor : detection threshold in units of the sigma-clipped
        standard deviation of the image.
    some_flux_threshold : maximum number of sources kept after sorting by
        flux in descending order (despite the name, this is a count).
    plot_results : when True, show the image with circular apertures drawn
        on the kept sources.

    Returns
    -------
    The table of the brightest sources, or None when the finder returns None.
    """
    pixels = image.data
    _, background, noise = sigma_clipped_stats(pixels, sigma=2.0)
    detector = IRAFStarFinder(fwhm=fwhm, threshold=threshold_factor * noise,
                              sharplo=0.2, sharphi=1.5,
                              roundlo=-1.0, roundhi=1.0,
                              peakmax=50000,
                              exclude_border=True, sigma_radius=2.0)
    # The finder runs on the median-subtracted image.
    detected = detector(pixels - background)

    if detected is None:
        print("No stars found!")
        return None

    detected.sort('flux', reverse=True)
    brightest = detected[:some_flux_threshold]
    print(f"Bright stars found (top { some_flux_threshold}): {len(brightest)}")

    if plot_results:
        centroids = np.transpose((brightest['xcentroid'], brightest['ycentroid']))
        plt.imshow(pixels, cmap='gray', origin='lower')
        CircularAperture(centroids, r=8.).plot(color='red', lw=1.5, alpha=0.7)
        plt.show()

    return brightest
        

In [None]:
# Обработка пар

def sort_sources(sources_table, scale, theta_max, use_kdtree=False, fwhm=10.0):
    """Build all admissible source pairs and sort them by combined flux.

    Parameters
    ----------
    sources_table : frame with columns ``id``, ``xcentroid``, ``ycentroid``
        and ``flux`` whose columns support ``.to_numpy()`` (e.g. a Polars
        DataFrame).
    scale : factor that converts a pixel distance into the unit of
        ``theta_max`` (distance = pixels * scale).
    theta_max : largest allowed pair separation, in the converted unit.
    use_kdtree : when True, candidate pairs come from a KD-tree query limited
        to ``theta_max / scale`` pixels; otherwise all combinations are used.
    fwhm : smallest allowed pair separation in pixels. The previous version
        read an undefined global ``fwhm``; the default equals the default
        ``fwhm`` of ``find_stars_on_image``.

    Returns
    -------
    A Polars DataFrame with columns ``id1``, ``id2``, ``distance`` and
    ``total_brightness`` sorted by ``total_brightness`` descending, or None
    when fewer than two sources are given.
    """
    if len(sources_table) <= 1:
        # A pair needs two sources; None is what callers received before.
        return None

    ids = sources_table['id'].to_numpy()
    x = sources_table['xcentroid'].to_numpy()
    y = sources_table['ycentroid'].to_numpy()
    flux = sources_table['flux'].to_numpy()

    if use_kdtree:
        pixel_max = theta_max / scale
        tree = KDTree(np.column_stack((x, y)))
        pairs = tree.query_pairs(r=pixel_max, output_type='ndarray')
    else:
        # The previous version referenced an undefined `sources_df` here.
        pairs = np.array(
            list(combinations(range(len(sources_table)), 2)), dtype=int
        ).reshape(-1, 2)
    i, j = pairs[:, 0], pairs[:, 1]

    distance_pix = calculate_distance(x[i], y[i], x[j], y[j])
    distance_deg = distance_pix * scale
    total_brightness = flux[i] + flux[j]

    # Keep pairs that are at least `fwhm` pixels apart and not farther than theta_max.
    mask = (distance_pix >= fwhm) & (distance_deg <= theta_max)

    unsorted_pairs = pl.DataFrame({
        'id1': ids[i[mask]],
        'id2': ids[j[mask]],
        'distance': distance_deg[mask],
        'total_brightness': total_brightness[mask]
    })

    return unsorted_pairs.sort('total_brightness', descending=True)
        

In [None]:
# Вспомогательные функции

def find_duplicates(a, b, c, d):
    """Compare the pair (a, b) with the pair (c, d).

    Returns a dict with:
      'duplicates' - items of (a, b), in that order, that also occur in (c, d);
      'uniques'    - the distinct values among a, b, c, d that are not duplicates.
    """
    distinct = set([a, b, c, d])
    second_pair = [c, d]

    shared = []
    for candidate in (a, b):
        if candidate in second_pair:
            shared.append(candidate)

    return {'duplicates': shared, 'uniques': list(distinct - set(shared))}

def find_common_elements(df1, df2):
    """Return the star names that occur in both pair tables.

    Both frames must have ``star1`` and ``star2`` columns; a star counts as
    present in a frame when it appears in either column.

    Each common star is returned once, in order of first appearance in
    ``df1`` (``star1`` column first, then ``star2``). The previous inner-join
    version repeated a star once per matching row pair, which made callers
    redo identical work for every repetition.

    Returns a list of names, or None when there is no common star.
    """
    stars_in_df1 = df1['star1'].to_list() + df1['star2'].to_list()
    stars_in_df2 = set(df2['star1'].to_list()) | set(df2['star2'].to_list())

    # dict.fromkeys removes repeats while keeping first-occurrence order.
    common = list(dict.fromkeys(star for star in stars_in_df1 if star in stars_in_df2))
    return common if common else None

def find_elements(df, element):
    """Locate every occurrence of ``element`` in the ``star1``/``star2`` columns.

    Returns a tuple ``(rows, columns)`` of equal-length lists: ``rows`` holds
    the row indices (all ``star1`` hits first, then all ``star2`` hits) and
    ``columns`` holds 0 for a ``star1`` hit and 1 for a ``star2`` hit.
    """
    def _rows_where(column_name):
        hits = pl.arg_where(pl.col(column_name) == element)
        return df.select(hits).to_series().to_list()

    rows_in_star1 = _rows_where('star1')
    rows_in_star2 = _rows_where('star2')

    rows = rows_in_star1 + rows_in_star2
    column_flags = [0] * len(rows_in_star1) + [1] * len(rows_in_star2)
    return (rows, column_flags)

def compare_lists(list1, list2):
    """Split two lists into shared and exclusive items.

    Returns a dict with:
      'common'          - items present in both lists (set intersection, unordered);
      'unique_in_list1' - items of list1 absent from list2, in list1 order;
      'unique_in_list2' - items of list2 absent from list1, in list2 order.
    """
    shared = list(set(list1) & set(list2))

    only_in_first = []
    for item in list1:
        if item not in list2:
            only_in_first.append(item)

    only_in_second = []
    for item in list2:
        if item not in list1:
            only_in_second.append(item)

    return {
        'common': shared,
        'unique_in_list1': only_in_first,
        'unique_in_list2': only_in_second,
    }


In [None]:
# Основной алгоритм

def find_triangles(sorted_pairs, dists_catalog, coords_catalog, dist_err, mag_err, mag_info=True):
    """Identify image sources with catalog stars by matching triangles of distances.

    Source pairs are processed in the order given. For each pair the catalog
    star pairs with a compatible distance (and, optionally, compatible
    magnitudes) are collected. Whenever the current pair shares exactly one
    source with an earlier pair and their candidate lists share a star, the
    third side of the resulting triangle is checked against the catalog.
    Accepted triangles are merged into groups that share a star; the first
    group holding more than three stars and more than three sources is
    returned.

    Parameters
    ----------
    sorted_pairs : Polars DataFrame with columns ``distance``, ``id1``,
        ``id2`` and, when ``mag_info`` is True, ``mag1`` and ``mag2``.
    dists_catalog : Polars DataFrame with columns ``Distance``, ``Name1_a``,
        ``Name1_b``, ``Name2_a``, ``Name2_b``.
    coords_catalog : Polars DataFrame with columns ``Gmag``, ``Name_a``,
        ``Name_b`` (only read when ``mag_info`` is True).
    dist_err : tolerance applied to every distance comparison.
    mag_err : tolerance applied to magnitude comparisons.
    mag_info : when False, magnitudes are ignored and the ``mag1``/``mag2``
        columns are not required.

    Returns
    -------
    (main_star, valid_group, valid_group_sources) where ``main_star`` is
    ``[first source id, first star name]`` of the completed group, or
    ``(None, [], [])`` when no group is completed.

    Notes
    -----
    Stars and sources are appended to a group independently of each other, so
    positions in ``valid_group`` and ``valid_group_sources`` beyond the first
    triangle are not guaranteed to correspond one-to-one.
    """
    def _star_name(col_a, col_b):
        # A star name is its two numeric name parts, as strings, joined by a space.
        return pl.col(col_a).cast(str) + " " + pl.col(col_b).cast(str)

    def _stars_within_mag(mag):
        # Names of the catalog stars whose Gmag lies within mag_err of `mag`.
        return coords_catalog.filter(
            pl.col('Gmag').is_between(mag - mag_err, mag + mag_err)
        ).select([_star_name('Name_a', 'Name_b').alias('star')])

    lists = {}            # pair index -> candidate catalog star pairs
    sources_couple = {}   # pair index -> [source1, source2]
    groups = {}           # group id -> star names
    groups_sources = {}   # group id -> source ids
    num_triangle = 0
    # Diagnostic counters (used only by the optional test outputs noted below).
    count_try = 0
    full_len = 0

    for d_num in range(len(sorted_pairs)):

        distance = sorted_pairs[d_num, 'distance']
        source1 = sorted_pairs[d_num, 'id1']
        source2 = sorted_pairs[d_num, 'id2']

        dist_matches = dists_catalog.filter(
            pl.col('Distance').is_between(distance - dist_err, distance + dist_err)
        ).select([
            _star_name('Name1_a', 'Name1_b').alias('star1'),
            _star_name('Name2_a', 'Name2_b').alias('star2')
        ])

        if not mag_info:
            full_matches = dist_matches
        else:
            # Magnitudes are read only here so that pair tables without
            # mag1/mag2 columns work when mag_info is False.
            mag1_matches = _stars_within_mag(sorted_pairs[d_num, 'mag1'])
            mag2_matches = _stars_within_mag(sorted_pairs[d_num, 'mag2'])
            # Either assignment of the two magnitudes to the two stars is accepted.
            full_matches1 = dist_matches.join(mag1_matches, left_on='star1', right_on='star', how='inner')
            full_matches1 = full_matches1.join(mag2_matches, left_on='star2', right_on='star', how='inner')
            full_matches2 = dist_matches.join(mag1_matches, left_on='star2', right_on='star', how='inner')
            full_matches2 = full_matches2.join(mag2_matches, left_on='star1', right_on='star', how='inner')
            full_matches = pl.concat([full_matches1, full_matches2])

        full_len += len(full_matches)
        lists[d_num] = full_matches
        sources_couple[d_num] = [source1, source2]

        if d_num == 0 or len(full_matches) == 0:
            continue

        for num_list in range(d_num):
            source3, source4 = sources_couple[num_list]
            duplicates = find_duplicates(source1, source2, source3, source4)

            # Two pairs form a triangle only when they share exactly one source.
            if len(duplicates['duplicates']) != 1:
                continue

            earlier_matches = lists[num_list]
            common_result = find_common_elements(full_matches, earlier_matches)
            if not common_result:
                continue

            shared_source = duplicates['duplicates'][0]
            far_source_new = next(x for x in [source1, source2] if x != shared_source)
            far_source_old = next(x for x in [source3, source4] if x != shared_source)

            # Measured length of the third side (between the two non-shared
            # sources); it does not depend on the candidate stars.
            third_side_sources = sorted_pairs.filter(
                ((pl.col('id1') == duplicates['uniques'][0]) &
                 (pl.col('id2') == duplicates['uniques'][1])) |
                ((pl.col('id2') == duplicates['uniques'][0]) &
                 (pl.col('id1') == duplicates['uniques'][1]))
            ).select('distance').to_series().to_list()

            for common_star in common_result:
                loc_new = find_elements(full_matches, common_star)
                loc_old = find_elements(earlier_matches, common_star)

                for l1, col1 in zip(loc_new[0], loc_new[1]):
                    for l2, col2 in zip(loc_old[0], loc_old[1]):
                        count_try += 1

                        # The partner of the common star sits in the other column of the row.
                        other_star1 = full_matches[l1, 'star2' if col1 == 0 else 'star1']
                        other_star2 = earlier_matches[l2, 'star2' if col2 == 0 else 'star1']
                        triangle = [common_star, other_star1, other_star2]
                        triangle_sources = [shared_source, far_source_new, far_source_old]

                        star2_name_a, star2_name_b = map(float, other_star1.split())
                        star3_name_a, star3_name_b = map(float, other_star2.split())

                        # Catalog length of the third side, in either star order.
                        third_side_stars = dists_catalog.filter(
                            ((pl.col('Name1_a') == star2_name_a) &
                             (pl.col('Name1_b') == star2_name_b) &
                             (pl.col('Name2_a') == star3_name_a) &
                             (pl.col('Name2_b') == star3_name_b)) |
                            ((pl.col('Name1_a') == star3_name_a) &
                             (pl.col('Name1_b') == star3_name_b) &
                             (pl.col('Name2_a') == star2_name_a) &
                             (pl.col('Name2_b') == star2_name_b))
                        ).select('Distance').to_series().to_list()

                        if not (third_side_stars and third_side_sources):
                            continue
                        if not abs(third_side_stars[0] - third_side_sources[0]) < dist_err:
                            continue

                        # A new triangle is confirmed. The counter must grow by
                        # one: the previous `num_triangle += num_triangle` kept
                        # it at 0, so every new group overwrote key -1.
                        num_triangle += 1

                        found_in_group = False
                        for group_id, group_stars in list(groups.items()):
                            if compare_lists(triangle, group_stars)['common']:
                                found_in_group = True
                                group_sources = groups_sources[group_id]
                                for new_star, new_source in zip(triangle, triangle_sources):
                                    if new_star not in group_stars:
                                        group_stars.append(new_star)
                                    if new_source not in group_sources:
                                        group_sources.append(new_source)

                        if not found_in_group:
                            groups[num_triangle - 1] = list(triangle)
                            groups_sources[num_triangle - 1] = list(triangle_sources)

                        for group_id, group_stars in groups.items():
                            if len(group_stars) > 3 and len(groups_sources[group_id]) > 3:
                                valid_group_sources = list(groups_sources[group_id])
                                valid_group = list(group_stars)
                                main_star = [valid_group_sources[0], valid_group[0]]
                                # Optional test outputs: d_num + 1 (source pairs
                                # processed), count_try (catalog queries) and
                                # full_len (total size of the candidate lists).
                                return main_star, valid_group, valid_group_sources

    print("Identification failed with no completed group\n")
    # Optional test outputs: len(sorted_pairs) + 1, count_try, full_len.
    return None, [], []
    

In [None]:
# Функции для отождествления навигационных звёзд (верификация)

def create_custom_wcs(ra, dec, x, y, projection='TAN'):
    """Build a 2-axis WCS whose reference pixel (x, y) maps to (ra, dec).

    The pixel increments are ``[-Scale, Scale]``, taken from the notebook-level
    ``Scale`` constant, and the axis types are ``RA---<projection>`` and
    ``DEC--<projection>``.
    """
    frame_wcs = WCS(naxis=2)
    frame_wcs.wcs.ctype = [f"RA---{projection}", f"DEC--{projection}"]
    frame_wcs.wcs.cdelt = [-Scale, Scale]
    frame_wcs.wcs.crval = [ra, dec]
    frame_wcs.wcs.crpix = [x, y]
    return frame_wcs

def find_frame_center(star_ra, star_dec, star_x, star_y):
    """Derive the sky coordinates of the frame centre from one identified star.

    A WCS anchored at the star (pixel ``star_x, star_y`` -> ``star_ra, star_dec``)
    is used to convert the pixel ``(Image_size / 2, Image_size / 2)`` to sky
    coordinates; a second WCS anchored at that centre is then built.

    Returns ``(cent_ra, cent_dec, wcs_cent)``.
    """
    half_size = Image_size / 2.0
    star_anchored_wcs = create_custom_wcs(star_ra, star_dec, star_x, star_y)
    cent_ra, cent_dec = star_anchored_wcs.all_pix2world(half_size, half_size, 0)
    return cent_ra, cent_dec, create_custom_wcs(cent_ra, cent_dec, half_size, half_size)

def w2p(df, cent_ra, cent_dec):
    """Project catalog stars onto the frame and keep those that fall inside it.

    ``df`` must have ``RA`` and ``Dec`` columns. A WCS centred on
    (``cent_ra``, ``cent_dec``) at pixel ``(Image_size / 2, Image_size / 2)``
    converts them to pixel coordinates, stored in new columns ``x_pixel`` and
    ``y_pixel``. Rows outside ``[0, Image_size)`` on either axis are dropped.
    """
    half_size = Image_size / 2.0
    frame_wcs = create_custom_wcs(cent_ra, cent_dec, half_size, half_size)

    ra_values = df['RA'].to_numpy()
    dec_values = df['Dec'].to_numpy()
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', FITSFixedWarning)
        x_pixels, y_pixels = frame_wcs.all_world2pix(ra_values, dec_values, 0)

    with_pixels = df.with_columns([
        pl.Series('x_pixel', x_pixels),
        pl.Series('y_pixel', y_pixels)
    ])

    inside_frame = (
        (pl.col('x_pixel') >= 0) &
        (pl.col('x_pixel') < Image_size) &
        (pl.col('y_pixel') >= 0) &
        (pl.col('y_pixel') < Image_size)
    )
    return with_pixels.filter(inside_frame)

def find_star_coords(star, coords_catalog, sources_df):
    """Look up the catalog RA/Dec of an identified star.

    ``star[1]`` is the star name: two numbers separated by whitespace, matched
    against the ``Name_a`` and ``Name_b`` columns of ``coords_catalog``.
    ``sources_df`` is accepted but not used.

    Returns ``(RA, Dec)`` of the first matching catalog row; raises if there
    is no matching row.
    """
    name_a, name_b = (float(part) for part in star[1].split())
    is_this_star = (pl.col('Name_a') == name_a) & (pl.col('Name_b') == name_b)
    ra, dec = coords_catalog.filter(is_this_star).select(['RA', 'Dec']).row(0)
    return ra, dec

def nav_stars_on_frame(df, ra_cent, de_cent, rad, mag):
    """Select catalog stars brighter than ``mag`` inside an RA/Dec box.

    Parameters
    ----------
    df : Polars DataFrame with columns ``Gmag``, ``RA``, ``Dec``, ``X``, ``Y``.
    ra_cent, de_cent : centre of the box.
    rad : half-side of the box (rounded to one decimal place).
    mag : upper limit (exclusive) on ``Gmag``.

    Returns the selected rows without the ``X`` and ``Y`` columns.

    The RA interval wraps around the 0/360 seam: a box centred at RA 0.3 with
    half-side 1 selects RA >= 359.3 as well as RA <= 1.3. The previous
    version compared against a negative (or > 360) bound and silently lost
    the stars on the other side of the seam.
    Assumes catalog RA values lie in [0, 360) - TODO confirm against the catalog.
    """
    half_side = round(rad, 1)
    dec_min = de_cent - half_side
    dec_max = de_cent + half_side

    # Bring both RA bounds into [0, 360); ra_lo > ra_hi then means the box crosses the seam.
    ra_lo = (ra_cent - half_side) % 360
    ra_hi = (ra_cent + half_side) % 360

    if half_side >= 180:
        # The box spans the full circle in RA.
        ra_in_box = pl.lit(True)
    elif ra_lo <= ra_hi:
        ra_in_box = (pl.col('RA') >= ra_lo) & (pl.col('RA') <= ra_hi)
    else:
        ra_in_box = (pl.col('RA') >= ra_lo) | (pl.col('RA') <= ra_hi)

    nav_stars_df = df.filter(
        (pl.col('Gmag') < mag) &
        ra_in_box &
        (pl.col('Dec') >= dec_min) & (pl.col('Dec') <= dec_max)
    ).drop(['X', 'Y'])

    return nav_stars_df

def kdtree_match(stars, sources, tolerance=5):
    """Match projected catalog stars to detected sources by nearest neighbour.

    ``stars`` needs ``x_pixel``/``y_pixel`` columns and ``sources`` needs
    ``xcentroid``/``ycentroid`` columns. Each star is paired with its nearest
    source when that source lies within ``tolerance``.

    Returns a Polars DataFrame with one row per matched star and the columns
    ``x_star``, ``y_pixel``, ``xcentroid``, ``ycentroid``, ``distance``.
    NOTE(review): the ``x_star`` / ``y_pixel`` key names look inconsistent;
    they are kept as-is because callers may rely on them - confirm.
    """
    star_xy = stars.select(['x_pixel', 'y_pixel']).to_numpy()
    source_xy = sources.select(['xcentroid', 'ycentroid']).to_numpy()

    nearest_dist, nearest_idx = KDTree(source_xy).query(star_xy, distance_upper_bound=tolerance)
    n_sources = len(source_xy)

    # A star counts as matched only when the returned index is a valid source
    # row and the distance is within the tolerance.
    matches = [
        {
            'x_star': star_xy[row, 0],
            'y_pixel': star_xy[row, 1],
            'xcentroid': source_xy[idx, 0],
            'ycentroid': source_xy[idx, 1],
            'distance': dist
        }
        for row, (dist, idx) in enumerate(zip(nearest_dist, nearest_idx))
        if idx < n_sources and dist <= tolerance
    ]
    return pl.DataFrame(matches)


In [None]:
# Загрузка каталогов

# Column names applied positionally to the two catalog files.
coords_col_name = ['X', 'Y', 'Gmag', 'RA', 'Dec', 'RA_IRAF', 'Dec_IRAF', 'Name_a', 'Name_b']
dists_col_name = ['Distance', 'Name1_a', 'Name1_b', 'Name2_a', 'Name2_b']

# Catalog of star coordinates.
coords_catalog, coords_catalog_filename = upload_file(
    filename="coords_g10.parquet",
    prompt="Uploading catalog with coordinates...",
    columns_names=coords_col_name,
)

# Catalog of pairwise distances between stars.
dists_catalog, dists_catalog_filename = upload_file(
    filename="dists_g10.parquet",
    prompt="Uploading catalog with distances...\n",
    columns_names=dists_col_name,
)

# Short summary of what was loaded.
print(f"Number of stars: {len(coords_catalog)}")
print(f"Number of distances: {len(dists_catalog)}\n")
for title, catalog in (("Coords catalog head:", coords_catalog),
                       ("Dists catalog head:", dists_catalog)):
    print(title)
    print(catalog.head())


In [None]:
def calculate_angular_distance(ra1_deg, dec1_deg, ra2_deg, dec2_deg):
    """Angular separation, in degrees, between two sky positions given in degrees.

    Uses the spherical law of cosines. The RA difference is folded into
    [0, 180] so positions on either side of RA = 0/360 are handled, and the
    cosine is clipped to [-1, 1] before ``arccos``. Works element-wise on
    numpy arrays.
    """
    dec1 = np.radians(dec1_deg)
    dec2 = np.radians(dec2_deg)

    # Shortest RA difference around the circle.
    ra_gap = np.abs(ra1_deg - ra2_deg)
    ra_gap = np.radians(np.minimum(ra_gap, 360 - ra_gap))

    cos_separation = (
        np.sin(dec1) * np.sin(dec2) +
        np.cos(dec1) * np.cos(dec2) * np.cos(ra_gap)
    )
    # Rounding can push the cosine slightly outside the domain of arccos.
    cos_separation = np.clip(cos_separation, -1.0, 1.0)

    return np.degrees(np.arccos(cos_separation))

In [None]:
def compute_distances(sources_table, theta_max):
    """Build the table of source pairs separated by at most ``theta_max`` degrees.

    Parameters
    ----------
    sources_table : Polars DataFrame with columns ``RA``, ``Dec`` (degrees),
        ``Gmag`` and ``ID``.
    theta_max : largest allowed angular separation, in degrees.

    Returns
    -------
    Polars DataFrame with columns ``distance``, ``mag1``, ``mag2`` (Float64)
    and ``id1``, ``id2`` (Int64), sorted by ``mag1 + mag2`` ascending. Pairs
    with zero separation are excluded.

    Candidate pairs are found with a KD-tree over unit vectors on the sphere:
    an angle theta corresponds to the chord length 2*sin(theta/2). The
    previous planar search on raw (RA, Dec) with radius 1.5*theta_max missed
    valid pairs at high declination (where RA differences are stretched by
    1/cos(Dec)) and across the RA = 0/360 seam.
    """
    pair_dtypes = {
        'distance': pl.Float64,
        'sum_Gmag': pl.Float64,
        'mag1': pl.Float64,
        'mag2': pl.Float64,
        'id1': pl.Int64,
        'id2': pl.Int64,
    }

    if len(sources_table) < 2:
        # No pair can be formed; return an empty table with the usual columns.
        return pl.DataFrame(schema=pair_dtypes).drop(['sum_Gmag'])

    # 1-D arrays, so that indexing with pair indices yields plain values.
    ra = sources_table['RA'].to_numpy()
    dec = sources_table['Dec'].to_numpy()
    mag = sources_table['Gmag'].to_numpy()
    ids = sources_table['ID'].to_numpy()

    ra_rad = np.radians(ra)
    dec_rad = np.radians(dec)
    unit_vectors = np.column_stack((
        np.cos(dec_rad) * np.cos(ra_rad),
        np.cos(dec_rad) * np.sin(ra_rad),
        np.sin(dec_rad),
    ))

    # Slightly enlarged chord radius; the exact cut on theta is applied below.
    chord_max = 2.0 * np.sin(np.radians(min(theta_max, 180.0)) / 2.0)
    pairs = KDTree(unit_vectors).query_pairs(r=chord_max * (1.0 + 1e-9), output_type='ndarray')

    i, j = pairs[:, 0], pairs[:, 1]
    theta = calculate_angular_distance(ra[i], dec[i], ra[j], dec[j])

    valid = (theta > 0) & (theta <= theta_max)
    i, j, theta = i[valid], j[valid], theta[valid]

    unsorted_sources = pl.DataFrame({
        'distance': theta,
        'sum_Gmag': mag[i] + mag[j],
        'mag1': mag[i],
        'mag2': mag[j],
        'id1': ids[i],
        'id2': ids[j],
    }).with_columns([
        pl.col(col).cast(dtype) for col, dtype in pair_dtypes.items()
    ])

    # maintain_order keeps the result deterministic for a given input.
    unsorted_sources = unsorted_sources.unique(keep="first", maintain_order=True)
    sorted_sources = unsorted_sources.sort('sum_Gmag', descending=False)

    return sorted_sources.drop(['sum_Gmag'])

In [None]:
import polars as pl
import numpy as np
import matplotlib.pyplot as plt

def add_measurement_error(df, columns=('Gmag', 'RA', 'Dec'), error_coords=0.001, error_mag=0.1,
                          error_type='absolute', rng=None):
    """Return a copy of ``df`` with Gaussian noise added to the given columns.

    Parameters
    ----------
    df : Polars DataFrame.
    columns : columns to perturb. The FIRST column is treated as the
        magnitude (uses ``error_mag``); every other column is treated as a
        coordinate (uses ``error_coords``).
    error_coords, error_mag : error size; the noise standard deviation is
        one third of it.
    error_type : ``'relative'`` scales the standard deviation by the absolute
        value of each entry; any other value uses it as an absolute error.
    rng : optional ``numpy.random.Generator`` for reproducible noise. When
        omitted, numpy's global random state is used, as before.

    Returns a new DataFrame; ``df`` itself is not modified.
    """
    draw_normal = np.random.normal if rng is None else rng.normal

    result = df.clone()

    for position, col in enumerate(columns):
        values = df[col].to_numpy()
        error_size = error_mag if position == 0 else error_coords

        if error_type == 'relative':
            std_dev = np.abs(values) * error_size / 3.
        else:
            std_dev = error_size / 3.

        noise = draw_normal(0, std_dev, len(values))
        result = result.with_columns(pl.Series(col, values + noise))

    return result
    

In [None]:
# Основные параметры

Rad = 1
Image_size = 1024
Catalog_name = "DSS"

Theta_max = 0.7
Scale = Theta_max / (Image_size / 2.)  # pixel size in degrees
Min_sources = 13  # a frame needs more sources than this to be processed

# Build the frame and process its sources (done here for a virtual frame)

RA_true, Dec_true = get_random_in_square(0, 358, -68, 68)
print(f"True coords: RA_true = {RA_true:.1f}°, Dec_true = {Dec_true:.1f}°")
sources = nav_stars_on_frame(coords_catalog, RA_true, Dec_true, rad=Rad, mag=10)
sources = add_measurement_error(sources, error_coords=Scale, error_mag=0.1)
if len(sources) > Min_sources:
    print(f"Number of sources: {len(sources)}")
    # Sequential 1-based identifiers for the sources on the frame.
    sources = sources.with_columns(pl.Series('ID', list(range(1, len(sources) + 1))))
    sorted_pairs = compute_distances(sources, Theta_max)
else:
    print(f"Image is invalid, {len(sources)} sources")
    sources = sources.clear()
    # Always define sorted_pairs, so later cells neither fail with a NameError
    # nor silently reuse pairs left over from a previous run.
    sorted_pairs = pl.DataFrame(schema={
        'distance': pl.Float64,
        'mag1': pl.Float64,
        'mag2': pl.Float64,
        'id1': pl.Int64,
        'id2': pl.Int64,
    })
    

In [None]:
%%time
# Применение основного алгоритма

# Tolerances: distances within one pixel, magnitudes within 0.1.
Dist_err = Scale
Mag_err = 0.1

start_time = time.time()
main_star, valid_group, valid_group_sources = find_triangles(
    sorted_pairs,
    dists_catalog,
    coords_catalog,
    Dist_err,
    Mag_err,
    mag_info=True,
)
elapsed = time.time() - start_time

# Report the results
if main_star:
    print(
        f"Main star: {main_star}",
        f"Valid star group: {valid_group}",
        f"Valid sources group: {valid_group_sources}\n",
        sep="\n",
    )

    # Применяется, если используется виртуальный кадр; если нет - переход к следующей ячейке
    # count_matches = []
    # for i, star in enumerate(valid_group):
    #     star_name_a, star_name_b = map(float, star.split())
    #     star_match = sources.filter((pl.col('Name_a') == star_name_a) & (pl.col('Name_b') == star_name_b)).select('Name_a', 'Name_b')
    #     if len(star_match) != 0:
    #         count_matches.append(str(star_match['Name_a'][0]) + " " + str(star_match['Name_b'][0]))
    # if len(count_matches) == len(valid_group):
    #     print("Identification is successful!")
    #     print(count_matches)
    # else:
    #     print("Identification failed")
    #     print(count_matches)


In [None]:
# Нахождение координат центра кадра и их верификация с помощью других навигационных звёзд на кадре

Mag_lim = 10.0
Coords_err = 1

Shift = True  # Set when the image is known to be shifted

# Defaults, so that later cells can always report a result even when
# validation is not reached.
valid_status = "Validation skipped"
nav_matches = pl.DataFrame()
K = 0

if main_star is None:
    # find_triangles found no group; there is no star to anchor the frame on.
    raise RuntimeError("Identification failed: no star group was found, nothing to verify")

# Locate the frame centre

# NOTE(review): `sources_df`, `main_star_x`, `main_star_y` and (when Shift is
# True) `image` are not defined anywhere in the visible notebook; presumably
# they come from the real-image workflow (get_image / find_stars_on_image) -
# confirm before running this cell on a virtual frame.
main_star_ra, main_star_dec = find_star_coords(main_star, coords_catalog, sources_df)
center_ra, center_dec, wcs_center = find_frame_center(main_star_ra, main_star_dec, main_star_x, main_star_y)

if Shift:
    head = WCS(header=image.header)
    ra_image, dec_image = head.all_pix2world(Image_size / 2.0, Image_size / 2.0, 0)
    delta_ra = RA_true - ra_image
    delta_dec = Dec_true - dec_image
    RA_center = center_ra + delta_ra
    Dec_center = center_dec + delta_dec
else:
    RA_center = center_ra
    Dec_center = center_dec

# Computed for both settings of Shift; previously these were assigned only
# when Shift was True, so the prints below failed otherwise.
# NOTE(review): the discrepancy is measured from the main star, not from
# RA_center / Dec_center - confirm that this is intended.
RA_err = abs(main_star_ra - RA_true)
Dec_err = abs(main_star_dec - Dec_true)

print(f"Found frame center: RA = {main_star_ra:.6f}°, Dec = {main_star_dec:.6f}°")
print(f"True coords: RA_true = {RA_true:.1f}°, Dec_true = {Dec_true:.1f}°")
print(f"Discrepancy: RA_err = {RA_err:.6f}°, Dec_err = {Dec_err:.6f}°")

# Verification

if RA_err < Coords_err and Dec_err < Coords_err:
    ident_status = "Identification is successful!"
    print(ident_status)
    nav_stars = nav_stars_on_frame(coords_catalog, center_ra, center_dec, Rad, Mag_lim)
    K = len(nav_stars)
    nav_pix = w2p(nav_stars, center_ra, center_dec)
    nav_matches = kdtree_match(nav_pix, sources_df, tolerance=10)

    # Validation passes when more than half of the expected stars are matched.
    if len(nav_matches) > K/2.:
        valid_status = "Validation is successful!"
        print(valid_status)
        print(f"Found configuration of stars: {valid_group}")
    else:
        valid_status = "Validation failed"
        print(valid_status)
else:
    ident_status = "Identification failed"
    print(ident_status)

In [None]:
# Загрузка файла с результатами

info_filename = "result.txt"
os.makedirs("algorithm_result", exist_ok=True)

# These two diagnostics are not assigned anywhere in the visible notebook
# (find_triangles has its test outputs commented out). Fall back to "n/a"
# instead of failing with a NameError halfway through writing the file.
num_built_triangles = globals().get("num_built_triangles", "n/a")
num_dist = globals().get("num_dist", "n/a")

# Build the whole report first, so that a failure while formatting cannot
# leave a truncated file behind.
report = "".join([
    ident_status + "\n",
    valid_status + "\n\n",
    f"Found frame center: RA = {RA_center:.6f}°, Dec = {Dec_center:.6f}°\n",
    f"True coordinates: RA_true = {RA_true:.6f}°, Dec_true = {Dec_true:.6f}°\n",
    f"Discrepancy: RA_err = {RA_err:.6f}°, Dec_err = {Dec_err:.6f}°\n",
    f"Found configuration of stars: {valid_group} (first star is common)\n",
    f"Corresponding sources on the image: {valid_group_sources}\n",
    f"Number of triangles constructed: {num_built_triangles}\n",
    f"Distances scanned: {num_dist}\n",
    f"Matches of navigation stars on the frame: {len(nav_matches)}/{K}\n\n",
    f"Elapsed time per scan: {elapsed:.2f}s",
])

with open(os.path.join("algorithm_result", info_filename), 'w', encoding="utf-8") as f:
    f.write(report)
