In [None]:


import os, sys, math, glob, csv
#!pip install pyplate

from astropy.io import fits
from astropy.wcs import WCS
import astropy.units as u
from astropy.coordinates import SkyCoord, FK4, FK5, Angle
from astropy.time import Time
import astropy.coordinates as acoord

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import itertools

from scipy import ndimage
import shutil


from google.colab import drive
drive.mount('/content/gdrive', force_remount=False)
%cd /content/gdrive/MyDrive/AAO_PACE/
 # 'MyDrive' is root of your google drive


[0m[01;34mBadData[0m/                                                     plate177.fits
New_Master_Metadata.csv                                      plate182.fits
plate103.fits                                                plate189.fits
plate103_nocoords_fast_grid_removed_fast_grid_removed.fitss  plate192.fits
plate103_nocoords.fits                                       plate196.fits
plate106.fits                                                plate199.fits
plate121.fits                                                plate200.fits
plate142.fits                                                plate205.fits
plate147.fits                                                plate210.fits
plate158.fits                                                plate218.fits
plate161.fits                                                plate230.fits
plate166.fits                                                plate237.fits
plate170.fits                                                plate251.fits
plate173.

In [None]:
## IF NOT RUNNING LOCALLY
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [None]:
# = = = = =
# MASTER CODE FOR FINAL PRODUCT
# = = = = =

def Update_Perth_Plate_Fits(input_folder_path = '/content/gdrive/MyDrive/AAO_PACE/', logfile = True):


  def WCSFitsUpdate(loadfits = input_folder_path):

    def var_cleanup():

      local_vars = ('head','fitsimport','ObsTime','ObsJD','ExpTime','HeadEquinox','missingdata', 'obsRA', 'obsDEC', 'raHead', 'decHead')
      for i in local_vars:
        if i in locals():
          del locals()[i]
      # clear out some vars to avoid errors in next code run
      return

    def NoGo_badData(fitsfile):


        badcsv = 'FailedFits.csv'

        print(f'{fitsfile} missing data, name appended to {badcsv}')
        if not os.path.exists(badcsv):    #create csv if doesnt exist
          with open(badcsv, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['Files with Missing/broken Data'])

        with open(badcsv, 'a', newline='') as csvfile:
          writer = csv.writer(csvfile)  # append line
          writer.writerow([fitsfile])

        return

    def addToLog(string):
      # Generate Logfile
      if logfile == True:
        log = f'logfile.txt'
        # if not os.path.exists(log):    #create csv if doesnt exist -=- open already creates file if none exists
        with open(log, 'a') as logg:
          logg.write(f'{string}\n')
          logg.flush()  # it'd be impolite not to
      return

    if not os.path.exists('logfile.txt'):
      addToLog('##  Australian Astronomical Optics  Perth Observatory  ##')
      addToLog('##        Photographic Plates Ingestion Script         ##')
      addToLog('  MQ PACE Placement 2025')
      addToLog('  nuwanthika.fernando@mq.edu.au,  aidan.connaughton@mq.edu.au , matthew.lee20@students.mq.edu.au ')


    # # # # # # # # #
    # Actually running the code now!
    # # # # # # # # #

    WCS_EXISTS = 0
    missingdata = 0

    head = []

    pathdir, fitsfile = os.path.split(loadfits) # split into   dirdirdir /, file

    base, ext = os.path.splitext(fitsfile)       # split into filename , .extension

    addToLog('\n# # # # # #\nSTARTING  {fitsfile}\n# # # # #')
    fitsimport = fits.open(loadfits, ignore_missing_simple=True)



    if fitsimport[0].header[0] != True and fitsimport[0].header[0] != 'T':
      print('No SIMPLE header in PrimaryHDU (main image)')
      addToLog('No SIMPLE Keyword found in 1st Header!')
      head = fitsimport[1].header
      if fitsimport[1].header[0] != True and fitsimport[1].header[0] != 'T':
        print('No SIMPLE header in ImageHDU (envelope)')
        missingdata = 1
        NoGo_badData(fitsfile)
        addToLog('No SIMPLE Keyword found in either 2nd header!')
      return
    else:
      head = fitsimport[0].header

    if "RA" not in head or "DEC" not in head:
      addToLog('RA or DEC missing from header!')
      missingdata = 1
    else:
      ObsRA       = head["RA"]
      ObsDEC      = head["DEC"]
      if (ObsRA < 0) or (ObsRA > 24):
       addToLog('RA value is bad! (below 0, o r above 24 hr)')
       missingdata = 1

      if (ObsDEC < -90) or (ObsDEC > 90):
       addToLog('DEC value is bad! (below -90, or above +90 deg)')
       missingdata = 1


    if "JD" in head:
      ObsJD = head["JD"]
    else:
        if "DATE-OBS" in head:
          # ObsJD = head["DATE-OBS"].time.format = 'jd'   #if no JD, set the date to JD and use that instead... -- Threw an error so I added dateutils.parse
          dt = parse(head['DATE-OBS']);
          ObsJD = Time(dt).jd;
          addToLog('DATE-OBS used in place of JD')
        else:
          missingdata = 1
          addToLog('No JD or DATE-OBS found!')

    if (ObsJD < 2411368.5  )  or  (ObsJD > 2422324.5):  #
        missingdata = 1
        addToLog('JD outside valid range ( 1/1/1890 < JD < 1/1/1920) ')


    if 'CRPIX'  in head:
      addToLog('CRPIX found in header, WCS already exists, not generating WCS!')
      ###
      WCS_EXISTS = 1
      ###
    else:
      WCS_EXISTS = 0


    if missingdata == 1:  # last check before copying and starting
      addToLog('###\n File Aborted Early, MissingData flag triggered!\n###')
      NoGo_badData(fitsfile)
      var_cleanup()
      return 'BAD_DATA'


    if missingdata == 0:
      fitsimport[0].header = head
      fitsimport.close()

      # Create the new file now!
      os.makedirs('updated_fits', exist_ok=True)
      new_name = f"updated_fits/{base}_copy{ext}"            # append "_copy"
      shutil.copy(loadfits, new_name)              # create copy with appended text
      workableFits = fits.open(new_name, ignore_missing_simple=True)
      if workableFits[0].header[0] == True or workableFits[0].header[0] == 'T':
          head = workableFits[0].header
      elif workableFits[0].header[1] == True or workableFits[0].header[1] == 'T':
          head = workableFits[1].header
      else:
          print('Error creating new fits for',new_name)
          return 'BAD_DATA'


      # Convert RA/DEC to ICRS
      # J2000 -> ICRS   Follows FK4NoETerm under SOFA
      ObsRA = ObsRA *u.hourangle
      ObsDEC = ObsDEC*u.deg
      ObstimeCalc = Time(ObsJD, format = 'jd', scale = 'tt')
      cJD = SkyCoord(ra=ObsRA, dec=ObsDEC, frame=FK4(equinox='B1950.0', obstime = ObstimeCalc))
      c_ICRS = cJD.transform_to('icrs')
      # Set new values
      raHead = Angle(c_ICRS.ra, unit = u.hourangle).deg
      decHead = Angle(c_ICRS.dec, unit = u.deg).deg  # take to 2 sig fig!!!!!!
      ################
      addToLog('RA/DEC Looks good! \n')
      addToLog(f'Original values (dec hr, dec deg): {ObsRA}, {ObsDEC}')
      addToLog(f'O. values (sexg. hr, sexg. deg)  : {cJD.ra.to_string(unit = u.hourangle, sep=":" )}, {cJD.dec.to_string(unit = u.deg, sep=":" )} \n')
      addToLog(f'ICRS: {c_ICRS.ra.to_string(unit=u.hourangle, sep=":", precision=2)}, {c_ICRS.dec.to_string(unit=u.deg, sep=":", precision=2)} ')

      #Check existing RA/DEC/WCS wrt image metadate (eg. already solved image)
      # Check if +/- 2.5
      #if head["RA"]:


    # # # # # # #
    # WCS Time! #
    # # # # # # #

    # if no wcs exists (allow existing data to be entered)                                #### #### #### #### ####




    def AddConvWCS(head, CRPIX = [ head['NAXIS1'] /2 , head['NAXIS2'] /2 ] ,RADESYS = "ICRS",CTYPE = ["RA---TAN", "DEC--TAN"],CRVAL = [raHead, decHead],RA = raHead,DEC = decHead):
          # RA/DEC_NEW AS WELL, FOR THE CRVAL, LEAVE ORIG RA/DEC KEYWORDS
          # head var must be active and open
          # WCS fallback for unsolved image/ no wcs already present

          # TODO:
          #       Allow image with existing 'solved' WCS data to be accept and not overwritten!
        w = WCS(naxis=2)

        if missingdata == 0:
          w = WCS(naxis=2)

          # set image centre
          w.wcs.crpix = CRPIX       #[ head['NAXIS1'] /2 , head['NAXIS2'] /2 ]
          w.wcs.radesys = RADESYS   #"ICRS"
          w.wcs.ctype = CTYPE       #["RA---TAN", "DEC--TAN"]    # add comment

          w.wcs.crval = [RA,DEC]    #[raHead, decHead]

          '''
          CD matrix - CROT deprecated, rotation now integrated to distortion matrix
          [-pxscale, 0]       RA typically increases to left of frame ->  -ve pxscale
          [0 , pxscale]
          create comment to address potential image flip
          CDELT is translation, PC is rotation
          '''
          #if not head['CD']:   'doesnt exist in header - yeah thats the point...
          pxScale =  0.000175#162   deg/px  0.63 "/px          plate106 solved to (CD1_1	0.000174758115388) (CD2_2	0.00017556584632)
          w.wcs.cd = np.array([[pxScale, 0 ], # array elements are unknown-signed, plate orientation cannot be assumed for all
                                  [0, pxScale ]])  # fits comment???
            #w.wcs.cd = CD

          wcs_header = w.to_header()
          head.update(wcs_header)

          del head['MJDREF']    # mjdref = 0, no offset being applied, just remove it to save confusion

          head.set('hierarch AAO_COMMENT_PX1', 'PC[N_N] VALUES UNSIGNED, NO CONSISTENT SCAN ORIENTATION')
          head.set('hierarch AAO_COMMENT_PX2', 'PXSCALE ~0.631 "/PX FROM ASTROMETRY.NET SOLVES')

        return head


    if WCS_EXISTS == 0:
      head = AddConvWCS(head)
      if "SIMPLE" in head:
        head["SIMPLE"] = True
        addToLog('SIMPLE keyword found, set to True')
    ########################################################################################  SET UP INPUT FOR EXISTING WCS

    # # #
    # WCS Time is now over.
    # # #
    timenow = Time.now().to_string()
    head.set('hierarch AAO_COMMENT1', 'b19xx PRECESSED VIA ASTROPY TO ICRS')
    head.set('hierarch AAO_COMMENT2', 'AAO DATACENTRAL PRE-INGESTION, PROCESSED:')
    head.set('hierarch AAO_COMMENT3', f'{timenow} UTC')

    workableFits[0].header = head
    workableFits.writeto(f'{new_name}' ,overwrite = True)

    addToLog(f'{base}_copy generated successfully!\n @ {timenow} UTC' )
    addToLog(f'Output Header: \n{repr(head)}')

    workableFits.close()
    var_cleanup()



    return new_name # output new fits filepath to be added to metadata csv



  ###-=-=-=-=-=-=-=-=-=-
  ### METADATA SECTION
  ###-=-=-=-=-=-=-=-=-=-



  def error_check(var):
    ''' Check if a variable has been set to ERROR as an escape so code doesn't crash,
    prints the error and returns True if an error occured '''

    if var[:14] == 'METADATA ERROR':
      print(var)
      return True
    else:
      return False

  def check_dupes(lst):
    seen = []
    dupes = []
    dupe_counts = []
    dupes_total = []
    dupe_indexs = []
    for j,x in enumerate(lst):
      if x in seen:
        dupes_total.append(x)
        dupe_indexs.append(j)

        if x not in dupes:
          dupes.append(x)
          dupe_counts.append(1)
        for i,dupe in enumerate(dupes):
          if x == dupe:
            dupe_counts[i] += 1
      else:
        seen.append(x)
    return dupes, dupe_counts, dupes_total, dupe_indexs



  def unpack_fits(fits):
    ''' Extracts the headers and data from an input fits file '''

    # Make sure a fits file is input so the code doesn't crash        #   .fit files are a thing that exist, genernally not the ""standard"" but is common enough
    if fits[-5:] != '.fits':
      return 'METADATA ERROR - Please input a valid .fits file'

    # Use pyplate to extract the values from the fits
    return pp.metadata.PlateHeader.from_fits(fits)



  def unpack_csv_headers(file):
    ''' Extracts the headers from an input csv file '''

    # Make sure a csv file is input so the code doesn't crash
    if file[-4:] != '.csv':
      return 'METADATA ERROR - Please input a valid .csv file'

    # Extract the first row of the csv to get the headers
    with open(file, newline='') as csvfile:
      return list(csv.reader(csvfile))[0]



  def create_master_from_fits(fits):
    # Use an example fits to start a master_file
    fits_total = unpack_fits(fits)
    heads = list(fits_total.keys())
    heads.insert(0,'file_name')
    masterdf = pd.DataFrame(columns=heads)
    masterdf.to_csv('New_Master_Metadata.csv',index=False)
    return



  def add_header(header_name, header_list):
    ''' Adds a header of input name at the end of an input header list '''

    header_list.append(header_name)



  def add_fits_to_master(fits,master_file):

    ## ADD CURRENT MASTER HEADERS

    master_headers = unpack_csv_headers(master_file)
    if error_check(master_headers):
      return


    ## ADD FITS

    fits_total = unpack_fits(fits)
    if error_check(fits_total):
      return


    ## ADD FITS HEADERS

    cur_heads = list(fits_total.keys())


    ## CHECK IF HEADERS NEED TO BE ADDED TO MASTER

    # Make sure duplicates are the same
    cur_dupes, cur_dupe_counts, cur_dupe_totals, cur_dupe_indexs = check_dupes(cur_heads)
    master_dupes, master_dupe_counts , master_dupe_totals, master_dupe_indexs = check_dupes(master_headers)

    for i,dupe in enumerate(cur_dupes):
      if dupe not in master_dupes:
        add_header(dupe,master_headers)
      else:
        if cur_dupe_counts[i] > master_dupe_counts[i]:
          for _ in range(cur_dupe_counts[i] - master_dupe_counts[i]):
            add_header(dupe,master_headers)

    # Add any new headers to master
    added_headers=[]
    for i,name in enumerate(cur_heads):
      if name not in master_headers:
        master_headers.append(name)
        added_headers.append([i,name])

    # Set new total dupes so data can be added later
    new_master_dupes, new_master_dupe_counts, new_master_dupe_totals, new_master_dupe_indexs = check_dupes(master_headers)


    ## ADD FITS DATA

    cur_data = list(fits_total.values())


    ## ADD DATA TO MASTER

    # Create a list the same length as the master so it can add correctly and any missing data is nil
    input_data = ['nil'] * len(master_headers)

    # Add the name of the plate to the first index of the metadata
    input_data[0] = os.path.splitext(os.path.split(fits)[1])[0]

    # Go through each header in master to add the corresponding value - without duplicates
    for i,name in enumerate(master_headers):
      if name in cur_heads and name not in cur_dupes:
        input_data[i+1] = fits_total[name]

    # Go back through and add duplicate datas to corresponding
    for i,name in enumerate(cur_dupe_totals):
      for j,typ in enumerate(new_master_dupe_totals):
        if name == typ and input_data[new_master_dupe_indexs[j]] == 'nil':
          input_data[new_master_dupe_indexs[j]] = cur_data[cur_dupe_indexs[i]]

    # Append the input data to the master data
    with open(master_file, newline='') as csvfile:
      out_csv = list(csv.reader(csvfile))

    out_csv.append(input_data)

    out_df = pd.DataFrame(out_csv[1:],columns=master_headers)
    out_df.to_csv('New_Master_Metadata.csv',index=False)
    print('Sucessfully added',fits,'to metadata')

  def find_and_wcs_fits():
    fitslist = glob.glob('**/*.fit*', recursive = True,)

    created_master = False
    for i,found_path in enumerate(fitslist):
      pathdir, fitsfile = os.path.split(found_path)
      if fitsfile[:5] == 'plate' and pathdir != 'updated_fits' and pathdir != 'BadData':
        print('')
        print('','Found:',found_path)
        updated_file = WCSFitsUpdate(found_path)
        print('')
        if updated_file != 'BAD_DATA':
          if not created_master:
            create_master_from_fits(updated_file)
            created_master = True
          add_fits_to_master(updated_file,'New_Master_Metadata.csv')

    return


  ###
  # End of function setup
  ###


  print('Checking and locally installing required packages...')
  print('')

  import os, sys, math, glob, csv;
  !pip install pyplate -q;
  import pyplate as pp;


  from astropy.io import fits;
  from astropy.wcs import WCS;
  import astropy.units as u;
  from astropy.coordinates import SkyCoord, FK4, FK5, Angle;
  from astropy.time import Time;
  import astropy.coordinates as acoord;
  from dateutil.parser import parse

  import numpy as np;
  import matplotlib.pyplot as plt;
  import pandas as pd;
  import itertools;

  from scipy import ndimage;
  import shutil;

  print('All packages correct ✓')
  print('Locating plates folder...')
  print('')

  os.chdir(input_folder_path)

  print('Working in:', os.getcwd())
  print('')

  print('Obtaining all plate fits')

  find_and_wcs_fits()

  print('\nAll fits conversions done :)')


  #! pip install alive-progress


In [None]:
## Testing master code
Update_Perth_Plate_Fits('/content/gdrive/MyDrive/AAO_PACE/original_fits')

Checking and locally installing required packages...

All packages correct ✓
Locating plates folder...

Working in: /content/gdrive/.shortcut-targets-by-id/1mBGDQpFuenHzfjpH-i8R8SP0vQoDT91T/AAO_PACE/original_fits

Obtaining all plate fits

 Found: plate142.fits

Sucessfully added updated_fits/plate142_copy.fits to metadata

 Found: plate147.fits

Sucessfully added updated_fits/plate147_copy.fits to metadata

 Found: plate121.fits

Sucessfully added updated_fits/plate121_copy.fits to metadata

 Found: plate103.fits

Sucessfully added updated_fits/plate103_copy.fits to metadata

 Found: plate158.fits

Sucessfully added updated_fits/plate158_copy.fits to metadata

 Found: plate106.fits

Sucessfully added updated_fits/plate106_copy.fits to metadata

 Found: plate196.fits

Sucessfully added updated_fits/plate196_copy.fits to metadata

 Found: plate200.fits

Sucessfully added updated_fits/plate200_copy.fits to metadata

 Found: plate182.fits

Sucessfully added updated_fits/plate182_copy.fits

In [None]:
import astropy
print(astropy._citation_)

AttributeError: module 'astropy' has no attribute '_citation_'