In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import sep
from astropy.io import fits
from astropy.wcs import WCS
import astropy.coordinates as coord
from astropy import units as u
from astropy.stats import SigmaClip
from astropy.visualization import ImageNormalize, ZScaleInterval
from astroquery.astrometry_net import AstrometryNet
from astroquery.simbad import Simbad
from photutils.aperture import CircularAperture, aperture_photometry, CircularAnnulus
from photutils.aperture import ApertureStats

import os
# from matplotlib import rc
# rc('font', **{'family': 'serif', 'serif': ['Computer Modern']})
# rc('text', usetex=True)
# plt.rcParams.update({'font.size':23})

In [None]:
def detect_stars(image_data, detection_threshold=5, min_area=5, aperture_radius=6, show=0):
    # Convert the image data to float32
    image_data = image_data.astype(np.float32)

    # Estimate the background and noise
    bkg = sep.Background(image_data)

    # Subtract the background
    image_data_sub = image_data - bkg.back()

    # Detect sources using SEP's source detection algorithm
    sources = sep.extract(image_data_sub, detection_threshold, err=bkg.globalrms)

    # Filter sources based on minimum area
    sources = sources[sources['npix'] >= min_area]

    if show:
        # Plot the image with detected sources using a colormap suitable for black-and-white printing
        fig, ax = plt.subplots(figsize=(8, 8))
        norm = ImageNormalize(image_data, interval=ZScaleInterval())
        im = ax.imshow(image_data, cmap='cubehelix', origin='lower', norm=norm)  # Changed colormap to 'cubehelix'

        # Plot circles around detected sources
        positions = np.transpose((sources['x'], sources['y']))
        apertures = CircularAperture(positions, r=aperture_radius)
        annulus_aperture = CircularAnnulus(positions, r_in=aperture_radius*2, r_out=aperture_radius*3)
        apertures.plot(ax=ax, color='white', lw=0.5)  # Plot the apertures with appropriate axis
        annulus_aperture.plot(ax=ax, color='red', lw=0.5)  # Explicitly provide `ax` to `plot`

        # Set title and axis labels
        ax.set_xlabel('X Position (Pixels)')
        ax.set_ylabel('Y Position (Pixels)')

        # Add a colorbar and set the label
        cbar = plt.colorbar(im, ax=ax)  # Explicitly provide `ax` to `colorbar`
        cbar.set_label('Brightness (Z-Scale Normalized)')

        plt.show()  # Display the plot

    return sources

def measure_flux(image_data, sources, aperture_radius=6):
    positions = np.transpose((sources['x'], sources['y']))
    apertures = CircularAperture(positions, r=aperture_radius)
    phot_table = aperture_photometry(image_data, apertures)
    annulus_aperture = CircularAnnulus(positions, r_in=aperture_radius*2, r_out=aperture_radius*3)
    sigclip = SigmaClip(sigma=3.0, maxiters=10)
    aper_stats = ApertureStats(image_data, apertures, sigma_clip=None)
    bkg_stats = ApertureStats(image_data, annulus_aperture, sigma_clip=sigclip)
    total_bkg = bkg_stats.median * aper_stats.sum_aper_area.value
    apersum_bkgsub = aper_stats.sum - total_bkg
    phot_table['aperture_sum_bkgsub'] = apersum_bkgsub

    return phot_table

def astrometry(phot_table, hdul, exposure_t):
    # Initialize Astrometry.net
    ast = AstrometryNet()
    ast.api_key = ''

    # Upload your image to Astrometry.net and solve it
    wcs_header = hdul[0].header

    # Create a WCS object from the header
    wcs = WCS(wcs_header)

    ra_values, dec_values = wcs.all_pix2world(phot_table['xcenter'], phot_table['ycenter'], 0)
    coordinates = coord.SkyCoord(ra=ra_values * u.deg, dec=dec_values * u.deg, frame='fk5')
    
    # Create a DataFrame with coordinates and aperture_sum_bkgsub
    result_df = pd.DataFrame({
        'Coordinates': coordinates,
        'flux': phot_table['aperture_sum_bkgsub']
    })
    
    threshold = 4 * (2**12) * 0.9
    filtered_df = result_df[result_df['flux'] * exposure_t <= threshold]

    return filtered_df

def identify_stars(coords_flux_df_p, band, type_filters):
    flux_str_field = "flux(R)"
    flux_str_df = "FLUX_R"
    flux_str_field2 = "flux(G)"
    flux_str_df2 = "FLUX_G"
    flux_str_field3 = "flux(B)"
    flux_str_df3 = "FLUX_B"
    flux_str_field4 = "flux(V)"
    flux_str_df4 = "FLUX_V"
    Simbad.add_votable_fields(flux_str_field)
    Simbad.add_votable_fields(flux_str_field2)
    Simbad.add_votable_fields(flux_str_field3)
    Simbad.add_votable_fields(flux_str_field4)
    results = pd.DataFrame()

    for _, row in coords_flux_df_p.iterrows():
        coor = row[0]
        flux = row[1]
        r = 2.5 * u.arcminute

        try:
            result_table = Simbad.query_region(coor, radius=r)

            if result_table:
                coords = coord.SkyCoord(ra=result_table['RA'], dec=result_table['DEC'], unit=(u.hourangle, u.deg))
                ra_degrees = coords.ra.degree
                dec_degrees = coords.dec.degree
                result_table['RA_deg'] = ra_degrees
                result_table['DEC_deg'] = dec_degrees
                result_df = result_table.to_pandas()

                if result_df[flux_str_df].notna().any():
                    selected_star = result_df[result_df[flux_str_df].notnull()]  # Filter out null values
                    selected_star = selected_star[selected_star[flux_str_df] == selected_star[flux_str_df].min()]  # Find the brightest star in G band
                
                else:
                    all_coords = coord.SkyCoord(
                        ra=result_df['RA_deg'],
                        dec=result_df['DEC_deg'],
                        unit=(u.deg, u.deg),
                        frame='icrs'
                    )
                    separations = coor.separation(all_coords)
                    closest_idx = separations.argmin()
                    selected_star = result_df.iloc[[closest_idx]]

                if selected_star.empty:
                    continue

                df = pd.DataFrame({
                    'Object': selected_star['MAIN_ID'],
                    'RA_result (degree)': selected_star['RA_deg'],
                    'DEC_result (degree)': selected_star['DEC_deg'],
                    'RA_input (degree)': coor.ra,
                    'DEC_input (degree)': coor.dec,
                    'Apparent magnitude R (known)': selected_star[flux_str_df],
                    'Apparent magnitude G (known)': selected_star[flux_str_df2],
                    'Apparent magnitude B (known)': selected_star[flux_str_df3],
                    'Apparent magnitude V (known)': selected_star[flux_str_df4],
                    'Flux {} (measured, W/m^2)'.format(band): flux
                })
                results = results.append(df, ignore_index=True)

            # else:
            #     print("No results found for coordinates:", coor)

        except Exception as e:
            pass
            # print("An error occurred:", e)

    Simbad.remove_votable_fields(flux_str_field)
    Simbad.remove_votable_fields(flux_str_field2)
    Simbad.remove_votable_fields(flux_str_field3)
    Simbad.remove_votable_fields(flux_str_field4)
    coords_input = coord.SkyCoord(ra=results['RA_input (degree)'], dec=results['DEC_input (degree)'], unit=(u.deg, u.deg), frame='icrs')
    # ra_deg = coord.Angle(np.array(results['RA_result']), unit=u.hourangle)
    coords_result = coord.SkyCoord(ra=results['RA_result (degree)'], dec=results['DEC_result (degree)'], unit=(u.deg, u.deg), frame='icrs')
    # Calculate the angular separation between input and result coordinates
    angular_separation = coords_input.separation(coords_result).degree
    # Add angular separation to DataFrame
    results['Angular Separation (degree)'] = angular_separation
    results_tot = results.loc[results.groupby('Object')['Angular Separation (degree)'].idxmin()].reset_index(drop=True)
    
    return results_tot

def calculate_magnitudes(results_p, band):
    
    return -2.5*np.log10(results_p['Flux {} (measured, W/m^2)'.format(band)])

In [None]:
def magnitudes_file(filename, band, exposure_t, type_filters, detection_threshold=5, min_area=5, aperture_radius=6, show=0):
    # Load the FITS file
    hdulist = fits.open(filename)
    # print(repr(hdulist[0].header))
    image_data = hdulist[0].data  # assuming the image data is in the primary HDU
    # Detect stars in the image
    detected_stars = detect_stars(image_data, detection_threshold, min_area, aperture_radius, show)
    # Measure flux of detected stars
    phot_table = measure_flux(image_data, detected_stars, aperture_radius)
    # Get coordinates of stars
    coords_flux_df = astrometry(phot_table, hdulist, exposure_t)
    # Match coordinates with star names from Simbad database
    results = identify_stars(coords_flux_df, band, type_filters)
    # Calculate apparent magnitudes
    results['Apparent magnitude {} (instrumental)'.format(band)] = calculate_magnitudes(results, band)
    results['Airmass'] = hdulist[0].header['AIRMASS']
    results['Date'] = hdulist[0].header['DATE-OBS']
    results['Type'] = type_filters
    
    return results

In [None]:
results = pd.DataFrame()
folder_path = 'Data/March_2024_RC_ZWO_RGB_JC_UBVR_sky1'

# Iterate over folders and subfolders
for root, dirs, files in os.walk(folder_path):
    for filename in files:
        file_path = os.path.join(root, filename)

        if os.path.isfile(file_path):
            print(filename)
            filename_split = filename.split('_')
            exposure_t = filename_split[4]
            band = filename_split[3][0]
            type_filters = filename_split[2]
            
            # Check if the file exists before attempting to read it
            if os.path.exists(file_path):
                exposure_t = int(exposure_t)

                try:
                    result = magnitudes_file(file_path, band, exposure_t, type_filters)
                    results = results.append(result, ignore_index=True)

                except Exception as e:
                    pass

            else:
                print(f"File '{filename}' not found.")
                
        else:
            print(f"'{filename}' is not a file.")

In [None]:
final_result = results.reset_index()
desired_columns_order = ['Object', 'Type', 'Date', 'RA_result (degree)', 'DEC_result (degree)', 'RA_input (degree)', 'DEC_input (degree)', 'Angular Separation (degree)', 'Apparent magnitude R (known)', 'Apparent magnitude G (known)', 'Apparent magnitude B (known)', 'Apparent magnitude V (known)', 'flux R (measured, W/m^2)', 'flux G (measured, W/m^2)', 'flux B (measured, W/m^2)', 'flux V (measured, W/m^2)', 'Apparent magnitude R (instrumental)', 'Apparent magnitude G (instrumental)', 'Apparent magnitude B (instrumental)', 'Apparent magnitude V (instrumental)', 'Airmass']
final_result = results.reindex(columns=desired_columns_order)
final_result.to_csv('Results/result_sky1.csv', index=False)