In [1]:
try: 
    import toolkit
    import numpy as np
    from astropy.io import fits
    import matplotlib.pyplot as plt

except ImportError:
    import sys
    !{sys.executable} -m pip install --user --upgrade matplotlib pandas astropy numpy shutil pathlib tkinter tqdm
    print('Restart you kernel and try again')

# Overview

General process outline 
1. Chose a parent data directory
2. Run the `processing()` function. This will do the following things
    1. This starts by sorting files what have the `"Repeat"` keyword based on their directory. This should also just go ahead and sort the files based on their name into sub directories but I have not done that yet.
    2. Next, the sorted fits files are loaded into `FitsLoader` type objects that allows them to be averaged and saved together. 
3. To average and save the files you will want to access the following methods 
    - `liquid_data["energy"].merge_data()`: This computes the average bright and dark image
    - `liquid_data["energy"].write_merged_file()`: This writes a destination file based on the merged image

## Notes and further work
Further work needs to be done to fix three important issues 
- Currently, the fits object is generated with all of the data so long as the file name has 'Repeat' in it and it shares the same energy. This needs to be updated so that separate fits objects are constructed for each sample in a given energy. 
    - This could be updated by simply altering the `processing()` function such that the dictionary keys are changed from `energy.name` to `sample_name + energy.name` where `sample_name` can be pulled by something like `file.name.split("Repeat")[0]`.
- The mask should be based on `dark = False`, `bright = True`. Right now this is not the case.
    - A simple fix would be to force every mask into a structure of `mask = [False, True, ... , True, False]` of a desired length.
- For some dumb reason the mask itself is not applying to the self.images array... this should not happen.
    - A possible solution would be to force the the images out of being a numpy array and into a list of numpy arrays. This mixed object type is a 1d sequence that can be sliced with a working mask.

In [2]:
# toy_fits = toolkit.open_dialog()
# header = fits.open(toy_fits)[0].header
# del header['COMMENT']
# meta = {item: header[item] for item in header}
# import json 

# with open('test2.json', 'w') as fp:
#     json.dump(meta, fp, indent=4)

In [3]:
directory = toolkit.file_dialog()
liquid_data = toolkit.processing(directory)

AttributeError: 'list' object has no attribute 'with_suffix'

In [6]:
liquid_data['270.0'].merge_data
liquid_data['270.0'].write_merged_file

<bound method FitsLoader.merge_data of <toolkit.FitsLoader object at 0x0000029AB76CA6E0>>

In [4]:
%%writefile toolkit.py
import copy
import glob
import os
from typing import Union
import numpy as np
from shutil import copy2
from astropy.io import fits

from pathlib import Path
from tkinter import filedialog
from tkinter import *
from tqdm.auto import tqdm

#
# Helpful functions
#


def file_dialog():
    root = Tk()
    root.withdraw()
    directory = Path(filedialog.askdirectory())
    return directory

def open_dialog():
    root = Tk()
    root.withdraw()
    file_save_path = Path(filedialog.askopenfilename())
    return file_save_path if file_save_path else None

'''
Added near the end. Need to go back though and remove redundant fits reading 
    "
    with fits.open(file) as header:
        ...
    "
'''

class FitsLoader:
    '''
    A clone of the xrr fits loader. This loads a fits file unpacking the header 
    '''
    def __init__(self, directory: Path):
        self.directory = directory
        self.images = []
        self.energy = []
        self._shutter = []

        self._read_files()
        self.bright_dark_mask = np.array([not bool(status) for status in self._shutter])
        self.merge_data()


    def _read_files(self):
        self.file_list = sorted(glob.glob(os.path.join(self.directory, "*.fits")))
        self.scan_name = self.file_list[0].split("\\")[-1].split("-")[0]
        
        arrays = [
            [
                fits.getheader(f, 0)["Beamline Energy"],
                fits.getheader(f, 0)["CCD Camera Shutter Inhibit"]
            ]
            for f in self.file_list
        ]
        self.energies, self._shutter = np.column_stack(arrays)

        self.image_data = np.squeeze(
            np.array([[fits.getdata(f, 2) for f in self.file_list]])
        ).astype(np.uint16)


    def merge_data(self):
        '''Uses the boolean shutter status to mask the images data and average'''
        bright_images = self.images[self.bright_dark_mask]
        dark_images = self.images[np.invert(self.bright_dark_mask)]

        self.averaged_bright = bright_images.mean(axis = 0, dtype = np.uint16)
        self.averaged_dark = bright_images.mean(axis = 0, dtype = np.uint16)

    def write_merged_file(self):
        dark_fits = copy.deepcopy(fits.open(self.file_list[0]))
        bright_fits = copy.deepcopy(fits.open(self.file_list[1]))

        dark_out_file = self.file_list[0].slice('-')[:-2] + 'Dark_Average'
        bright_out_file = self.file_list[0].slice('-')[:-2] + 'Bright_Average'

        dark_fits[2].data = self.averaged_dark
        bright_fits[2].data = self.averaged_bright

        dark_fits.writeto(dark_out_file, overwrite = True)
        bright_fits.writeto(bright_out_file, overwrite = True)



#
# Basic Fits File Sorting
#

def check_parent(dir: Path) -> None:
    """
    Makes a new directory for the sorted data

    Parameters
    ----------
    dir : pathlib.Path
        Directory of the data that you want sorted
    """
    p_dir = dir.parent
    sample_list = list(p_dir.glob("*txt"))
    Directories = [x[0] for x in os.walk(p_dir)]

    sort_path = p_dir / "Sorted"
    sample_path = sort_path / sample_list
    
    if not sort_path.exists():
        sort_path.mkdir()
    else:
        print(
            "The sorted directory already exists - Checking for energy sub-directories"
        )
    
    for sample in sample_path:
        if not sample.exists():
            sample.mkdir()
        else:
            print(
                "The sorted directory already exists - Checking for energy sub-directories"
            )

    return




def file_filter(fits_files: list, filter = 'Repeat') -> list:
    '''
    A bad method of filtering the fits files. Should implement with filter() but it doesn't matter

    Parameters
    ----------
    fits_files : list
        list of fits files
    filter : str, optional
        indicator string to start filtering, by default 'Repeat'

    Returns
    -------
    list
        list of files with the filter indicator
    '''
    return [fits_file for fits_file in fits_files if fits_file.name.find(filter) != -1]


def energy_sorter(files: list, sort_dir: Path) -> None:
    '''
    Energy sorter

    Parameters
    ----------
    files : list
        List of files that will be sorted by energy
    sort_dir : Path
        destination directory that the files will be sorted into
    '''
    for i, file in tqdm(enumerate(files)):
        with fits.open(file) as headers:
            new_en = round(headers[0].header[49], 1)
    
        dest = sort_dir / str(new_en)
    
        if not dest.exists():
            dest.mkdir()
    
        copy2(file, dest)


def liquid_sorter(directory: Path, filter = 'Repeat') -> None:
    """
    Collects the energies each fits was collected at and makes subfolder for each energy
    Generates a dictionary containing the current file location, and its destination.

    Parameters
    ----------
    dir : pathlib.Path
        Directory of the data that you want sorted
    """

    assert directory.name == 'CCD'
    check_parent(directory)

    fits_files = list(directory.glob("*fits"))
    repeat_files = file_filter(fits_files, filter = 'Repeat')
    sort_dir = directory.parent / "Sorted"

    energy_sorter(repeat_files, sort_dir)

    return


def processing(directory: Path, filter = 'Repeat') -> dict:
    liquid_data = {}
    liquid_sorter(directory,filter = filter)

    sorted_path = directory.parent / 'Sorted'
    energies = list(sorted_path.iterdir())
    liquid_data = {energy.name: FitsLoader(energy) for energy in tqdm(energies)}
    return liquid_data



Overwriting toolkit.py


# Your old code

In [None]:
# The standard fare:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
%matplotlib inline

# Recall our use of this module to work with FITS files in Lab 4:
from astropy.io import fits 

# This lets us use various Unix (or Unix-like) commands within Python:
import os

In [None]:
cd

In [None]:
cd Washington State University (email.wsu.edu)\Carbon Lab Research Group - Documents\Synchrotron Logistics and Data\ALS - Berkeley\Data\BL1101\2023May\Liquid\10 May\Sorted Repeat\288.4

In [None]:
all_files = os.listdir() # Makes a list of all the .FITS files in the current directory
number_of_files = len(all_files) # Counts the number of files in 'all_files'

# Sets the total dark and total image as .FITS files with all the same attributes as the
# first dark or image in the directory.
dark_total = fits.open(all_files[0])
image_total = fits.open(all_files[1]) # first dark or image in the directory.

# change data type
dark_total[2].data = np.float64(dark_total[2].data)
image_total[2].data = np.float64(image_total[2].data)

# The files alternate between image and dark. This will loops through all the files in the directory summing the data accociated
# with the images and darks of each file.
for i in range(2, number_of_files-1, 2):
    image = fits.open(all_files[i]) # Next file to be added to the sum
    dark = fits.open(all_files[i+1])
    
    image[2].data = np.float64(image[2].data) # change data type
    dark[2].data = np.float64(dark[2].data)
    
    dark_total[2].data += dark[2].data # Add next set of data to total image
    image_total[2].data += image[2].data

    
# Rescale so it can be changed back to int16
# change data type
dark_total[2].data = dark_total[2].data*(65000/400000)
image_total[2].data = image_total[2].data*(65000/400000)
dark_total[2].data = np.uint16(dark_total[2].data)
image_total[2].data = np.uint16(image_total[2].data)

# Following two lines writes out a new .FITS file with the summed image and dark data. The headers of these files are the same
# as the headers of the first image and dark in the current directory. :: fits.writeto('out.fits', darksub) # save output

image_total.writeto('Pluronic_PTX_Repeat_sum_80842-00036.fits')
dark_total.writeto('Pluronic_PTX_Repeat_sum_80842-00037.fits')

image_total.info()
print(image_total[2].header)

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20,20)) # Side-by-side plots of the summed image and dark
ax1.imshow(image_total[2].data)
ax2.imshow(dark_total[2].data)

In [None]:
plt.figure(figsize=(15,7.5))
plt.imshow(image_total[2].data)
plt.colorbar()

In [None]:
plt.figure(figsize=(15,7.5))
plt.imshow(dark_total[2].data)
plt.colorbar()