In [12]:
### --- BasicReduction --- ###
### --- Owner: Barbara Joachimczyk --- ###
### ------------------------------------------------------------------------------------------------------ ###


from astropy.io import fits
from astropy.coordinates import EarthLocation, AltAz, SkyCoord
from astropy.time import Time
from astropy import units as u
from astropy.wcs import WCS
from astropy.visualization import ZScaleInterval
from astroquery.astrometry_net import AstrometryNet
from astroquery.astrometry_net import conf
import matplotlib.pyplot as plt 
import numpy as np 
from os import listdir, makedirs
from os.path import isfile, join, exists
from os import renames
import glob
import time
from itertools import chain







# ---  Zmienne globalne --- 

Observatory = EarthLocation(lat=53.093999*u.deg, lon=18.555925*u.deg, height=70*u.m)


### Klasa obiektów: Header. Przechowuje dane dotyczące headera. Uzupełnia się poprzez uruchomienie funkcji OpenHeader(), której argumentem jest ścieżka pliku, którego danymi z headera uzupełniany jest obiekt Header.

class Header:
    def __init__(self):
        self.imagetype = ''
        self.exp = 0
        self.temp = 0
        self.filter = ''
        self.bin = []
        self.subx = 0
        self.suby = 0
        self.bscale = 0
        self.bzero = 0
        self.history = ''
        self.jd = 0
        self.object = ''
        self.ra = 0
        self.dec = 0

    def OpenHeader(self, path):
        fits_file = fits.open(path)
        fits_header = fits_file[0].header
        self.imagetype = fits_header['IMAGETYP']
        self.exp = fits_header['EXPTIME']
        self.temp = fits_header['SET-TEMP']
        self.bin = str(fits_header['XBINNING']) + ' ' + str(fits_header['YBINNING']) 
        self.subx =  str(fits_header['XORGSUBF']) 
        self.suby =  str(fits_header['YORGSUBF'])
        self.bscale = fits_header['BSCALE']
        self.bzero = fits_header['BZERO']
        self.jd = fits_header['JD']
        self.object = fits_header['OBJECT']
        if 'FILTER' in fits_header:
            self.filter = fits_header['FILTER']

        fits_file.close()




### Klasa obiektów: Frame. Dziedziczy wartości Headera, uzupełniając je o nowe, potrzebne do stworzenia kompletnej ramki typu fits, która zapisuje poprzez uruchomienie funkcji SaveFitsFile().

class Frame(Header):
    def __init__(self, path):
        Header.__init__(self)
        self.path = path
        self.data = []
        self.history = ''
        self.name = '' 
        self.bitpix = 16


    def CalculateAirmass(self):
        ObservationTime = Time(self.jd, format='jd')           # czas obserwacji w formacie rozpoznawanym przez Astropy
        ObjectCoordinates = SkyCoord(self.ra, self.dec, unit=(u.hourangle, u.deg))     # współrzędne obiektu z Headera 

        ObjectAltAz = ObjectCoordinates.transform_to(AltAz(obstime=ObservationTime, location=Observatory))          # przekształcam współrzędne na horyzontalne

        Airmass = ObjectAltAz.secz                              # secant odległości zenitalnej, czyli airmass 

        return float(Airmass) 
    

    def SaveBDFFitsFrame(self):
        hdu = fits.PrimaryHDU(data = (self.data - self.bzero))
        hdull = fits.HDUList([hdu])
        save_header = hdull[0].header
        save_header['IMAGETYP'] = self.imagetype
        save_header['EXPTIME'] = self.exp
        save_header['SET_TEMP'] = self.temp 
        save_header['FILTER'] = self.filter
        save_header['BINNING'] = self.bin 
        save_header['SUBFRAME'] = str(self.subx) + ' ' + str(self.suby)
        save_header['BITPIX'] = self.bitpix
        save_header['BSCALE'] = self.bscale
        save_header['BZERO'] = self.bzero

        save_header['HISTORY'] = self.history
        hdull.writeto(join(self.path, self.name), overwrite=True)


    def SaveFitsFullHeader(self, headerPath):                         

        hdu = fits.PrimaryHDU(data = np.int16(self.data - self.bzero))
        hdull = fits.HDUList([hdu]) 


        with fits.open(headerPath) as openfits:
            save_header = openfits[0].header

        save_header['HISTORY'] = self.history
        save_header['RA'] = self.ra
        save_header['DEC'] = self.dec
        save_header['AIRMASS'] = self.CalculateAirmass()
        hdull[0].header = save_header


        hdull.writeto(join(self.path, self.name), overwrite=True)





### Klasa obiektów: Masters. Przechowuje obiekty typu Frame(), które są wyznaczonymi masterramkami w celu wykorzystania ich we właściwej redukcji ramek naukowych.

class Masters():
    def __init__(self):
        self.bias = None
        self.dark = []
        self.flat = []

    def GetDarkByExpTime(self, exp, bin, subx, suby):
        for dark in self.dark:
            if (dark.exp == exp and dark.bin == bin and dark.subx == subx and dark.suby == suby):
                return dark
        
        print(f'[ERROR] Dark not found. Failed to find one with params: Exp {exp}, Bin {bin}, Subx {subx}, Suby {suby}')  

    def GetFlatByFilter(self, filter, bin, subx, suby):
        for flat in self.flat:
            if (flat.filter == filter and flat.bin == bin and flat.subx == subx and flat.suby == suby):
                return flat
        return self.flat
        print('[ERROR] Flat not found.')          
        
        

#                     FUNKCJE                   

def SortBDFFiles(path):                   
    """
    Sortuje pliki w folderach na ramki naukowe oraz kalibracyjne z podziałem na długość ekspozycji i filtry. Tworzy odpowiednie masterfitsy.

    Parameters:
    - path: ścieżka do folderu danych obserwacyjnych

    Returns:
    - potwierdzenie posortowania plików fits.
    """   

    subfolders = listdir(path)
    for file in (subfolders):
        if file == 'bdf':                       # znajduje folder "bdf" i sortuje pliki według typu

            bdf_folder_path = join(path, "bdf")
            
            bdf_contains = listdir(bdf_folder_path)                 # tworzy listę plików w folderze bdf

            for i in range(len(bdf_contains)):                      # sprawdza header każdego pliku w celu dopasowania do odpowiedniego podfolderu
                fits_file = fits.open(join(bdf_folder_path, bdf_contains[i]))
                fits_header = fits_file[0].header
                fits_exptime = str(int(fits_header['EXPTIME']))

                if fits_header['IMAGETYP'] == 'Bias Frame':             # tworzenie podfolderu z biasami
                    fits_file.close()
                    
                    fits_file = renames(join(bdf_folder_path, bdf_contains[i]),join(bdf_folder_path, "Bias", bdf_contains[i]))

                elif fits_header['IMAGETYP'] == 'Dark Frame':           # tworzenie podfolderu z darkami z podziałem na czas ekspozycji
                    fits_file.close()

                    fits_file = renames(join(bdf_folder_path, bdf_contains[i]),join(bdf_folder_path, "Dark", 'Dark_' + fits_exptime, bdf_contains[i]) )

                elif fits_header['IMAGETYP'] == 'Light Frame' or 'Flat Frame':      # tworzenie podfolderu z flatami z podziałem na filtry
                    
                    try:
                        fits_filter = str(fits_header['FILTER'])
                        fits_file = renames(join(bdf_folder_path, bdf_contains[i]),join(bdf_folder_path, "Flat", 'Flat_' + fits_filter, bdf_contains[i]) )
                        fits_file.close()
                    except:
                        fits_file.close()
                        fits_file = renames(join(bdf_folder_path, bdf_contains[i]),join(bdf_folder_path, "Flat", 'Flat_', bdf_contains[i]) )


    print('BDF files sorted.')
    return 

def FitsFilesData(path):
    """
    Z plików fits w danym folderze tworzy macierz danych do wykonywania obliczeń.

    Parameters:
    - path: ścieżka do folderu danych obserwacyjnych

    """ 
    master_list = []
    for file_name in listdir(path):
        if file_name.endswith('.fits') or file_name.endswith('.fit'):
            full_path = join(path, file_name)
            with fits.open(full_path) as hdul:
                data = hdul[0].data
                # header = hdul[0].header
                # bzero = np.uint16(header['BZERO'])
                # data += bzero
                # print(bzero)
                master_list.append(data)
    master_list = np.array(master_list)


    return master_list


def CreateMasterFrames(path):
    """
    Przeszukuje folder z obserwacji po ramki kalibracyjne, z których tworzy masterbiasy, masterdarki i masterflaty.

    Parameters:
    - path: ścieżka do folderu danych obserwacyjnych

    Returns:
    - potwierdzenie stworzenia masterframes.
    """ 

    bias_path = join(path, 'bdf', "Bias")
    dark_path = join(path, 'bdf', 'Dark')
    flat_path = join(path, 'bdf', 'Flat')


    ### Tworzenie masterbias:

    bias_subpath = listdir(bias_path)
    mbias_frame = Frame(bias_path)
    mbias_frame.OpenHeader(join(bias_path, bias_subpath[-1]))                

    mbias_frame.data = np.median(FitsFilesData(bias_path), axis=0)
    mbias_frame.name = 'Masterbias.fits'
    mbias_frame.history = 'Masterbias calculated by median.'

    mbias_frame.SaveBDFFitsFrame()
    masterFrames.bias = mbias_frame
    

    print('Masterbias created.')
    ### Tworzenie masterdarków:

    dark_subpath = listdir(dark_path)               # lista folderów darków różnej długości
    
    for i in range(len(dark_subpath)):

        dark_sub_subpath = listdir(join(dark_path,dark_subpath[i])) # lista plików w podfolderze
        mdark_frame = Frame(join(dark_path, dark_subpath[i]))
        mdark_frame.OpenHeader(join(dark_path, dark_subpath[i], dark_sub_subpath[-1]))    # kradnę header ostatniego

        data_d = FitsFilesData(join(dark_path, dark_subpath[i]))            # array pikseli z pliku
        med_data_d = np.median(data_d, axis=0)
        med_data_d = med_data_d - masterFrames.bias.data    # odejmuję masterbias

        med_data_d[med_data_d < 0] =0                       # Wyzerowuję wartości ujemne; konieczne w przypadku darka o krótkim czasie ekspozycji, w którym pojawiły się ujemne zliczenia po odjęciu Bias




        mdark_frame.data = med_data_d                       # zapisuje do obieku Frame
        mdark_frame.name = 'Masterdark' + str(int(mdark_frame.exp)) + '.fits'
        mdark_frame.history = 'Dark calculated by median. Bias substracted.'


        mdark_frame.SaveBDFFitsFrame()
        masterFrames.dark.append(mdark_frame)

    print('Masterdark created.')

    ### Tworzenie masterflatów

    flat_subpath = listdir(flat_path)

    for i in range(len(flat_subpath)):

        flat_sub_subpath = listdir(join(flat_path,flat_subpath[i]))
        mflat_frame = Frame(join(flat_path, flat_subpath[i]))
        mflat_frame.OpenHeader(join(flat_path, flat_subpath[i], flat_sub_subpath[-1]))


        data_f = FitsFilesData(join(flat_path, flat_subpath[i]))
        data_f = data_f - masterFrames.bias.data
        data_f = data_f - masterFrames.GetDarkByExpTime(mflat_frame.exp, mflat_frame.bin, mflat_frame.subx, mflat_frame.suby).data      

        

        med_data_f = np.median(data_f, axis=0)
        
        norm_data_f = med_data_f / np.median(med_data_f)

        
        mflat_frame.data = norm_data_f
        mflat_frame.name = 'masterflat' + '_' + str(mflat_frame.filter) + '.fits'
        mflat_frame.history = 'Masterflat normalized. Bias and dark substracted.'

        mflat_frame.SaveBDFFitsFrame()
        masterFrames.flat.append(mflat_frame)

    print('Masterflat created.')

    return



# w tym miejscu mam posortowane pliki i zbudowane masterramki, które przechowuje obiekt masterFrames()






### REDUKCJA DANYCH Z RAMEK NAUKOWYCH 

def GetCoordsFromAstrometry(path):

    frames = listdir(path)

    conf.api_key = 'adwuagneiedgziwi'                   # ustawiam API (konto barbarajoachimczyk na Astrometry.net)

    Astrometry = AstrometryNet()


    Ra = 0                                              # inicjuję współrzędne, które wypełni Astrometry
    Dec = 0
    Counter = 0
    print('File sent to Astrometry.net: ', join(path,frames[0]))
    
    while Counter < 3:    
        try:
            wcs_header = Astrometry.solve_from_image(join(path,frames[0]), force_image_upload=True)
            break
        except Exception as e:
            print(f"Astrometry error: {e}")
            Counter += 1
        
    if Counter ==3:
        print("Astrometry Error: file not sent.")
        
    wcs = WCS(wcs_header)                          

    Ra = (wcs.wcs.crval[0] *24) / 360 
    Dec = wcs.wcs.crval[1]

    return Ra, Dec


def CalculateScienceFrames(path):

    objects = listdir(path)             # lista obiektów obserwowanych w nocy
    objects.remove('bdf')               # wyrzucam folder BDF bo to nie obiekt

    for object in objects:              # pętla dla folderów każdego obserwowanego obiektu
                                        # wkradam się z wyznaczeniem współrzędnych na podstawie jednego zdjęcia
        Coords = GetCoordsFromAstrometry(join(path,object))

        frames = listdir(join(path ,object))

        for frame in frames:
            if frame.endswith('.fit') or frame.endswith('.fits'):
                Reduction(path, object, frame, Coords)


            print('Out: ', frame)
    return



def Reduction(path, object, filename, Coords):                 # WERSJA Z TABLICY

            
            fits_frame = Frame(join(path, object, filename))
            fits_frame.OpenHeader(join(path, object, filename))

            with fits.open(join(path, object, filename)) as hdull:
                data = hdull[0].data
            
            masterbias = masterFrames.bias
            masterdark = masterFrames.GetDarkByExpTime(fits_frame.exp, fits_frame.bin, fits_frame.subx, fits_frame.suby)
            masterflat = masterFrames.GetFlatByFilter(fits_frame.filter, fits_frame.bin, fits_frame.subx, fits_frame.suby)

            data = data - masterbias.data
            data = data - masterdark.data
            data = data / masterflat.data 
            
            data[data > 65535] = 65535

            fits_frame.data = data 
            fits_frame.name = 'out_' + filename
            fits_frame.path = join(path, object, 'Pipeline_' + fits_frame.filter + '_' + str(int(fits_frame.exp)))
            fits_frame.history = 'Reduction: Dark -' + str(masterdark.exp) + '; Flat -' + str(masterflat.filter)
            fits_frame.ra = Coords[0]
            fits_frame.dec = Coords[1]

            if not exists(fits_frame.path):
                makedirs(fits_frame.path)


            fits_frame.SaveFitsFullHeader(join(path, object, filename))




masterFrames = Masters()  

#SortBDFFiles(r"D:\!STUDIA\Piwnice\!DANE\20220106\Przemek")
CreateMasterFrames(r"D:\!STUDIA\Piwnice\!DANE\20220106\Przemek")
CalculateScienceFrames(r"D:\!STUDIA\Piwnice\!DANE\20220106\Przemek")


        





Masterbias created.
Masterdark created.
[ERROR] Dark not found.


AttributeError: 'NoneType' object has no attribute 'data'